面试题答案
一键面试RocketMQ默认重试机制
- 生产者重试:
- RocketMQ生产者发送消息失败时,默认会重试2次(不包括第一次发送,总共最多尝试3次)。
- 重试间隔时间会逐渐增长,第一次重试间隔1000ms,第二次重试间隔2000ms。
- 重试逻辑是在同一个Broker的同一个MessageQueue上进行,如果当前Broker不可用,会尝试其他Broker的相同MessageQueue。
- 消费者重试:
- 当消费者消费消息失败时,RocketMQ会自动进行重试。
- 对于集群消费模式,默认重试16次。每次重试间隔时间按照一定的阶梯增长,第一次重试间隔10s,第二次20s,第三次40s,依此类推,最长间隔为2h。
- 对于广播消费模式,消费失败不会重试,因为广播消费强调每个消费者都消费一次,不保证消费成功。
自定义重试策略实现
- 生产者自定义重试策略:
- 实现MQSendHook接口:
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.client.hook.MQSendHook; public class CustomProducerSendHook implements MQSendHook { @Override public String hookName() { return "CustomProducerSendHook"; } @Override public void sendMessageBefore(final Message msg, final MessageQueue mq, final String brokerAddr) { // 发送前的操作,例如记录日志等 } @Override public void sendMessageAfter(final SendResult sendResult, final Message msg, final MessageQueue mq, final String brokerAddr) { // 发送成功后的操作,例如记录日志等 } @Override public void onException(final Throwable e, final Message msg, final MessageQueue mq, final String brokerAddr) { // 发送异常时的操作,例如自定义重试逻辑 } @Override public void onTransaction(final TransactionSendResult sendResult, final Message msg, final MessageQueue mq, final String brokerAddr) { // 事务消息相关操作 } }
- 注册Hook:在Spring Boot配置文件或配置类中注册自定义的
MQSendHook
。import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RocketMQConfig { @Bean public DefaultMQProducer producer() { DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.registerSendMessageHook(new CustomProducerSendHook()); return producer; } }
- 实现MQSendHook接口:
- 消费者自定义重试策略:
- 实现MessageListenerConcurrently接口:
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class CustomMessageListener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { // 消息处理逻辑 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 自定义重试逻辑 Integer reconsumeTimes = msg.getReconsumeTimes(); if (reconsumeTimes < 3) { // 例如重试3次 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } else { // 超过重试次数的处理,例如记录到死信队列等 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
- 注册监听器:在Spring Boot配置文件或配置类中注册自定义的消息监听器。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RocketMQConsumerConfig { @Bean public DefaultMQPushConsumer consumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("topic_name", "*"); consumer.registerMessageListener(new CustomMessageListener()); return consumer; } }
- 实现MessageListenerConcurrently接口: