方案
- 队列隔离:
- 为订单消息和库存消息分别创建独立的消息队列。订单消息使用一组队列,库存消息使用另一组队列。这样不同类型的消息在物理存储上隔离开,避免订单消息流量过大时占用过多存储资源影响库存消息存储。
- 例如,创建名为
order_queues
的订单队列组和stock_queues
的库存队列组。
- 消费端线程池隔离:
- 在消费端,为订单消息和库存消息的消费者分别配置独立的线程池。订单消费者使用一个线程池处理订单消息,库存消费者使用另一个线程池处理库存消息。这样订单消息处理的高并发不会导致库存消息处理线程被耗尽,保证库存更新的稳定性。
- 比如,订单消费者线程池配置为
corePoolSize = 10
,maximumPoolSize = 20
,库存消费者线程池配置为corePoolSize = 5
,maximumPoolSize = 10
。具体线程池参数根据业务负载调整。
- 流量控制:
- 利用RocketMQ的流控机制,对订单消息的消费速率进行限制。可以设置订单消息的最大消费TPS(Transactions Per Second),避免订单处理流量过大。例如,将订单消息的消费TPS限制为1000,即每秒最多处理1000条订单消息。这样可以控制订单处理对系统资源的占用,从而不影响库存更新的稳定性。
配置思路
- 队列隔离配置:
- 在RocketMQ控制台或通过配置文件,创建订单主题(如
order_topic
)并指定其队列数量和分布,例如设置为10个队列。同样,创建库存主题(如stock_topic
)并设置队列数量,如5个队列。
- 代码中发送消息时,指定正确的主题和队列选择策略。例如,订单消息发送代码:
DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
producer.start();
Message msg = new Message("order_topic", "TagA", "Order1".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
producer.shutdown();
- 库存消息发送代码类似,只是主题为
stock_topic
。
- 消费端线程池隔离配置:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20);
consumer.subscribe("order_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 订单消息处理逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("stock_consumer_group");
consumer.setConsumeThreadMin(5);
consumer.setConsumeThreadMax(10);
consumer.subscribe("stock_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 库存消息处理逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
- 流量控制配置:
- 可以在消费端代码中设置流量控制参数。例如,订单消费者设置消费速率:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setPullThresholdForQueue(1000);
consumer.setPullInterval(0);
consumer.setConsumeConcurrentlyMaxSpan(2000);
consumer.setConsumeTimeout(15);
consumer.subscribe("order_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 订单消息处理逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
- 通过调整
pullThresholdForQueue
等参数来限制消费速率,从而实现流量控制,保护库存更新的稳定性。