1. 跨服务事务一致性处理
- 两阶段提交(2PC):
- 协调者(通常是发起事务的服务)首先向所有涉及事务的服务发送准备(Prepare)请求。每个服务执行事务的预操作,如检查数据完整性、锁定资源等。如果预操作成功,服务回复协调者“准备好”;否则回复“失败”。
- 若协调者收到所有服务的“准备好”回复,进入提交(Commit)阶段,向所有服务发送提交请求,各服务完成实际的事务操作。若有任何一个服务回复“失败”,协调者进入回滚(Rollback)阶段,向所有服务发送回滚请求,各服务撤销预操作。
- 基于事件溯源和CQRS(命令查询职责分离):
- 事件溯源:每个服务记录所有对数据的更改作为事件,存储在事件日志中。这些事件包含了事务发生的完整历史信息。
- CQRS:命令(Command)用于处理事务操作,更新数据并生成事件;查询(Query)从事件日志中重建数据状态以提供查询结果。不同服务通过事件总线进行通信,当一个服务完成事务操作并生成事件后,发布到事件总线,其他相关服务订阅这些事件并基于事件更新自身数据状态,从而保证跨服务的数据一致性。
2. 事务性能优化以适应分布式系统
- 减少事务范围:
- 尽量将事务限制在单个服务内部,避免不必要的跨服务事务。如果可能,对业务逻辑进行拆分,使得每个服务的事务操作尽可能独立,减少服务间的依赖和事务协调开销。
- 例如,将订单创建和库存扣减设计为两个独立的服务,通过异步消息队列在订单创建成功后通知库存服务进行扣减,这样就避免了订单服务和库存服务之间的紧密事务耦合,提高系统的性能和可扩展性。
- 异步处理:
- 对于一些非关键的事务操作,采用异步处理方式。例如,在用户注册成功后,发送欢迎邮件的操作可以通过消息队列异步处理,而不是将其包含在用户注册的事务中,从而减少事务的执行时间,提高系统的响应速度。
- 使用异步任务队列(如RabbitMQ、Kafka等),将事务相关的异步任务添加到队列中,由专门的消费者进行处理。这样可以避免事务长时间等待异步操作完成,降低事务阻塞时间。
- 重试机制:
- 由于网络波动等原因,分布式事务中的某些操作可能会失败。引入重试机制,对于因网络问题等导致的临时性失败,在一定时间间隔后自动重试,提高事务成功的概率。
- 例如,设置重试次数和重试间隔时间,当服务间调用失败时,按照设定的策略进行重试。但要注意避免无限重试导致的资源耗尽问题,需要设定合理的重试上限。
3. MongoDB相关特性和技术手段
- 多文档事务:
- MongoDB 4.0 及以上版本支持多文档事务。在分布式微服务架构中,当一个事务需要操作多个集合或文档时,可以利用这一特性。
- 通过
session.startTransaction()
开启事务,对多个文档的操作可以包含在事务块中,最后通过 session.commitTransaction()
提交事务或 session.abortTransaction()
回滚事务。例如:
const { MongoClient } = require('mongodb');
const uri = "mongodb://localhost:27017";
const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });
async function run() {
try {
await client.connect();
const session = client.startSession();
session.startTransaction();
const database = client.db('test');
const collection1 = database.collection('collection1');
const collection2 = database.collection('collection2');
await collection1.insertOne({ data: 'value1' }, { session });
await collection2.insertOne({ data: 'value2' }, { session });
await session.commitTransaction();
} catch (e) {
console.error(e);
} finally {
await client.close();
}
}
run().catch(console.dir);
- 分布式锁:
- 利用MongoDB的文档原子操作实现分布式锁,确保在分布式环境下同一时间只有一个服务实例能够执行特定的事务操作,避免数据冲突。
- 例如,可以通过插入一个具有唯一索引的文档来获取锁,释放锁时删除该文档。如下代码片段展示了获取锁的操作:
const { MongoClient } = require('mongodb');
const uri = "mongodb://localhost:27017";
const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });
async function acquireLock() {
try {
await client.connect();
const database = client.db('test');
const lockCollection = database.collection('locks');
const lockDocument = { _id: 'unique_lock_id', locked: true };
const result = await lockCollection.insertOne(lockDocument, { session, ordered: false });
return result.acknowledged;
} catch (e) {
if (e.code === 11000) { // 唯一索引冲突,锁已被占用
return false;
}
console.error(e);
return false;
} finally {
await client.close();
}
}
- 释放锁的操作如下:
async function releaseLock() {
try {
await client.connect();
const database = client.db('test');
const lockCollection = database.collection('locks');
const result = await lockCollection.deleteOne({ _id: 'unique_lock_id' });
return result.acknowledged;
} catch (e) {
console.error(e);
return false;
} finally {
await client.close();
}
}