MST

星途 面试题库

面试题:消息队列RocketMQ消息过滤与订阅机制在复杂业务场景下的设计与实践

假设存在一个电商订单处理系统,需要根据订单金额、下单时间、商品类别等多个维度对消息进行过滤和订阅,且不同的业务模块有不同的订阅需求。请设计一套基于RocketMQ消息过滤与订阅机制的解决方案,详细说明架构设计、消息模型、代码实现思路以及可能面临的挑战和应对策略。
48.0万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

架构设计

  1. 生产者:在电商订单处理系统中,订单生成模块作为生产者,将包含订单金额、下单时间、商品类别等完整信息的订单消息发送到RocketMQ集群。每个订单消息是一个完整的业务数据单元。
  2. RocketMQ集群:负责接收、存储和转发消息。可以采用多Broker部署,提高系统的可靠性和消息处理能力。同时,配置合适的Topic和Tag来初步区分不同类型的消息。
  3. 消费者:各个业务模块作为消费者,根据自身需求从RocketMQ中订阅特定的消息。例如,财务模块可能只关心订单金额大于一定数值的消息;库存模块可能关注特定商品类别的订单消息。

消息模型

  1. Topic:定义一个通用的订单Topic,比如 “order - topic”,用于接收所有订单相关消息。
  2. Tag:根据商品类别定义Tag,如 “tag - electronics”(电子产品)、“tag - clothing”(服装)等。同时,也可以结合其他维度定义Tag,如 “tag - high - amount”(高金额订单)等。
  3. 消息体:消息体中封装完整的订单信息,包括订单金额、下单时间、商品类别、订单ID等详细数据。

代码实现思路

  1. 生产者代码
    // 引入RocketMQ依赖
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    public class OrderProducer {
        public static void main(String[] args) throws Exception {
            // 创建生产者实例
            DefaultMQProducer producer = new DefaultMQProducer("group - order - producer");
            // 设置NameServer地址
            producer.setNamesrvAddr("localhost:9876");
            // 启动生产者
            producer.start();
    
            // 构造订单消息
            String orderInfo = "{\"orderId\":\"123456\",\"orderAmount\":100.0,\"orderTime\":\"2023 - 01 - 01 12:00:00\",\"productCategory\":\"electronics\"}";
            Message message = new Message("order - topic", "tag - electronics", orderInfo.getBytes("UTF - 8"));
    
            // 发送消息
            SendResult sendResult = producer.send(message);
            System.out.println("Send result: " + sendResult);
    
            // 关闭生产者
            producer.shutdown();
        }
    }
    
  2. 消费者代码
    // 引入RocketMQ依赖
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class OrderConsumer {
        public static void main(String[] args) throws Exception {
            // 创建消费者实例
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group - order - consumer");
            // 设置NameServer地址
            consumer.setNamesrvAddr("localhost:9876");
            // 订阅Topic和Tag
            consumer.subscribe("order - topic", "tag - electronics");
    
            // 注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        String messageBody = new String(msg.getBody());
                        System.out.println("Received message: " + messageBody);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            // 启动消费者
            consumer.start();
        }
    }
    

可能面临的挑战和应对策略

  1. 复杂过滤条件
    • 挑战:实际业务中过滤条件可能非常复杂,仅靠Tag无法满足,比如需要同时根据订单金额和下单时间过滤。
    • 策略:可以使用SQL92表达式进行消息过滤。在RocketMQ配置中开启SQL过滤功能,生产者发送消息时设置相关属性,消费者订阅时使用SQL表达式。例如:
    // 生产者设置属性
    message.putUserProperty("orderAmount", "100.0");
    message.putUserProperty("orderTime", "2023 - 01 - 01 12:00:00");
    
    // 消费者订阅SQL表达式
    consumer.subscribe("order - topic", MessageSelector.bySql("orderAmount > 50 AND orderTime > '2023 - 01 - 01 10:00:00'"));
    
  2. 性能问题
    • 挑战:复杂过滤和大量订阅可能导致RocketMQ性能下降。
    • 策略:合理设计Topic和Tag,避免过度复杂的过滤逻辑。对热点消息进行拆分,采用分布式消费方式提高处理速度。同时,定期监控RocketMQ集群性能指标,进行调优。
  3. 数据一致性
    • 挑战:订单处理过程中可能涉及多个业务模块,消息处理顺序和结果可能影响数据一致性。
    • 策略:使用事务消息确保订单消息在多个业务模块间的一致性处理。生产者发送事务消息,RocketMQ保证消息要么全部被消费,要么全部不被消费。同时,在业务模块中实现幂等性处理,避免重复消费导致的数据不一致。