MST

星途 面试题库

面试题:特定场景下消息队列顺序消息实现的优化策略

假设存在一个高并发场景,其中订单创建、支付、发货等流程通过消息队列以顺序消息的形式流转。在这个场景下,如何对消息队列顺序消息的实现进行优化,以提高系统性能、降低延迟,同时保证消息顺序的严格性?请从消息队列选型、架构设计、代码实现等多方面进行阐述。
33.9万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

消息队列选型

  1. Kafka
    • 分区设计:在Kafka中,可以为每个订单分配特定的分区。例如,以订单ID的哈希值对分区数取模,将同一订单相关的消息都发送到同一个分区。这样,一个分区内的消息天然是顺序消费的。同时,Kafka通过多副本机制保证数据可靠性,即使某个副本所在的节点故障,也能从其他副本继续消费,不影响消息顺序。
    • 生产者配置:设置acks = all,确保所有副本都接收到消息后生产者才认为消息发送成功,保证消息不丢失,进而保证顺序性。同时,合理设置retries参数,当消息发送失败时自动重试,减少因网络等临时故障导致的消息顺序错乱。
  2. RocketMQ
    • 顺序消息支持:RocketMQ原生支持顺序消息。生产者可以通过MessageQueueSelector实现消息的有序发送,例如根据订单ID选择对应的队列。消费者端,RocketMQ保证每个队列的消息按顺序消费。并且RocketMQ有多种消息存储方式,如CommitLog和ConsumeQueue的组合,优化了消息存储和读取性能,减少消息消费延迟。
    • 事务消息:对于订单创建、支付、发货这种存在事务关系的场景,RocketMQ的事务消息可以保证消息的最终一致性和顺序性。例如,在订单创建成功后发送事务消息,支付和发货流程根据事务消息状态进行后续操作,确保整个流程的有序和正确。

架构设计

  1. 分层架构
    • 消息生产层:对订单创建、支付、发货等操作进行封装,在业务逻辑执行成功后,统一发送消息到消息队列。可以采用异步方式发送消息,减少业务处理线程的等待时间。例如,使用线程池来发送消息,避免因消息发送阻塞业务流程。
    • 消息处理层:根据不同的业务逻辑,如订单创建处理、支付处理、发货处理等,创建对应的消费者组。每个消费者组内的消费者按顺序消费消息。可以设置多个消费者实例提高消费并行度,但需要保证同一订单的消息由同一个消费者实例处理。例如,通过负载均衡算法将同一订单的消息分配到同一个消费者实例上。
    • 数据存储层:对于订单相关的数据,如订单状态、支付信息、发货信息等,采用数据库持久化。在消息处理过程中,更新数据库状态。可以使用读写分离架构,主库负责写操作,从库负责读操作,提高数据访问性能,同时保证数据一致性,为消息处理提供可靠的数据支持。
  2. 缓存机制
    • 本地缓存:在消费者端,可以使用本地缓存(如Guava Cache)缓存部分订单相关数据,减少对数据库的频繁访问。例如,缓存订单的基本信息,在处理支付和发货消息时,直接从本地缓存获取订单信息,提高处理速度。
    • 分布式缓存:如Redis,用于缓存一些全局配置信息、订单状态等。例如,将订单状态缓存到Redis中,在消息处理过程中快速获取订单当前状态,判断是否可以进行下一步操作,减少数据库查询压力,降低消息处理延迟。

代码实现

  1. 生产者代码
    • Kafka生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String orderId = "123456";
String message = "订单创建消息";
ProducerRecord<String, String> record = new ProducerRecord<>("order-topic", orderId.hashCode() % numPartitions, orderId, message);
producer.send(record);
producer.close();
  • RocketMQ生产者
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("order-topic", "订单创建消息".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        String orderId = (String) arg;
        int index = Math.abs(orderId.hashCode()) % mqs.size();
        return mqs.get(index);
    }
}, "123456");
producer.shutdown();
  1. 消费者代码
    • Kafka消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-consumer-group");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("order-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理订单消息逻辑
        System.out.println("收到消息:" + record.value());
    }
}
  • RocketMQ消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order-topic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理订单消息逻辑
            System.out.println("收到消息:" + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();
  1. 异常处理
    • 在生产者发送消息时,捕获可能的异常,如网络异常、Broker不可用等。例如,在Kafka生产者中,对send方法调用进行异常捕获:
try {
    producer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
    // 记录日志,进行重试等操作
    e.printStackTrace();
}
  • 在消费者处理消息时,也要对可能出现的业务异常进行捕获。例如,在处理支付消息时,可能出现支付接口调用失败等异常,捕获异常后可以进行重试或记录到异常队列,后续人工处理。
try {
    // 处理支付消息逻辑
    payOrder(order);
} catch (PaymentException e) {
    // 记录日志,重试或放入异常队列
    e.printStackTrace();
}