MST

星途 面试题库

面试题:消息队列RocketMQ如何在局部范围内保障消息顺序性

在RocketMQ中,假设我们有一个订单处理的业务场景,订单创建、订单支付、订单完成这几个消息需要顺序处理,在不考虑全局顺序性的情况下,你能阐述下RocketMQ是如何保障这些消息在局部范围内顺序消费的吗?
44.5万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试
  1. 生产者端
    • 生产者在发送消息时,可以通过设置 MessageQueueSelector 来将同一订单相关的消息发送到同一个队列。例如,根据订单ID进行路由,确保相同订单ID的消息都发送到相同的队列。示例代码如下(以Java为例):
    DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
    producer.setNamesrvAddr("namesrvAddr");
    producer.start();
    Message msg1 = new Message("OrderTopic", "TagA", "order1".getBytes(RemotingHelper.DEFAULT_CHARSET));
    Message msg2 = new Message("OrderTopic", "TagA", "order2".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer id = (Integer) arg;
            int index = id % mqs.size();
            return mqs.get(index);
        }
    }, 1); // 假设订单ID为1
    SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer id = (Integer) arg;
            int index = id % mqs.size();
            return mqs.get(index);
        }
    }, 2); // 假设订单ID为2
    producer.shutdown();
    
  2. 消费者端
    • RocketMQ消费者采用单线程消费一个队列中的消息。当消费者启动时,在消费同一队列的消息时,按照消息进入队列的顺序依次处理。例如,如果订单创建、订单支付、订单完成的消息都在同一个队列中,消费者会先处理订单创建消息,然后处理订单支付消息,最后处理订单完成消息。
    • 消费者可以通过设置 ConsumeFromWhere 来确定从队列的哪个位置开始消费,一般默认 ConsumeFromLastOffset 从队列末尾开始消费新消息。示例代码如下(以Java为例):
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
    consumer.setNamesrvAddr("namesrvAddr");
    consumer.subscribe("OrderTopic", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt msg : msgs) {
                // 处理订单相关业务逻辑
                System.out.println("Consume message: " + new String(msg.getBody()));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    

通过以上生产者端将相关消息发送到同一队列,以及消费者端单线程消费队列消息的机制,RocketMQ可以在局部范围内(即同一个队列)保障订单创建、订单支付、订单完成等消息的顺序消费。