MST

星途 面试题库

面试题:消息队列之RocketMQ幂等性 - 业务场景下的挑战与应对

假设你负责一个电商订单系统,使用RocketMQ进行消息传递。在订单创建和支付确认这两个关键流程中,可能会由于网络波动等原因导致消息重复消费,进而造成订单重复创建或重复支付。请详细阐述你会如何结合RocketMQ的特性,设计一套完整的幂等性解决方案来避免这些问题。
15.8万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

1. 数据库层面实现幂等

  • 唯一索引
    • 在订单表中,针对订单号等业务唯一标识字段创建唯一索引。例如,订单表order有字段order_id(订单号),通过SQL语句CREATE UNIQUE INDEX idx_order_id ON order (order_id);创建唯一索引。
    • 当消息重复消费创建订单时,由于唯一索引的存在,重复插入相同order_id的订单数据会抛出唯一约束异常,应用层捕获该异常并忽略,从而保证订单不会重复创建。
  • 状态机控制
    • 订单表中添加order_status字段表示订单状态,如0表示未支付,1表示已支付等。
    • 当处理支付确认消息时,通过SQL语句UPDATE order SET order_status = 1 WHERE order_id = 'xxx' AND order_status = 0,只有当订单状态为未支付时才更新为已支付状态。如果消息重复消费,再次执行该语句时,由于订单状态已经是1,不会产生实际更新操作,保证了幂等性。

2. RocketMQ 消息特性结合实现幂等

  • 消息唯一标识
    • 在发送消息时,为每条消息设置唯一的msgId。例如,在Java中使用RocketMQ的DefaultMQProducer发送消息时:
    Message msg = new Message("TopicTest",
        "TagA",
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    msg.setKeys("uniqueMsgKey" + i); // 设置唯一标识
    SendResult sendResult = producer.send(msg);
    
    • 在消费端,利用msg.getKeys()获取唯一标识。通过在本地缓存(如Guava Cache)或数据库中记录已处理消息的唯一标识。每次处理消息前,先检查该唯一标识是否已处理过。如果已处理过,则直接返回,不进行实际业务处理。
  • 消费端幂等处理逻辑
    • 采用本地事务结合消息确认机制。以订单支付为例,在消费端处理支付确认消息时:
    public class OrderPaymentConsumer implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                String uniqueKey = msg.getKeys();
                // 检查本地缓存或数据库是否已处理该消息
                if (isMessageProcessed(uniqueKey)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                try {
                    // 开始本地事务
                    boolean result = processOrderPayment(msg);
                    if (result) {
                        // 本地事务成功,标记消息已处理
                        markMessageProcessed(uniqueKey);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } else {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                } catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
    

3. 分布式缓存实现幂等

  • 使用Redis
    • 在消费消息前,先根据消息的唯一标识(如订单号)在Redis中设置一个分布式锁。例如,使用Jedis客户端:
    Jedis jedis = new Jedis("localhost", 6379);
    String uniqueKey = "order:payment:" + orderId;
    String lockValue = UUID.randomUUID().toString();
    boolean isLockAcquired = jedis.set(uniqueKey, lockValue, "NX", "EX", 10) != null;
    if (isLockAcquired) {
        try {
            // 处理订单支付业务逻辑
            processOrderPayment();
        } finally {
            // 释放锁
            if (lockValue.equals(jedis.get(uniqueKey))) {
                jedis.del(uniqueKey);
            }
        }
    } else {
        // 锁已被占用,说明消息可能重复消费,直接返回
        return;
    }
    
    • 这样,即使消息重复消费,由于分布式锁的存在,只有一个线程能进入业务处理逻辑,保证了幂等性。