步骤
- 开启事务支持:在MongoDB 4.0及以上版本,要在客户端代码中明确开启事务支持。例如在Node.js中使用
mongodb
驱动:
const { MongoClient } = require('mongodb');
const uri = "mongodb://localhost:27017";
const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });
async function run() {
try {
await client.connect();
const session = client.startSession();
session.startTransaction();
// 后续操作
await session.commitTransaction();
} catch (e) {
console.error(e);
await session.abortTransaction();
} finally {
await client.close();
}
}
run().catch(console.dir);
- 创建Change Streams:可以在集合或数据库级别创建Change Streams。以集合级别为例,在Node.js中:
const collection = client.db('yourDB').collection('yourCollection');
const changeStream = collection.watch();
changeStream.on('change', (change) => {
console.log('Data change detected:', change);
// 在这里进行协同操作
});
- 执行协同操作:在Change Streams捕获到数据变化(
change
事件触发)后,根据变化类型(如insert
、update
、delete
)执行相应的协同操作。例如,如果是insert
操作,可以向其他系统发送通知:
changeStream.on('change', (change) => {
if (change.operationType === 'insert') {
const newDocument = change.fullDocument;
// 发送通知逻辑,例如调用外部API
}
});
关键注意点
- 版本要求:确保MongoDB版本在4.0及以上,因为事务和Change Streams的完整功能组合从这个版本开始支持。
- 事务边界:Change Streams捕获的数据变化必须在事务的生命周期内才会被正确处理。确保事务开启后创建Change Streams,并在事务结束前处理相关变化。
- 数据一致性:虽然Change Streams提供实时数据变化捕获,但在事务环境下,要注意数据一致性。例如,在事务未提交前,Change Streams可能捕获到中间状态,需根据业务需求处理。
- 资源管理:Change Streams持续监听数据变化,可能消耗一定资源。合理配置监听时间间隔、资源限制等,避免对数据库性能造成过大影响。
- 错误处理:在事务和Change Streams操作过程中,要妥善处理各种可能的错误,如网络故障、数据库异常等。对于事务,要及时
abortTransaction
;对于Change Streams,要处理连接错误等异常情况。