面试题答案
一键面试1. RocketMQ顺序消息消费机制原理
- 生产者端:RocketMQ顺序消息是通过将同一类消息发送到同一个队列来实现顺序性。生产者在发送消息时,会根据业务的顺序规则(例如订单ID)选择特定的队列。例如,如果按照订单ID的哈希值对队列数取模来选择队列,那么同一个订单相关的消息就会被发送到同一个队列。
- 队列层面:每个队列内部是严格有序的。消息一旦进入队列,就按照先进先出(FIFO)的原则进行存储和读取。
- 消费者端:消费者从队列中拉取消息时,也是按照FIFO的顺序。一个队列只会被一个消费者线程消费,这样就保证了消费顺序与生产顺序一致。
2. 消息处理失败的处理方式
- 原理:当某条消息处理失败时,不能直接跳过这条消息继续消费下一条,否则会破坏顺序性。RocketMQ提供了一种重试机制,将处理失败的消息重新发送回队列的尾部,等待下一轮消费。同时,为了避免无限重试导致死循环,RocketMQ会对重试次数进行限制。
- 代码实现:
- 生产者发送消息:
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 100);
producer.shutdown();
- **消费者消费消息**:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
try {
// 处理消息
System.out.println("Consume message: " + new String(msg.getBody()));
} catch (Exception e) {
// 处理失败,返回RECONSUME_LATER,消息会被重新发送到队列尾部
return ConsumeOrderlyStatus.RECONSUME_LATER;
}
}
// 处理成功
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
在上述代码中,消费者实现MessageListenerOrderly
接口来进行顺序消息消费。当消息处理失败时,返回ConsumeOrderlyStatus.RECONSUME_LATER
,RocketMQ会将该消息重新发送到队列尾部,等待下一轮消费。同时,RocketMQ内部会对重试次数进行记录,当达到最大重试次数后,会将消息发送到死信队列(DLQ),以避免无限重试。