MST

星途 面试题库

面试题:从消息队列架构角度分析,RocketMQ如何在分布式环境下实现消息的顺序性,与其他主流消息队列相比有什么独特之处?

详细说明RocketMQ在分布式架构里确保消息顺序投递的原理与实现细节,并对比RabbitMQ、Kafka等常见消息队列在处理消息顺序性方面的架构设计差异。
26.4万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

RocketMQ确保消息顺序投递的原理与实现细节

  1. 原理
    • 生产者端:RocketMQ 生产者通过选择特定的 MessageQueue 来发送消息,保证相同业务逻辑的消息发送到同一个 MessageQueue 中。例如,在电商订单系统中,与同一订单相关的消息(如订单创建、订单支付、订单发货等)都发送到同一个 MessageQueue 。
    • 消费者端:RocketMQ 消费者针对每个 MessageQueue 维护一个线程池来消费消息。由于同一 MessageQueue 中的消息是顺序存储的,消费者按顺序从该 MessageQueue 中拉取并消费消息,从而确保了消息的顺序性。
  2. 实现细节
    • 生产者发送:生产者可以通过实现 MessageQueueSelector 接口来自定义消息发送到哪个 MessageQueue 。例如:
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.start();
Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);
producer.shutdown();

这里通过 MessageQueueSelector 根据 orderId 选择特定的 MessageQueue 发送消息。

  • 消费者消费:消费者端默认使用 ConsumeMessageOrderlyService 来按顺序消费消息。该服务会对每个 MessageQueue 分配一个线程进行消费。当一个 MessageQueue 中的消息被消费时,只有当前消息消费成功,才会继续消费下一条消息。如果消费失败,会根据重试策略进行重试。

与 RabbitMQ、Kafka 在处理消息顺序性方面的架构设计差异

  1. RabbitMQ
    • 原理:RabbitMQ 本身没有直接提供全局消息顺序性保证机制。但是可以通过将所有消息发送到同一个队列(单队列模式),并使用单消费者来确保消息顺序。因为单队列单消费者场景下,消息是按照生产者发送的顺序进入队列并被消费的。
    • 实现细节:生产者将所有消息发送到同一个队列,消费者从该队列消费。例如,在 Python 中使用 pika 库:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_queue')
channel.basic_publish(exchange='', routing_key='order_queue', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者代码:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_queue')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
channel.basic_consume(queue='order_queue', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  • 差异:与 RocketMQ 相比,RabbitMQ 的单队列单消费者模式扩展性较差,当消息量增大时,单队列和单消费者会成为性能瓶颈。而 RocketMQ 通过多队列结合 MessageQueueSelector 以及消费者线程池,既保证了顺序性又有较好的扩展性。
  1. Kafka
    • 原理:Kafka 保证分区内消息的顺序性。生产者通过分区器将消息发送到特定分区,消费者从分区按顺序拉取消息。在 Kafka 中,一个主题(Topic)可以包含多个分区(Partition)。
    • 实现细节:生产者可以自定义分区器,例如在 Java 中:
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
    @Override
    public void close() {}
    @Override
    public void configure(Map<String,?> configs) {}
}

生产者配置分区器:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "MyPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));
producer.close();

消费者从分区消费消息:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my - topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
  • 差异:Kafka 强调分区内顺序性,不同分区之间的消息顺序无法保证。而 RocketMQ 可以通过合理的设计,在多个队列的情况下也能保证特定业务逻辑消息的顺序性。并且 Kafka 的消费者是基于拉模式,RocketMQ 消费者基于拉模式但有长轮询机制,在顺序性保证方面的一些细节处理上有所不同。