面试题答案
一键面试原理
RocketMQ 中通过将消息发送到同一个队列来保证消息的有序性。在 RocketMQ 中,每个 Topic 可以包含多个 Message Queue,而消息发送时,如果能确保所有相关消息都发往同一个 Message Queue,那么消费者按照顺序消费该队列中的消息,就能实现消息的有序性。
实现方式
- 同步发送:使用
DefaultMQProducer
的send
方法进行同步发送消息,在发送下一条消息前,会等待当前消息发送成功。例如:
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("namesrvAddr");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
producer.shutdown();
- 使用 MessageQueueSelector:通过实现
MessageQueueSelector
接口来指定消息发往哪个队列。例如:
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("namesrvAddr");
producer.start();
Message msg1 = new Message("TopicTest", "TagA", "OrderID001", "Hello world 1".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msg2 = new Message("TopicTest", "TagA", "OrderID001", "Hello world 2".getBytes(RemotingHelper.DEFAULT_CHARSET));
List<Message> messages = Arrays.asList(msg1, msg2);
producer.send(messages, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 0);
producer.shutdown();
在上述代码中,MessageQueueSelector
的 select
方法会根据传入的参数 arg
(这里是 0
)来选择一个队列,确保具有相同业务标识(如订单号等)的消息发往同一个队列,从而保证有序性。