MST

星途 面试题库

面试题:消息队列RocketMQ异步消息处理中的消息顺序性问题

在RocketMQ异步消息处理机制下,如何保证消息的顺序性?请阐述其原理及实现方式。
16.0万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

原理

  1. 消息分区:RocketMQ 通过将消息发送到特定的队列(Queue)来实现一定程度的顺序性。一个 Topic 可以包含多个 Queue,每个 Queue 内部是严格顺序的。
  2. 生产者发送策略:生产者在发送异步消息时,若要保证顺序,需要按照某种规则将相关消息发送到同一个 Queue 中。
  3. 消费者消费策略:消费者从特定 Queue 消费消息时,按照先进先出(FIFO)的顺序进行处理,这样就能保证在该 Queue 内消息的顺序性。

实现方式

  1. 生产者
    • 按业务规则路由:例如,根据订单号的哈希值取模,将同一订单相关的消息发送到同一个 Queue。示例代码如下(以 Java 为例):
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("namesrvAddr");
producer.start();
String[] tags = {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
    Message msg = new Message("TopicTest", tags[i % tags.length],
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 根据订单号(这里简单用 i 模拟)哈希取模选择队列
    int queueId = Math.abs(i % producer.getDefaultMQPushConsumer().getMessageQueueList("TopicTest").size());
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer id = (Integer) arg;
            return mqs.get(id);
        }
    }, queueId);
    System.out.printf("%s%n", sendResult);
}
producer.shutdown();
  1. 消费者
    • 单线程消费:消费者采用单线程消费模式,确保消息按顺序处理。在消费者配置中设置 consumeMessageBatchMaxSize 为 1,并且使用 ConsumeOrderly 监听器。示例代码如下(以 Java 为例):
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("namesrvAddr");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理消息
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();
System.out.println("Consumer Started.");

这样,通过生产者按规则发送到同一队列,以及消费者单线程消费同一队列消息,在 RocketMQ 异步消息处理机制下保证了消息的顺序性。