MST

星途 面试题库

面试题:消息队列RocketMQ与Spring Boot集成下的事务消息处理

假设你正在开发一个电商系统,使用RocketMQ与Spring Boot集成。在订单创建成功后需要发送消息通知库存系统扣减库存,要求保证订单创建和库存扣减这两个操作的原子性,即要么都成功,要么都失败。请详细描述如何利用RocketMQ的事务消息机制来实现该需求,包括代码实现思路和可能遇到的问题及解决方案。
14.7万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

代码实现思路

  1. 配置RocketMQ事务生产者:在Spring Boot项目中配置RocketMQ事务生产者,例如在application.propertiesapplication.yml中配置RocketMQ的相关参数,包括名称服务器地址等。
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;

    @Autowired
    private StockService stockService;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 订单创建
            orderService.createOrder();
            // 库存扣减
            stockService.deductStock();
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        try {
            // 检查订单状态
            boolean orderCreated = orderService.checkOrderCreated();
            if (orderCreated) {
                // 检查库存扣减状态
                boolean stockDeducted = stockService.checkStockDeducted();
                if (stockDeducted) {
                    return RocketMQLocalTransactionState.COMMIT;
                } else {
                    try {
                        // 尝试补偿库存扣减
                        stockService.deductStock();
                        return RocketMQLocalTransactionState.COMMIT;
                    } catch (Exception e) {
                        return RocketMQLocalTransactionState.ROLLBACK;
                    }
                }
            } else {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}
  1. 发送事务消息:在订单创建成功的逻辑中,发送事务消息。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void createOrder() {
        // 订单创建逻辑

        Message<String> message = MessageBuilder.withPayload("order_create_success").build();
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order_topic", message, null);
        // 根据发送结果进行相应处理
    }
}
  1. 事务监听器:实现RocketMQLocalTransactionListener接口,在executeLocalTransaction方法中执行订单创建和库存扣减逻辑,并根据执行结果返回提交或回滚状态。在checkLocalTransaction方法中检查本地事务状态,进行补偿操作。

可能遇到的问题及解决方案

  1. 事务消息发送失败
    • 原因:网络问题、RocketMQ服务器故障等。
    • 解决方案:增加重试机制,例如使用Spring Retry框架,对发送事务消息的操作进行重试。同时记录发送失败的日志,方便排查问题。
  2. 本地事务执行失败
    • 原因:数据库操作异常、业务逻辑错误等。
    • 解决方案:在executeLocalTransaction方法中捕获异常,返回RocketMQLocalTransactionState.ROLLBACK状态,确保消息不会被提交。同时记录详细的异常日志,便于定位问题。
  3. 事务状态检查失败
    • 原因:检查逻辑错误、数据不一致等。
    • 解决方案:在checkLocalTransaction方法中,确保检查逻辑的准确性和完整性。可以通过数据库事务日志或状态标识来准确判断本地事务状态。如果检查失败,进行适当的补偿操作或回滚操作,并记录日志。
  4. 消息重复消费
    • 原因:由于网络波动、RocketMQ内部机制等,可能导致消息被重复消费。
    • 解决方案:在库存系统消费消息时,增加幂等性处理逻辑。例如,使用数据库唯一约束、状态标识等方式,确保相同的库存扣减操作不会重复执行。