代码实现思路
- 配置RocketMQ事务生产者:在Spring Boot项目中配置RocketMQ事务生产者,例如在
application.properties
或application.yml
中配置RocketMQ的相关参数,包括名称服务器地址等。
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private StockService stockService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 订单创建
orderService.createOrder();
// 库存扣减
stockService.deductStock();
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
try {
// 检查订单状态
boolean orderCreated = orderService.checkOrderCreated();
if (orderCreated) {
// 检查库存扣减状态
boolean stockDeducted = stockService.checkStockDeducted();
if (stockDeducted) {
return RocketMQLocalTransactionState.COMMIT;
} else {
try {
// 尝试补偿库存扣减
stockService.deductStock();
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
- 发送事务消息:在订单创建成功的逻辑中,发送事务消息。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void createOrder() {
// 订单创建逻辑
Message<String> message = MessageBuilder.withPayload("order_create_success").build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order_topic", message, null);
// 根据发送结果进行相应处理
}
}
- 事务监听器:实现
RocketMQLocalTransactionListener
接口,在executeLocalTransaction
方法中执行订单创建和库存扣减逻辑,并根据执行结果返回提交或回滚状态。在checkLocalTransaction
方法中检查本地事务状态,进行补偿操作。
可能遇到的问题及解决方案
- 事务消息发送失败:
- 原因:网络问题、RocketMQ服务器故障等。
- 解决方案:增加重试机制,例如使用Spring Retry框架,对发送事务消息的操作进行重试。同时记录发送失败的日志,方便排查问题。
- 本地事务执行失败:
- 原因:数据库操作异常、业务逻辑错误等。
- 解决方案:在
executeLocalTransaction
方法中捕获异常,返回RocketMQLocalTransactionState.ROLLBACK
状态,确保消息不会被提交。同时记录详细的异常日志,便于定位问题。
- 事务状态检查失败:
- 原因:检查逻辑错误、数据不一致等。
- 解决方案:在
checkLocalTransaction
方法中,确保检查逻辑的准确性和完整性。可以通过数据库事务日志或状态标识来准确判断本地事务状态。如果检查失败,进行适当的补偿操作或回滚操作,并记录日志。
- 消息重复消费:
- 原因:由于网络波动、RocketMQ内部机制等,可能导致消息被重复消费。
- 解决方案:在库存系统消费消息时,增加幂等性处理逻辑。例如,使用数据库唯一约束、状态标识等方式,确保相同的库存扣减操作不会重复执行。