MST

星途 面试题库

面试题:消息队列之RocketMQ资源隔离策略应用场景

假设你正在开发一个电商系统,订单消息和库存消息都使用RocketMQ进行处理,为了避免订单处理流量过大影响库存更新的稳定性,你将如何运用RocketMQ的资源隔离策略?请详细说明方案及配置思路。
23.8万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

方案

  1. 队列隔离
    • 为订单消息和库存消息分别创建独立的消息队列。订单消息使用一组队列,库存消息使用另一组队列。这样不同类型的消息在物理存储上隔离开,避免订单消息流量过大时占用过多存储资源影响库存消息存储。
    • 例如,创建名为order_queues的订单队列组和stock_queues的库存队列组。
  2. 消费端线程池隔离
    • 在消费端,为订单消息和库存消息的消费者分别配置独立的线程池。订单消费者使用一个线程池处理订单消息,库存消费者使用另一个线程池处理库存消息。这样订单消息处理的高并发不会导致库存消息处理线程被耗尽,保证库存更新的稳定性。
    • 比如,订单消费者线程池配置为corePoolSize = 10maximumPoolSize = 20,库存消费者线程池配置为corePoolSize = 5maximumPoolSize = 10。具体线程池参数根据业务负载调整。
  3. 流量控制
    • 利用RocketMQ的流控机制,对订单消息的消费速率进行限制。可以设置订单消息的最大消费TPS(Transactions Per Second),避免订单处理流量过大。例如,将订单消息的消费TPS限制为1000,即每秒最多处理1000条订单消息。这样可以控制订单处理对系统资源的占用,从而不影响库存更新的稳定性。

配置思路

  1. 队列隔离配置
    • 在RocketMQ控制台或通过配置文件,创建订单主题(如order_topic)并指定其队列数量和分布,例如设置为10个队列。同样,创建库存主题(如stock_topic)并设置队列数量,如5个队列。
    • 代码中发送消息时,指定正确的主题和队列选择策略。例如,订单消息发送代码:
DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
producer.start();
Message msg = new Message("order_topic", "TagA", "Order1".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
producer.shutdown();
  • 库存消息发送代码类似,只是主题为stock_topic
  1. 消费端线程池隔离配置
    • 订单消费者代码配置线程池:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20);
consumer.subscribe("order_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 订单消息处理逻辑
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  • 库存消费者代码配置线程池:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("stock_consumer_group");
consumer.setConsumeThreadMin(5);
consumer.setConsumeThreadMax(10);
consumer.subscribe("stock_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 库存消息处理逻辑
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  1. 流量控制配置
    • 可以在消费端代码中设置流量控制参数。例如,订单消费者设置消费速率:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setPullThresholdForQueue(1000);
consumer.setPullInterval(0);
consumer.setConsumeConcurrentlyMaxSpan(2000);
consumer.setConsumeTimeout(15);
consumer.subscribe("order_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 订单消息处理逻辑
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  • 通过调整pullThresholdForQueue等参数来限制消费速率,从而实现流量控制,保护库存更新的稳定性。