MST

星途 面试题库

面试题:特定场景下消息队列协议的适配

假设你正在开发一个电商系统,该系统需要处理高并发的订单消息,同时要确保消息的可靠传递和顺序性。在这种场景下,你会选择哪种消息队列协议,并阐述如何对其进行适配以满足系统需求?
29.1万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

选择 Kafka 协议

  1. 原因
    • 高并发处理能力:Kafka 采用分布式架构,具有高吞吐量的特点,能够轻松应对电商系统高并发订单消息的处理。它通过分区(Partition)机制,将数据分散存储在多个 Broker 节点上,可并行处理大量消息。
    • 可靠传递:Kafka 支持多副本(Replica)机制,每个分区可以有多个副本,其中一个副本为 Leader,其他为 Follower。当 Leader 副本所在的 Broker 节点出现故障时,Follower 副本会选举出一个新的 Leader,确保数据不丢失,保证了消息的可靠传递。
    • 顺序性保证:在 Kafka 中,同一个分区内的消息是有序的。对于需要顺序处理的订单消息,可以通过将具有相同业务标识(如订单号)的消息发送到同一个分区,这样消费者从该分区消费消息时,就能保证消息的顺序性。
  2. 适配措施
    • 生产者端
      • 消息分区策略:为了保证订单消息的顺序性,需要自定义分区器。例如,根据订单号的哈希值对分区数取模,将相同订单号的消息发送到同一个分区。示例代码(以 Java 为例):
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

public class OrderIdPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        String orderId = (String) key;
        return Math.abs(Utils.murmur2(orderId.getBytes())) % numPartitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String,?> configs) {

    }
}

然后在生产者配置中指定该分区器:

Properties props = new Properties();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderIdPartitioner.class.getName());
// 其他生产者配置...
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 - **消息确认机制**:使用 `acks = all` 配置,确保所有副本都收到消息后生产者才认为消息发送成功,提高消息可靠性。
props.put(ProducerConfig.ACKS_CONFIG, "all");
  • 消费者端
    • 单线程消费:为保证消息顺序处理,每个分区由一个消费者线程进行消费。可以通过设置 group.id,使多个消费者组成一个消费组,每个消费者负责消费不同的分区。
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order - group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("order - topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理订单消息
        System.out.println("Received message: " + record.value());
    }
}
 - **偏移量管理**:采用手动提交偏移量的方式,确保消息处理成功后再提交偏移量,防止消息重复消费。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 处理完消息后手动提交偏移量
consumer.commitSync();
  • Kafka 集群配置
    • 副本因子设置:根据系统的可靠性要求,合理设置副本因子。例如,设置为 3,即每个分区有 2 个 Follower 副本,提高数据容错能力。
# server.properties 文件中配置
default.replication.factor = 3
 - **分区数规划**:根据预估的订单消息量和系统处理能力,合理规划分区数。分区数过少可能导致处理能力不足,分区数过多可能增加管理开销。可以通过性能测试来确定最优的分区数。