MST

星途 面试题库

面试题:RocketMQ消息过滤与标签路由在复杂业务场景的设计与实践

假设存在一个电商订单处理系统,涉及订单创建、支付、发货、退款等多种业务操作,每个操作都会产生消息。要求使用RocketMQ的消息过滤机制与标签路由来设计一个高效的消息处理架构,确保不同业务模块能够准确、及时地接收到相关消息。请详细描述你的设计思路,包括标签的设计、消息发送与消费逻辑以及可能遇到的问题及解决方案。
21.5万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 标签设计
    • 为订单创建操作设计标签TAG_CREATE_ORDER
    • 支付操作标签TAG_PAYMENT
    • 发货操作标签TAG_SHIPMENT
    • 退款操作标签TAG_REFUND
  2. 消息发送逻辑
    • 当订单创建时,生产者发送消息到RocketMQ,并打上TAG_CREATE_ORDER标签。
    Message msg = new Message("OrderTopic", "TAG_CREATE_ORDER", "订单创建消息内容".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg);
    
    • 支付完成后,发送带有TAG_PAYMENT标签的消息。
    Message paymentMsg = new Message("OrderTopic", "TAG_PAYMENT", "支付完成消息内容".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult paymentSendResult = producer.send(paymentMsg);
    
    • 发货操作完成,发送TAG_SHIPMENT标签消息。
    Message shipmentMsg = new Message("OrderTopic", "TAG_SHIPMENT", "发货完成消息内容".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult shipmentSendResult = producer.send(shipmentMsg);
    
    • 退款操作完成,发送TAG_REFUND标签消息。
    Message refundMsg = new Message("OrderTopic", "TAG_REFUND", "退款完成消息内容".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult refundSendResult = producer.send(refundMsg);
    
  3. 消息消费逻辑
    • 订单创建相关业务模块消费带有TAG_CREATE_ORDER标签的消息。
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CreateOrderConsumerGroup");
    consumer.subscribe("OrderTopic", "TAG_CREATE_ORDER");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 处理订单创建消息逻辑
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    
    • 支付业务模块消费TAG_PAYMENT标签消息。
    DefaultMQPushConsumer paymentConsumer = new DefaultMQPushConsumer("PaymentConsumerGroup");
    paymentConsumer.subscribe("OrderTopic", "TAG_PAYMENT");
    paymentConsumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 处理支付消息逻辑
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    paymentConsumer.start();
    
    • 发货业务模块消费TAG_SHIPMENT标签消息。
    DefaultMQPushConsumer shipmentConsumer = new DefaultMQPushConsumer("ShipmentConsumerGroup");
    shipmentConsumer.subscribe("OrderTopic", "TAG_SHIPMENT");
    shipmentConsumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 处理发货消息逻辑
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    shipmentConsumer.start();
    
    • 退款业务模块消费TAG_REFUND标签消息。
    DefaultMQPushConsumer refundConsumer = new DefaultMQPushConsumer("RefundConsumerGroup");
    refundConsumer.subscribe("OrderTopic", "TAG_REFUND");
    refundConsumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            // 处理退款消息逻辑
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    refundConsumer.start();
    

可能遇到的问题及解决方案

  1. 消息堆积
    • 问题:高并发场景下,消息生产速度超过消费速度,导致消息堆积。
    • 解决方案:增加消费者实例,提高消费并行度;优化消费逻辑,减少单个消息处理时间。例如,在消费端配置consumer.setConsumeThreadMin(10); consumer.setConsumeThreadMax(20);来调整消费线程数。
  2. 标签配置错误
    • 问题:生产者或消费者标签配置错误,导致消息无法准确路由或接收。
    • 解决方案:加强代码审查,在测试环境进行充分的功能测试,确保标签配置正确。同时,在生产环境增加监控,当发现某个消费者组长时间没有消费到消息时,检查标签配置。
  3. RocketMQ服务故障
    • 问题:RocketMQ服务器出现故障,影响消息收发。
    • 解决方案:采用多Master多Slave的高可用架构,当Master节点故障时,Slave节点可以切换为Master继续提供服务。同时,增加监控和报警机制,及时发现并处理RocketMQ服务故障。