MST

星途 面试题库

面试题:微服务架构下Spring Cloud Alibaba分布式消息队列的可靠性保证

在微服务架构中使用Spring Cloud Alibaba分布式消息队列时,如何保证消息的可靠性?例如,如何确保消息不丢失、不重复消费?请详细阐述相关的技术方案和实现细节。
34.4万 热度难度
后端开发微服务架构

知识考点

AI 面试

面试题答案

一键面试

保证消息不丢失

  1. 生产者端
    • 开启事务:在发送消息前开启事务,发送成功后提交事务,失败则回滚。例如在Spring Boot中,配置DataSourceTransactionManager,在发送消息的业务方法上添加@Transactional注解。
    @Service
    public class MessageProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        @Autowired
        private PlatformTransactionManager transactionManager;
    
        public void sendMessage(String topic, String message) {
            TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
            transactionTemplate.execute(status -> {
                try {
                    rocketMQTemplate.send(topic, MessageBuilder.withPayload(message).build());
                    return TransactionSynchronizationManager.STATUS_COMMITTED;
                } catch (Exception e) {
                    status.setRollbackOnly();
                    return TransactionSynchronizationManager.STATUS_ROLLED_BACK;
                }
            });
        }
    }
    
    • 同步发送并确认:使用同步发送方式,等待MQ返回发送结果确认。例如RocketMQTemplatesyncSend方法。
    SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());
    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
        // 处理发送失败逻辑,如重试
    }
    
    • 使用消息轨迹:RocketMQ提供消息轨迹功能,可记录消息从生产者到消费者的全链路过程,方便排查消息丢失问题。在生产者配置中开启消息轨迹:
    rocketmq:
      producer:
        group: my-producer-group
        enable-message-trace: true
        customize-trace-topic: my-trace-topic
    
  2. MQ Broker端
    • 刷盘策略:采用同步刷盘策略,确保消息写入磁盘后才返回成功响应给生产者。在RocketMQ的broker.conf配置文件中设置:
    flushDiskType = SYNC_FLUSH
    
    • 高可用配置:搭建主从集群,采用2M - 2S模式(2个Master和2个Slave),Master将消息同步给Slave,提高消息存储的可靠性。
  3. 消费者端
    • 手动确认消费:采用手动ACK模式,消费者处理完消息后,向MQ发送确认消息。例如在RocketMQ的消费者配置中:
    @RocketMQMessageListener(topic = "my - topic", consumerGroup = "my - consumer - group",ackMode = AckMode.MANUAL)
    @Component
    public class MyConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            try {
                // 处理消息逻辑
                System.out.println("Received message: " + message);
                // 手动确认消费
                RocketMQPushConsumerLocalContext context = RocketMQPushConsumerLocalContext.get();
                context.getAckCallback().ack();
            } catch (Exception e) {
                // 处理异常,可选择重试或记录日志等
                RocketMQPushConsumerLocalContext context = RocketMQPushConsumerLocalContext.get();
                context.getAckCallback().nack();
            }
        }
    }
    

保证消息不重复消费

  1. 消费者幂等性处理
    • 数据库唯一约束:如果消息处理逻辑涉及数据库操作,对关键业务字段添加唯一约束。例如,订单表中的订单编号设为唯一,当重复消息处理同一订单时,数据库会抛出唯一约束冲突异常,消费者捕获异常并忽略,认为消息已处理成功。
    • 状态机控制:为消息处理对象设计状态机。以订单为例,订单状态有“未支付”“已支付”“已发货”等,当处理支付消息时,只有订单状态为“未支付”时才进行支付操作并更新状态为“已支付”,重复收到支付消息时,若订单状态已是“已支付”,则直接忽略。
    • 消息唯一标识:生产者为每条消息生成唯一ID,消费者收到消息后,先根据唯一ID查询本地记录(如Redis)判断是否已处理过该消息。例如:
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void onMessage(String message) {
        String messageId = getMessageIdFromMessage(message);
        if (stringRedisTemplate.hasKey(messageId)) {
            // 已处理过,忽略
            return;
        }
        try {
            // 处理消息逻辑
            stringRedisTemplate.opsForValue().set(messageId, "processed", 3600, TimeUnit.SECONDS);
        } catch (Exception e) {
            // 处理异常
        }
    }
    
  2. MQ层面保证
    • 使用事务消息:生产者发送事务消息,MQ保证消息要么成功写入且被消费者处理,要么不写入,从而避免重复消费。在RocketMQ中,生产者发送事务消息示例:
    rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), null);
    
    • 合理配置消息重试机制:避免因重试次数过多导致消息重复消费。在消费者配置中合理设置最大重试次数,如:
    rocketmq:
      consumer:
        group: my - consumer - group
        max - retry - times: 3