RocketMQ实现分布式事务的原理
- 半消息(Half Message):
- 生产者首先向RocketMQ发送半消息,这类消息对消费者不可见。例如,在订单创建与库存扣减场景中,订单服务先向RocketMQ发送一条表示要创建订单的半消息。
- 半消息的作用是预占一个消息位置,为后续的事务处理做准备。
- 本地事务执行:
- 发送半消息成功后,生产者执行本地事务,比如订单服务在本地数据库中插入订单记录。
- 事务状态回查:
- RocketMQ会定时回查生产者本地事务状态。如果生产者在执行本地事务时因为某些原因(如网络抖动)未能及时向RocketMQ反馈事务状态,RocketMQ会主动询问。
- 生产者根据本地事务执行结果向RocketMQ返回事务状态(提交、回滚或未知)。如果是提交,消息就会对消费者可见;如果是回滚,消息会被删除;如果是未知,RocketMQ会继续回查。
- 消息消费:
- 当消息对消费者可见后,消费者(如库存服务)消费消息并执行相应的业务逻辑,比如扣减库存。
可能出现的一致性问题及应对策略
- 消息丢失:
- 问题描述:在事务执行过程中,半消息可能因为网络问题等原因丢失,导致后续事务无法正常进行。
- 应对策略:
- RocketMQ本身有高可用机制,通过多副本(Master - Slave结构)保证消息不丢失。如果Master节点出现故障,Slave节点可以接替工作。
- 生产者在发送半消息时,可以设置重试机制,确保消息发送成功。
- 事务状态回查失败:
- 问题描述:RocketMQ回查生产者本地事务状态时,可能因为生产者服务不可用等原因导致回查失败,无法确定事务最终状态。
- 应对策略:
- 生产者服务需要保证高可用性,例如采用集群部署方式。
- 可以在本地事务执行时,记录详细的日志,以便在回查时能够准确判断事务状态。
- 消息重复消费:
- 问题描述:消费者在消费消息时,可能因为网络等原因导致消费确认未及时返回,RocketMQ会重新投递消息,造成重复消费。
- 应对策略:
- 消费者业务逻辑要实现幂等性。例如,在库存扣减时,可以先查询库存是否已经扣减过,避免重复扣减。
- 可以使用数据库的唯一约束,在执行库存扣减操作前,通过数据库层面保证操作的唯一性。
分布式电商系统中订单创建与库存扣减一致性的详细设计方案
- 系统架构:
- 订单服务:负责接收用户下单请求,创建订单并向RocketMQ发送半消息。执行本地订单创建事务后,向RocketMQ反馈事务状态。
- 库存服务:监听RocketMQ消息,消费订单创建成功的消息后执行库存扣减操作。
- RocketMQ:负责存储半消息,回查事务状态,以及向库存服务投递消息。
- 流程设计:
- 用户下单,订单服务接收到请求。
- 订单服务向RocketMQ发送创建订单的半消息。
- 订单服务执行本地事务,在本地数据库中插入订单记录。
- 订单服务向RocketMQ反馈事务状态。
- RocketMQ根据事务状态处理消息,如果是提交,将消息发送给库存服务。
- 库存服务消费消息,查询库存是否足够,足够则扣减库存,更新库存数据库。
核心代码片段
- 订单服务发送半消息及处理事务:
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.*;
public class OrderService {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));
producer.setExecutorService(executorService);
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地订单创建事务
try {
// 模拟订单创建操作,如插入数据库
System.out.println("执行本地订单创建事务");
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务状态回查
System.out.println("事务状态回查");
// 根据本地事务记录判断事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message message = new Message("transaction_topic", "创建订单".getBytes());
producer.sendMessageInTransaction(message, null);
// 防止主线程退出
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
}
- 库存服务消费消息扣减库存:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class StockService {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("stock_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("transaction_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 模拟库存扣减操作
System.out.println("收到消息,执行库存扣减操作:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("库存服务消费者启动");
}
}