MST

星途 面试题库

面试题:消息队列之RocketMQ消息生产者中等难度题

在RocketMQ中,消息生产者如何保证消息的有序发送?请简述其原理及实现方式。
45.4万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

原理

RocketMQ 中通过将消息发送到同一个队列来保证消息的有序性。在 RocketMQ 中,每个 Topic 可以包含多个 Message Queue,而消息发送时,如果能确保所有相关消息都发往同一个 Message Queue,那么消费者按照顺序消费该队列中的消息,就能实现消息的有序性。

实现方式

  1. 同步发送:使用 DefaultMQProducersend 方法进行同步发送消息,在发送下一条消息前,会等待当前消息发送成功。例如:
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("namesrvAddr");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
producer.shutdown();
  1. 使用 MessageQueueSelector:通过实现 MessageQueueSelector 接口来指定消息发往哪个队列。例如:
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("namesrvAddr");
producer.start();
Message msg1 = new Message("TopicTest", "TagA", "OrderID001", "Hello world 1".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msg2 = new Message("TopicTest", "TagA", "OrderID001", "Hello world 2".getBytes(RemotingHelper.DEFAULT_CHARSET));
List<Message> messages = Arrays.asList(msg1, msg2);
producer.send(messages, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, 0);
producer.shutdown();

在上述代码中,MessageQueueSelectorselect 方法会根据传入的参数 arg(这里是 0)来选择一个队列,确保具有相同业务标识(如订单号等)的消息发往同一个队列,从而保证有序性。