- 关键概念
- 消息唯一标识:为每条消息生成一个唯一的标识,在RocketMQ中,Producer在发送消息时可以为消息设置
keys
字段,这个 keys
通常可作为消息的唯一标识。例如订单号等具有业务唯一性的字段。
- 去重表:在业务系统中维护一张去重表,表中记录已经处理过的消息的唯一标识。当新消息到达时,先查询去重表,如果该消息标识已存在,表示该消息已处理过,不再重复处理。
- 具体实现方法
- 使用
TransactionMQProducer
:
- 发送半消息:通过
TransactionMQProducer
发送半消息(Half Message),半消息是指暂不能被Consumer消费的消息。例如:
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest",
"TagA",
"OrderID001",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
- **执行本地事务**:发送半消息成功后,RocketMQ会回调 `executeLocalTransaction` 方法来执行本地事务。在这个方法中,根据业务逻辑执行本地事务,并根据事务执行结果返回相应的状态(`LocalTransactionState.COMMIT_MESSAGE`、`LocalTransactionState.ROLLBACK_MESSAGE`、`LocalTransactionState.UNKNOW`)。例如:
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地业务逻辑,如更新数据库等
// 假设业务执行成功
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态,返回相应的状态
return LocalTransactionState.COMMIT_MESSAGE;
}
});
- **事务状态回查**:如果RocketMQ没有收到本地事务执行状态(返回 `LocalTransactionState.UNKNOW`),会通过 `checkLocalTransaction` 方法回调来检查本地事务状态,确保消息最终的一致性和幂等性。
- 基于业务端实现幂等:
- 生成唯一消息标识:在生产者端为每个消息生成唯一标识,如使用UUID或业务中的唯一键(如订单号)。
- 消息发送与去重:发送消息前,先查询去重表。如果去重表中不存在该消息标识,则发送消息,并在发送成功后将该标识插入去重表;如果去重表中已存在该标识,则不再发送消息,认为该消息已被处理过。例如在数据库的订单表中,可增加一个字段用于记录消息处理状态,每次处理订单消息时,先查询该订单是否已处理,若已处理则跳过。