MST

星途 面试题库

面试题:消息队列RocketMQ客户端API的消息消费重试机制

阐述RocketMQ客户端API中消息消费失败后的重试策略是怎样的?如果需要自定义重试次数和重试间隔,该如何实现?
38.8万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

RocketMQ客户端API消息消费失败后的重试策略

  1. 默认重试策略
    • 当消息消费失败(即ConsumeConcurrentlyStatus.RECONSUME_LATERConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT返回时),RocketMQ客户端会自动进行重试。
    • 对于普通消息,默认重试16次。重试间隔会逐渐变长,按照如下规律:10s, 30s, 1min, 2min, 3min, 4min, 5min, 6min, 7min, 8min, 9min, 10min, 20min, 30min, 1h, 2h。
    • 对于顺序消息,重试间隔固定为5s,重试次数无限制,直到消费成功。
  2. 死信队列(DLQ):如果普通消息重试16次后仍然失败,该消息会被发送到死信队列(DLQ)。死信队列是每个主题默认对应的一个特殊队列,用于存放消费失败的消息,用户可以手动处理死信队列中的消息。

自定义重试次数和重试间隔的实现

  1. 自定义重试次数
    • 消费模式为并发消费时:在DefaultMQPushConsumer中,可以通过setMaxReconsumeTimes(int times)方法来设置最大重试次数。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setMaxReconsumeTimes(5);
  • 消费模式为顺序消费时:顺序消费默认是无限制重试,若要限制重试次数,可在ConsumeOrderlyContext中自定义逻辑,记录重试次数,当达到自定义次数时不再重试。示例代码如下:
consumer.registerMessageListener(new MessageListenerOrderly() {
    private AtomicInteger consumeTimes = new AtomicInteger(0);
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        int times = consumeTimes.incrementAndGet();
        if (times > 5) {
            // 达到自定义重试次数5次,不再重试
            return ConsumeOrderlyStatus.SUCCESS;
        }
        // 消费逻辑
        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }
});
  1. 自定义重试间隔
    • 消费模式为并发消费时:目前RocketMQ客户端没有直接提供设置重试间隔的方法。若要自定义重试间隔,一种可行的方式是使用线程池来控制重试逻辑。在消费失败时,使用线程池延迟执行消费任务,延迟时间为自定义的重试间隔。示例代码如下:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            // 正常消费逻辑
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // 消费失败,自定义重试间隔10s
            long delay = 10 * 1000;
            executorService.schedule(() -> {
                try {
                    // 重新消费逻辑
                    consumeMessage(msgs, context);
                } catch (Exception ex) {
                    // 再次消费失败处理
                }
            }, delay, TimeUnit.MILLISECONDS);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
});
  • 消费模式为顺序消费时:同样可以使用线程池来控制重试间隔,与并发消费类似,在消费失败时,使用线程池按照自定义的重试间隔延迟执行消费任务。

注意:在自定义重试逻辑时,要考虑系统资源的合理使用,避免过多延迟任务导致系统资源耗尽。同时,RocketMQ的重试机制设计旨在保证消息尽可能被成功消费,自定义重试逻辑需要权衡业务需求和系统性能。