面试题答案
一键面试标签路由基本原理
在RocketMQ中,标签(Tag)是在消息的属性层面用于区分不同性质的消息。生产者发送消息时会为消息设置特定标签,消息到达Broker后,Broker基于这些标签对消息进行分类存储。当消费者订阅消息时,指定要消费带有特定标签的消息,Broker根据消费者订阅的标签,将符合条件的消息推送给消费者,实现消息的过滤与路由。
生产者设置标签
生产者在发送消息时,通过Message
对象的setTags
方法来设置标签。示例代码如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
producer.shutdown();
}
}
上述代码中,Message
的第二个参数TagA
就是设置的标签。
消费者通过标签进行消息过滤
消费者通过subscribe
方法指定要订阅的主题和标签来过滤消息。示例代码如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
在上述代码中,consumer.subscribe("TopicTest", "TagA")
指定了只消费TopicTest
主题下标签为TagA
的消息。
实际应用场景中的优势
- 业务解耦:不同业务模块可以通过不同标签来发送和接收消息,使得消息系统中的消息流更加清晰,各个业务之间不会因为消息的相互干扰而影响稳定性,实现业务的解耦。例如,电商系统中,订单相关消息可以设置
OrderTag
,库存相关消息设置StockTag
,不同业务模块只关心自己标签的消息。 - 灵活的消息路由:在复杂的业务场景中,通过标签可以灵活地将消息路由到不同的消费者。例如,在一个大型分布式系统中,不同的服务可能对同一主题下不同类型的消息感兴趣,通过标签可以方便地将消息发送给对应的服务。
- 提高消费效率:消费者只接收自己感兴趣的消息,减少了不必要的消息接收和处理,提高了消费效率。因为Broker只推送符合标签的消息,消费者无需在本地对大量无关消息进行过滤,节省了网络带宽和本地处理资源。