MST

星途 面试题库

面试题:消息队列之RocketMQ消息优先级实现机制

在RocketMQ中,如何实现消息优先级?请简要描述其原理及相关配置和使用方式。
23.9万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

原理

RocketMQ 本身并不直接支持消息优先级。这是因为消息在 RocketMQ 中的处理机制主要基于队列模型,其设计理念更侧重于高吞吐量和低延迟,而非优先处理特定优先级的消息。但可以通过一些间接的方式模拟实现消息优先级。 一种常见的间接实现思路是:在生产者端按照消息优先级将消息发送到不同的队列,在消费者端为不同队列设置不同的消费线程数,优先级高的队列设置较多的消费线程,使得优先级高的消息能够被更快地处理。

相关配置与使用方式

  1. 生产者
    • 发送消息时,根据消息的优先级将其分配到不同的队列。例如,可以定义一个规则,根据消息优先级计算队列编号。假设消息优先级分为 3 级(1 - 高,2 - 中,3 - 低),有 9 个队列,可以将优先级为 1 的消息发送到队列 0 - 2,优先级为 2 的消息发送到队列 3 - 5,优先级为 3 的消息发送到队列 6 - 8 。
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 假设这里计算得到的 queueId 符合上述按优先级分配队列的规则
    int queueId = calculateQueueIdByPriority(priority); 
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Integer id = (Integer) arg;
            return mqs.get(id);
        }
    }, queueId);
    producer.shutdown();
    
  2. 消费者
    • 在消费者端,针对不同优先级对应的队列设置不同数量的消费线程。例如,针对高优先级消息队列设置 10 个消费线程,中优先级设置 5 个,低优先级设置 2 个。
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.subscribe("TopicTest", "*");
    // 假设 highPriorityQueues 是高优先级队列集合
    consumer.setConsumeThreadMin(10);
    consumer.setConsumeThreadMax(10);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    
    • 通过这种方式,在整体上模拟出消息优先级的效果,高优先级消息队列因为消费线程多,能够更快地处理消息。