面试题答案
一键面试RocketMQ确保消息顺序投递的原理与实现细节
- 原理
- 生产者端:RocketMQ 生产者通过选择特定的 MessageQueue 来发送消息,保证相同业务逻辑的消息发送到同一个 MessageQueue 中。例如,在电商订单系统中,与同一订单相关的消息(如订单创建、订单支付、订单发货等)都发送到同一个 MessageQueue 。
- 消费者端:RocketMQ 消费者针对每个 MessageQueue 维护一个线程池来消费消息。由于同一 MessageQueue 中的消息是顺序存储的,消费者按顺序从该 MessageQueue 中拉取并消费消息,从而确保了消息的顺序性。
- 实现细节
- 生产者发送:生产者可以通过实现
MessageQueueSelector
接口来自定义消息发送到哪个 MessageQueue 。例如:
- 生产者发送:生产者可以通过实现
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.start();
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
producer.shutdown();
这里通过 MessageQueueSelector
根据 orderId
选择特定的 MessageQueue
发送消息。
- 消费者消费:消费者端默认使用
ConsumeMessageOrderlyService
来按顺序消费消息。该服务会对每个 MessageQueue 分配一个线程进行消费。当一个 MessageQueue 中的消息被消费时,只有当前消息消费成功,才会继续消费下一条消息。如果消费失败,会根据重试策略进行重试。
与 RabbitMQ、Kafka 在处理消息顺序性方面的架构设计差异
- RabbitMQ
- 原理:RabbitMQ 本身没有直接提供全局消息顺序性保证机制。但是可以通过将所有消息发送到同一个队列(单队列模式),并使用单消费者来确保消息顺序。因为单队列单消费者场景下,消息是按照生产者发送的顺序进入队列并被消费的。
- 实现细节:生产者将所有消息发送到同一个队列,消费者从该队列消费。例如,在 Python 中使用
pika
库:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_queue')
channel.basic_publish(exchange='', routing_key='order_queue', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
消费者代码:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_queue')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue='order_queue', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
- 差异:与 RocketMQ 相比,RabbitMQ 的单队列单消费者模式扩展性较差,当消息量增大时,单队列和单消费者会成为性能瓶颈。而 RocketMQ 通过多队列结合 MessageQueueSelector 以及消费者线程池,既保证了顺序性又有较好的扩展性。
- Kafka
- 原理:Kafka 保证分区内消息的顺序性。生产者通过分区器将消息发送到特定分区,消费者从分区按顺序拉取消息。在 Kafka 中,一个主题(Topic)可以包含多个分区(Partition)。
- 实现细节:生产者可以自定义分区器,例如在 Java 中:
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
@Override
public void close() {}
@Override
public void configure(Map<String,?> configs) {}
}
生产者配置分区器:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "MyPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key1", "value1"));
producer.close();
消费者从分区消费消息:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my - topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
- 差异:Kafka 强调分区内顺序性,不同分区之间的消息顺序无法保证。而 RocketMQ 可以通过合理的设计,在多个队列的情况下也能保证特定业务逻辑消息的顺序性。并且 Kafka 的消费者是基于拉模式,RocketMQ 消费者基于拉模式但有长轮询机制,在顺序性保证方面的一些细节处理上有所不同。