MST

星途 面试题库

面试题:消息队列之RocketMQ复杂消息路由场景

假设在一个分布式电商系统中,订单消息根据不同的订单类型(如普通订单、团购订单、秒杀订单)需要路由到不同的消费组进行处理,在RocketMQ中如何基于其消息路由策略实现这种需求?请详细阐述实现思路和关键步骤。
48.5万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

实现思路

  1. 利用消息属性:在发送订单消息时,为消息设置一个属性来标识订单类型,如“orderType”,值分别为“普通订单”“团购订单”“秒杀订单”。
  2. 消费组设计:针对不同订单类型,分别创建对应的消费组,如“ordinary_order_group”用于处理普通订单,“group_buy_order_group”用于处理团购订单,“seckill_order_group”用于处理秒杀订单。
  3. 消息过滤:RocketMQ支持在消费端基于消息属性进行过滤,消费组只接收处理符合自身订单类型的消息。

关键步骤

  1. 发送端
    • 引入依赖:在Maven项目的pom.xml文件中引入RocketMQ的客户端依赖。
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.4</version>
    </dependency>
    
    • 设置消息属性:在发送消息时,根据订单类型设置消息属性。
    DefaultMQProducer producer = new DefaultMQProducer("producer_group");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    
    // 假设order为订单对象,order.getType()获取订单类型
    String orderType = order.getType();
    Message msg = new Message("OrderTopic", "TagA", ("订单内容").getBytes(RemotingHelper.DEFAULT_CHARSET));
    msg.putUserProperty("orderType", orderType);
    SendResult sendResult = producer.send(msg);
    producer.shutdown();
    
  2. 消费端
    • 创建消费组:为不同订单类型创建消费组。
    DefaultMQPushConsumer ordinaryConsumer = new DefaultMQPushConsumer("ordinary_order_group");
    ordinaryConsumer.setNamesrvAddr("localhost:9876");
    ordinaryConsumer.subscribe("OrderTopic", "TagA");
    ordinaryConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        for (Message msg : msgs) {
            String orderType = msg.getUserProperty("orderType");
            if ("普通订单".equals(orderType)) {
                // 处理普通订单逻辑
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    ordinaryConsumer.start();
    
    DefaultMQPushConsumer groupBuyConsumer = new DefaultMQPushConsumer("group_buy_order_group");
    groupBuyConsumer.setNamesrvAddr("localhost:9876");
    groupBuyConsumer.subscribe("OrderTopic", "TagA");
    groupBuyConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        for (Message msg : msgs) {
            String orderType = msg.getUserProperty("orderType");
            if ("团购订单".equals(orderType)) {
                // 处理团购订单逻辑
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    groupBuyConsumer.start();
    
    DefaultMQPushConsumer seckillConsumer = new DefaultMQPushConsumer("seckill_order_group");
    seckillConsumer.setNamesrvAddr("localhost:9876");
    seckillConsumer.subscribe("OrderTopic", "TagA");
    seckillConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        for (Message msg : msgs) {
            String orderType = msg.getUserProperty("orderType");
            if ("秒杀订单".equals(orderType)) {
                // 处理秒杀订单逻辑
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    seckillConsumer.start();
    
  3. 服务器端配置:确保RocketMQ服务器正常运行,并且配置了合适的Topic和相应的权限,使得生产者能够发送消息,消费者能够订阅和接收消息。