面试题答案
一键面试原理
- 消息分区:RocketMQ 通过将消息发送到特定的队列(Queue)来实现一定程度的顺序性。一个 Topic 可以包含多个 Queue,每个 Queue 内部是严格顺序的。
- 生产者发送策略:生产者在发送异步消息时,若要保证顺序,需要按照某种规则将相关消息发送到同一个 Queue 中。
- 消费者消费策略:消费者从特定 Queue 消费消息时,按照先进先出(FIFO)的顺序进行处理,这样就能保证在该 Queue 内消息的顺序性。
实现方式
- 生产者:
- 按业务规则路由:例如,根据订单号的哈希值取模,将同一订单相关的消息发送到同一个 Queue。示例代码如下(以 Java 为例):
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("namesrvAddr");
producer.start();
String[] tags = {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("TopicTest", tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 根据订单号(这里简单用 i 模拟)哈希取模选择队列
int queueId = Math.abs(i % producer.getDefaultMQPushConsumer().getMessageQueueList("TopicTest").size());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
return mqs.get(id);
}
}, queueId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
- 消费者:
- 单线程消费:消费者采用单线程消费模式,确保消息按顺序处理。在消费者配置中设置
consumeMessageBatchMaxSize
为 1,并且使用ConsumeOrderly
监听器。示例代码如下(以 Java 为例):
- 单线程消费:消费者采用单线程消费模式,确保消息按顺序处理。在消费者配置中设置
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("namesrvAddr");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
// 处理消息
System.out.println(new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
这样,通过生产者按规则发送到同一队列,以及消费者单线程消费同一队列消息,在 RocketMQ 异步消息处理机制下保证了消息的顺序性。