1. 可能出现事务不一致的情况分析
1.1 消息发送阶段
- 半消息发送成功,本地事务执行失败:RocketMQ 支持发送半消息(prepare 消息),如果半消息发送成功,但后续本地事务执行失败,可能导致消息被错误提交或回滚。例如在一个电商下单场景中,半消息发送成功表明订单消息已进入MQ,但本地库存扣减事务失败,此时若消息被错误提交,就会出现订单创建但库存未扣减的不一致。
- 半消息发送失败:可能由于网络问题、MQ 服务端故障等原因,半消息未能成功发送到MQ,此时本地事务若已执行,会造成本地事务和消息状态不一致。如在一个转账场景中,本地账户 A 已扣款,但由于半消息发送失败,账户 B 未收到转账消息,导致资金不一致。
1.2 消息存储阶段
- 消息存储过程中出现故障:MQ 服务端在存储消息时可能遇到磁盘故障、网络中断等问题,导致消息存储不完整或丢失。如果此时本地事务已提交,会出现本地有记录但 MQ 中消息缺失,消费者无法消费到该消息,进而导致业务不一致。
1.3 消息消费阶段
- 消息重复消费:由于网络波动、消费者故障重启等原因,MQ 可能会将消息重复发送给消费者。若消费者没有幂等性处理,会导致业务数据重复处理,比如重复下单、重复扣款等。
- 消息消费失败:消费者在处理消息时可能因为业务逻辑错误、依赖服务不可用等原因消费失败。若此时没有合理的重试机制或补偿机制,会导致业务流程无法完整执行,出现事务不一致。
2. 解决方案和实现思路
2.1 消息发送阶段
- 回查机制:当半消息发送成功,本地事务执行失败时,RocketMQ 提供回查机制。MQ 服务端会定期回查本地事务状态。在实现时,需要在本地事务执行处记录事务执行状态(成功/失败),并提供一个接口供 MQ 服务端回查。
- 可靠消息最终一致性方案:引入消息表,在本地数据库中创建消息表,与本地业务事务放在同一个事务中。先插入消息表记录(状态为待发送),本地事务成功后,再将消息表记录状态更新为已发送并发送半消息。如果半消息发送失败,可根据消息表记录重试发送。
2.2 消息存储阶段
- MQ 高可用架构:采用多副本机制,如 RocketMQ 的主从架构,主节点存储消息后,同步给从节点。即使主节点出现故障,从节点可接替工作,保证消息不丢失。
- 持久化策略:合理配置 RocketMQ 的持久化策略,如采用同步双写方式,确保消息在写入磁盘后才返回成功,减少因机器故障导致消息丢失的风险。
2.3 消息消费阶段
- 幂等性处理:在消费者端实现幂等性,例如在业务表中增加唯一索引,每次消费消息时先根据业务唯一标识查询是否已处理过该消息。如果已处理过则直接返回成功,避免重复处理。
- 重试与补偿机制:对于消费失败的消息,RocketMQ 提供重试队列。可设置合理的重试次数和时间间隔,若多次重试仍失败,可通过人工介入或自动补偿机制(如调用补偿接口恢复业务状态)来保证事务一致性。
3. 代码示例
3.1 消息发送(基于 RocketMQ 事务消息)
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.*;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));
producer.setExecutorService(executorService);
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 本地事务执行逻辑
try {
// 模拟本地业务操作,如订单创建、库存扣减等
System.out.println("执行本地事务:" + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态逻辑
System.out.println("回查本地事务:" + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message message = new Message("transaction_topic", "Hello RocketMQ".getBytes());
producer.sendMessageInTransaction(message, null);
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
3.2 消息消费(幂等性处理示例)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class IdempotentConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("idempotent_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("transaction_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 根据业务唯一标识判断是否已处理过该消息
String businessKey = msg.getKeys();
boolean isProcessed = checkIfProcessed(businessKey);
if (isProcessed) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
try {
// 处理业务逻辑
System.out.println("消费消息:" + new String(msg.getBody()));
// 记录已处理
markAsProcessed(businessKey);
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者已启动");
}
private static boolean checkIfProcessed(String businessKey) {
// 实际应用中根据业务唯一标识查询数据库判断是否已处理
// 这里简单返回 false 模拟未处理
return false;
}
private static void markAsProcessed(String businessKey) {
// 实际应用中记录已处理到数据库
System.out.println("标记消息已处理:" + businessKey);
}
}