MST

星途 面试题库

面试题:消息队列Kafka中如何处理消息的顺序性问题

在使用Kafka进行后端开发时,阐述一下Kafka默认情况下消息顺序性是怎样的,若要严格保证消息顺序,你需要采取哪些措施?
37.1万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

Kafka默认情况下消息顺序性

  1. 生产者端:Kafka生产者在默认情况下,对于发送到同一个分区的消息,能保证消息是按照发送顺序写入分区的。这是因为生产者内部有一个RecordAccumulator(记录累加器),会将待发送的消息按分区进行缓冲,然后按照一定的规则(如缓冲区满、达到批次大小、达到等待时间等)批量发送到对应的分区。所以只要是发往同一分区的消息,它们在分区中的顺序与生产者发送顺序一致。
  2. 消费者端:在默认情况下,Kafka消费者消费消息时,对于每个分区,消费者会按照消息在分区中的顺序依次读取。然而,在一个消费者组中有多个消费者实例时,不同分区的消息消费顺序是无法保证的。因为Kafka的设计初衷是为了实现高吞吐量和分布式处理,每个消费者实例会独立消费分配给自己的分区,不同实例之间的消费进度和顺序不受彼此约束。

若要严格保证消息顺序需采取的措施

  1. 使用单分区:最简单直接的方法就是只使用一个分区。这样所有的消息都在同一个分区中,生产者按顺序发送,消费者按顺序消费,从而保证消息的全局顺序性。但这种方法的缺点是无法利用Kafka的分区并行性优势,在高吞吐量场景下可能性能不足。
  2. 自定义分区器:通过自定义分区器,将需要保证顺序的消息发送到同一个分区。例如,如果消息中有一个唯一标识(如订单ID),可以根据这个标识计算出分区号,确保具有相同标识的消息都发送到同一个分区。在Java中实现自定义分区器如下:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 假设key是订单ID,根据订单ID计算分区
        int orderId = Integer.parseInt(key.toString());
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return Math.abs(orderId % numPartitions);
    }

    @Override
    public void close() {
        // 关闭资源的逻辑
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 配置相关逻辑
    }
}

然后在生产者配置中指定这个分区器:

Properties props = new Properties();
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
// 其他生产者配置...
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  1. 使用事务:Kafka从0.11.0.0版本开始支持事务。通过使用事务,可以保证跨分区和跨会话的消息原子性写入,从而在一定程度上保证消息顺序。具体步骤如下:
    • 生产者开启事务:producer.initTransactions();
    • 在事务内发送消息:
producer.beginTransaction();
try {
    producer.send(new ProducerRecord<>(topic1, key1, value1));
    producer.send(new ProducerRecord<>(topic2, key2, value2));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.abortTransaction();
} catch (KafkaException e) {
    producer.abortTransaction();
}
  1. 消费者端单线程消费:在消费者端,使用单线程消费一个或多个分区的消息。这样可以避免多线程消费导致的消息顺序混乱问题。例如,在Java中可以使用KafkaConsumerpoll方法在单线程中循环消费消息:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息
        System.out.println("Received message: " + record.value());
    }
}

但这种方式会降低消费的并行度,在高吞吐量场景下可能影响消费效率。