MST

星途 面试题库

面试题:如何在高并发场景下确保消息队列的消息顺序性

假设在一个电商系统中,订单创建、支付、发货等操作通过消息队列进行异步处理,在高并发环境下,你将采取哪些策略和技术手段来保证这些消息严格按照顺序消费,以避免业务逻辑错误,结合常见消息队列产品说明具体实现思路。
36.9万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

策略和技术手段

  1. 单队列单消费者:使用单个消息队列,只配置一个消费者。这样所有消息按顺序进入队列,消费者按顺序消费,能确保严格顺序。但缺点是无法利用多消费者并行处理提高效率,适用于并发量不是特别高的场景。
  2. 分区和路由:对消息进行分区,将相关联的消息路由到同一个分区,每个分区配置一个消费者。例如,根据订单ID取模将同一订单相关的创建、支付、发货等消息发送到固定分区。在Kafka中,通过自定义分区器实现,消费者组内每个消费者负责消费一个或多个分区,保证每个分区内消息顺序消费。
  3. 消息标记和重试:为消息添加顺序标记,如序列号。消费者消费消息时检查序列号连续性,若出现乱序,记录缺失消息并进行重试。可结合死信队列,将无法按序消费的消息发送到死信队列,后续人工处理或定时重试。例如RabbitMQ可通过设置消息属性和死信队列相关参数实现。
  4. 事务机制:部分消息队列支持事务,如RocketMQ。在发送一组相关消息时,使用事务确保要么所有消息都成功发送并按顺序进入队列,要么都失败回滚。消费者在事务内按顺序消费消息,保证业务逻辑一致性。

常见消息队列产品具体实现思路

  1. Kafka
    • 分区和路由:实现自定义分区器,根据订单ID等业务标识计算分区号,将同一订单的所有消息发送到同一分区。
    public class OrderIdPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            return Math.abs(key.hashCode()) % numPartitions;
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    • 消费者组:配置消费者组,每个消费者组内消费者数量与分区数量相同,且每个消费者只负责消费一个分区的消息。
  2. RabbitMQ
    • 单队列单消费者:创建单个队列,只启动一个消费者监听该队列。
    • 消息标记和重试:在消息属性中添加序列号,消费者消费时校验序列号。利用RabbitMQ的死信队列,将乱序消息发送到死信队列。
    // 生产者发送消息设置序列号
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
      .headers(Collections.singletonMap("sequenceId", sequenceId))
      .build();
    channel.basicPublish("", queueName, properties, message.getBytes("UTF-8"));
    
    // 消费者校验序列号
    String sequenceId = (String) delivery.getProperties().getHeaders().get("sequenceId");
    // 校验逻辑
    
  3. RocketMQ
    • 事务机制:生产者使用事务消息发送,确保一组相关消息顺序进入队列。
    TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
    producer.setTransactionListener(new TransactionListener() {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // 本地事务执行逻辑
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 事务状态检查逻辑
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    });
    producer.start();
    Message message = new Message("topic", "tags", "keys", "message body".getBytes());
    SendResult sendResult = producer.sendMessageInTransaction(message, null);
    
    • 顺序消费:消费者通过设置 MessageListenerOrderly 实现顺序消费。
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    consumer.subscribe("topic", "*");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            // 按顺序消费消息逻辑
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();