考虑因素
- 消息顺序性来源:生产者发送消息时要确保消息顺序,例如按业务订单ID等维度进行有序发送。
- 队列分配:消费者需要保证同一组相关消息被分配到同一个消费队列(ConsumeQueue),进而分配到同一个消费者实例处理。RocketMQ 是基于队列来保证局部顺序的,不同队列间无法保证顺序。
- 消费线程:同一消费队列的消息必须由同一个线程按顺序消费,避免多线程并发消费导致顺序混乱。
配置与编码实现
- 配置:
- 在生产者端,发送消息时,对于需要顺序消费的消息,尽量保证发送到同一个队列。可以根据业务 key 进行路由,例如:
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.start();
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 0);
- 在消费者端,配置 `MessageListenerOrderly` 监听器。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 处理消息
System.out.println("Consume message: " + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
- 编码实现要点:
- 消费逻辑:在
MessageListenerOrderly
的 consumeMessage
方法中,对消息进行顺序处理。由于消息已经按顺序进入该方法,只要处理逻辑不出现异常,即可保证顺序消费。
- 异常处理:在消费过程中,如果出现异常,应根据业务情况进行处理。比如可以进行重试,确保消息消费成功,以维护顺序性。但需要注意避免无限重试导致的死循环。如果是不可恢复的异常,可以记录日志并跳过该消息,继续处理后续消息,但这可能会破坏严格顺序,需要谨慎处理。