MST

星途 面试题库

面试题:消息队列RocketMQ在Spring Boot集成中的消息发送重试机制

在RocketMQ与Spring Boot集成的项目里,若消息发送失败,RocketMQ默认的重试机制是怎样的?如果需要自定义重试策略,该如何实现?
23.2万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

RocketMQ默认重试机制

  1. 生产者重试
    • RocketMQ生产者发送消息失败时,默认会重试2次(不包括第一次发送,总共最多尝试3次)。
    • 重试间隔时间会逐渐增长,第一次重试间隔1000ms,第二次重试间隔2000ms。
    • 重试逻辑是在同一个Broker的同一个MessageQueue上进行,如果当前Broker不可用,会尝试其他Broker的相同MessageQueue。
  2. 消费者重试
    • 当消费者消费消息失败时,RocketMQ会自动进行重试。
    • 对于集群消费模式,默认重试16次。每次重试间隔时间按照一定的阶梯增长,第一次重试间隔10s,第二次20s,第三次40s,依此类推,最长间隔为2h。
    • 对于广播消费模式,消费失败不会重试,因为广播消费强调每个消费者都消费一次,不保证消费成功。

自定义重试策略实现

  1. 生产者自定义重试策略
    • 实现MQSendHook接口
      import org.apache.rocketmq.client.producer.SendCallback;
      import org.apache.rocketmq.client.producer.SendResult;
      import org.apache.rocketmq.client.producer.SendStatus;
      import org.apache.rocketmq.client.producer.TransactionSendResult;
      import org.apache.rocketmq.common.message.Message;
      import org.apache.rocketmq.common.message.MessageConst;
      import org.apache.rocketmq.common.message.MessageQueue;
      import org.apache.rocketmq.client.hook.MQSendHook;
      
      public class CustomProducerSendHook implements MQSendHook {
          @Override
          public String hookName() {
              return "CustomProducerSendHook";
          }
      
          @Override
          public void sendMessageBefore(final Message msg, final MessageQueue mq, final String brokerAddr) {
              // 发送前的操作,例如记录日志等
          }
      
          @Override
          public void sendMessageAfter(final SendResult sendResult, final Message msg, final MessageQueue mq,
                                       final String brokerAddr) {
              // 发送成功后的操作,例如记录日志等
          }
      
          @Override
          public void onException(final Throwable e, final Message msg, final MessageQueue mq, final String brokerAddr) {
              // 发送异常时的操作,例如自定义重试逻辑
          }
      
          @Override
          public void onTransaction(final TransactionSendResult sendResult, final Message msg, final MessageQueue mq,
                                    final String brokerAddr) {
              // 事务消息相关操作
          }
      }
      
    • 注册Hook:在Spring Boot配置文件或配置类中注册自定义的MQSendHook
      import org.apache.rocketmq.client.producer.DefaultMQProducer;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      @Configuration
      public class RocketMQConfig {
          @Bean
          public DefaultMQProducer producer() {
              DefaultMQProducer producer = new DefaultMQProducer("producer_group");
              producer.setNamesrvAddr("127.0.0.1:9876");
              producer.registerSendMessageHook(new CustomProducerSendHook());
              return producer;
          }
      }
      
  2. 消费者自定义重试策略
    • 实现MessageListenerConcurrently接口
      import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
      import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
      import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
      import org.apache.rocketmq.common.message.MessageExt;
      
      import java.util.List;
      
      public class CustomMessageListener implements MessageListenerConcurrently {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
              for (MessageExt msg : msgs) {
                  try {
                      // 消息处理逻辑
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  } catch (Exception e) {
                      // 自定义重试逻辑
                      Integer reconsumeTimes = msg.getReconsumeTimes();
                      if (reconsumeTimes < 3) { // 例如重试3次
                          return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                      } else {
                          // 超过重试次数的处理,例如记录到死信队列等
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      }
                  }
              }
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      }
      
    • 注册监听器:在Spring Boot配置文件或配置类中注册自定义的消息监听器。
      import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      @Configuration
      public class RocketMQConsumerConfig {
          @Bean
          public DefaultMQPushConsumer consumer() throws Exception {
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
              consumer.setNamesrvAddr("127.0.0.1:9876");
              consumer.subscribe("topic_name", "*");
              consumer.registerMessageListener(new CustomMessageListener());
              return consumer;
          }
      }