RocketMQ消息重试机制底层实现
- 消息重新投递
- 生产者重试:生产者发送消息失败时,默认会重试2次(可通过
sendMsgTimeout
等参数配置)。如果是网络闪断等临时性故障,重试机制能确保消息成功发送到Broker。例如,在发送消息过程中,网络短暂抖动导致连接中断,生产者会在一定时间间隔后重新尝试发送,直至达到重试次数上限。
- 消费者重试:消费者消费消息失败时,RocketMQ会将消息重新投递回原队列。对于顺序消息,消费者集群消费模式下,消费失败后消息会一直重试,直到消费成功,且重试期间不会投递后续消息;广播消费模式下,消费失败的消息不会重试。对于非顺序消息,消费者消费失败后,消息会进入到该消费者组对应的重试队列(
%RETRY%+consumerGroup
),RocketMQ会根据重试策略重新投递。
- 重试逻辑在代码层面实现
- 生产者代码层面:在
DefaultMQProducer
类的sendDefaultImpl
方法中,通过for
循环实现重试逻辑。如下代码片段(简化示意):
for (int i = 0; i < maxReconsumeTimes; i++) {
try {
return communicationMode.sendCallBack(..);
} catch (MQClientException e) {
if (i < maxReconsumeTimes - 1) {
// 等待一定时间后重试
Thread.sleep(sleepTime);
} else {
throw e;
}
}
}
- 消费者代码层面:以
DefaultMQPushConsumer
为例,在MessageListenerConcurrently
的实现类中,消费消息的逻辑在consumeMessage
方法内。当消费返回ConsumeConcurrentlyStatus.RECONSUME_LATER
时,消息会被标记为消费失败,进入重试流程。在RebalancePushImpl
类的sendMessageBack
方法中,会将消息发回重试队列。例如:
private void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException {
// 构建回发的消息
Message newMsg = new Message(MixAll.getRetryTopic(this.consumerGroup), msg.getBody());
// 设置相关属性
newMsg.setFlag(msg.getFlag());
newMsg.setProperties(msg.getProperties());
newMsg.setDelayTimeLevel(delayLevel);
// 发送回重试队列
this.mQClientFactory.getMQClientAPIImpl().sendMessage(..);
}
高并发、海量消息场景下优化策略及理由
- 优化策略
- 调整重试次数和间隔:适当增加重试次数,但避免过多重试占用大量资源。同时,动态调整重试间隔,例如采用指数退避算法,初始间隔短,随着重试次数增加,间隔逐渐变长。如初始间隔1秒,每次重试间隔翻倍,这样可以避免短时间内大量重试请求集中发送,减轻Broker和网络压力。
- 重试队列分区:对重试队列进行分区,根据消息的某些特征(如消息ID的哈希值)将不同消息分配到不同分区,提高并发处理能力。这样可以并行处理不同分区的重试消息,减少消息在重试队列中的积压。
- 异步重试:采用异步方式进行消息重试,避免阻塞正常消息的消费流程。例如,使用线程池来处理重试任务,当消费失败时,将重试任务提交到线程池,主线程继续处理后续消息,提高整体的消费效率。
- 理由
- 调整重试次数和间隔:在高并发海量消息场景下,临时性故障可能更频繁,适当增加重试次数有助于消息成功消费。采用指数退避算法调整间隔,可以避免短时间内大量重试请求对系统造成冲击,保证系统的稳定性。
- 重试队列分区:高并发场景下,重试队列可能成为瓶颈。通过分区,可以将重试消息的处理分散,提高并发处理能力,减少消息积压,从而加快重试消息的处理速度。
- 异步重试:高并发场景下,消息消费速度要求快。采用异步重试可以避免重试过程阻塞正常消息消费,提高整个系统的消息处理吞吐量,保证系统在海量消息情况下仍能高效运行。