MST

星途 面试题库

面试题:消息队列RocketMQ中标签路由的基本原理是什么

请阐述RocketMQ消息过滤机制中标签路由的基本原理,包括生产者如何设置标签,消费者如何通过标签进行消息过滤以及这种路由方式在实际应用场景中的优势。
29.3万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

标签路由基本原理

在RocketMQ中,标签(Tag)是在消息的属性层面用于区分不同性质的消息。生产者发送消息时会为消息设置特定标签,消息到达Broker后,Broker基于这些标签对消息进行分类存储。当消费者订阅消息时,指定要消费带有特定标签的消息,Broker根据消费者订阅的标签,将符合条件的消息推送给消费者,实现消息的过滤与路由。

生产者设置标签

生产者在发送消息时,通过Message对象的setTags方法来设置标签。示例代码如下:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }

        producer.shutdown();
    }
}

上述代码中,Message的第二个参数TagA就是设置的标签。

消费者通过标签进行消息过滤

消费者通过subscribe方法指定要订阅的主题和标签来过滤消息。示例代码如下:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer Started.");
    }
}

在上述代码中,consumer.subscribe("TopicTest", "TagA")指定了只消费TopicTest主题下标签为TagA的消息。

实际应用场景中的优势

  1. 业务解耦:不同业务模块可以通过不同标签来发送和接收消息,使得消息系统中的消息流更加清晰,各个业务之间不会因为消息的相互干扰而影响稳定性,实现业务的解耦。例如,电商系统中,订单相关消息可以设置OrderTag,库存相关消息设置StockTag,不同业务模块只关心自己标签的消息。
  2. 灵活的消息路由:在复杂的业务场景中,通过标签可以灵活地将消息路由到不同的消费者。例如,在一个大型分布式系统中,不同的服务可能对同一主题下不同类型的消息感兴趣,通过标签可以方便地将消息发送给对应的服务。
  3. 提高消费效率:消费者只接收自己感兴趣的消息,减少了不必要的消息接收和处理,提高了消费效率。因为Broker只推送符合标签的消息,消费者无需在本地对大量无关消息进行过滤,节省了网络带宽和本地处理资源。