面试题答案
一键面试RocketMQ客户端API消息消费失败后的重试策略
- 默认重试策略:
- 当消息消费失败(即
ConsumeConcurrentlyStatus.RECONSUME_LATER
或ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
返回时),RocketMQ客户端会自动进行重试。 - 对于普通消息,默认重试16次。重试间隔会逐渐变长,按照如下规律:10s, 30s, 1min, 2min, 3min, 4min, 5min, 6min, 7min, 8min, 9min, 10min, 20min, 30min, 1h, 2h。
- 对于顺序消息,重试间隔固定为5s,重试次数无限制,直到消费成功。
- 当消息消费失败(即
- 死信队列(DLQ):如果普通消息重试16次后仍然失败,该消息会被发送到死信队列(DLQ)。死信队列是每个主题默认对应的一个特殊队列,用于存放消费失败的消息,用户可以手动处理死信队列中的消息。
自定义重试次数和重试间隔的实现
- 自定义重试次数:
- 消费模式为并发消费时:在
DefaultMQPushConsumer
中,可以通过setMaxReconsumeTimes(int times)
方法来设置最大重试次数。例如:
- 消费模式为并发消费时:在
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setMaxReconsumeTimes(5);
- 消费模式为顺序消费时:顺序消费默认是无限制重试,若要限制重试次数,可在
ConsumeOrderlyContext
中自定义逻辑,记录重试次数,当达到自定义次数时不再重试。示例代码如下:
consumer.registerMessageListener(new MessageListenerOrderly() {
private AtomicInteger consumeTimes = new AtomicInteger(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
int times = consumeTimes.incrementAndGet();
if (times > 5) {
// 达到自定义重试次数5次,不再重试
return ConsumeOrderlyStatus.SUCCESS;
}
// 消费逻辑
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
});
- 自定义重试间隔:
- 消费模式为并发消费时:目前RocketMQ客户端没有直接提供设置重试间隔的方法。若要自定义重试间隔,一种可行的方式是使用线程池来控制重试逻辑。在消费失败时,使用线程池延迟执行消费任务,延迟时间为自定义的重试间隔。示例代码如下:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 正常消费逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 消费失败,自定义重试间隔10s
long delay = 10 * 1000;
executorService.schedule(() -> {
try {
// 重新消费逻辑
consumeMessage(msgs, context);
} catch (Exception ex) {
// 再次消费失败处理
}
}, delay, TimeUnit.MILLISECONDS);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
- 消费模式为顺序消费时:同样可以使用线程池来控制重试间隔,与并发消费类似,在消费失败时,使用线程池按照自定义的重试间隔延迟执行消费任务。
注意:在自定义重试逻辑时,要考虑系统资源的合理使用,避免过多延迟任务导致系统资源耗尽。同时,RocketMQ的重试机制设计旨在保证消息尽可能被成功消费,自定义重试逻辑需要权衡业务需求和系统性能。