MST

星途 面试题库

面试题:消息队列之Spring Boot项目中Kafka的消息可靠性保障

在Spring Boot集成Kafka开发消息驱动模块时,如何确保消息的可靠性?比如如何处理消息的发送确认、消费者的偏移量管理以及Kafka集群的副本机制在其中起到什么作用,请详细阐述。
44.0万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

消息发送确认

  1. 同步发送
    • 使用 KafkaTemplatesend 方法返回 ListenableFuture<SendResult<K, V>>。通过调用 get 方法同步等待结果,获取发送状态。
    • 示例代码:
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public void sendMessage(String topic, String message) {
        try {
            SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
            if (result.getRecordMetadata() != null) {
                System.out.println("消息发送成功, 分区: " + result.getRecordMetadata().partition() +
                        ", 偏移量: " + result.getRecordMetadata().offset());
            } else {
                System.out.println("消息发送失败");
            }
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("消息发送异常: " + e.getMessage());
        }
    }
    
  2. 异步发送
    • 利用 addCallback 方法注册成功和失败回调函数。
    • 示例代码:
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message)
       .addCallback(success -> {
            System.out.println("消息发送成功, 分区: " + success.getRecordMetadata().partition() +
                    ", 偏移量: " + success.getRecordMetadata().offset());
        }, failure -> {
            System.out.println("消息发送失败: " + failure.getMessage());
        });
    }
    

消费者偏移量管理

  1. 自动提交偏移量
    • application.properties 中配置 spring.kafka.consumer.enable-auto-commit=true,默认每 spring.kafka.consumer.auto - commit - interval - ms = 5000(5秒)自动提交一次偏移量。
    • 优点是简单,缺点是可能导致重复消费,比如在消息处理过程中,还未处理完就提交了偏移量,重启后会从已提交的偏移量之后开始消费,未处理完的消息就会再次被消费。
  2. 手动提交偏移量
    • 配置 spring.kafka.consumer.enable-auto-commit=false
    • 在消息处理逻辑中,手动调用 Acknowledgmentacknowledge 方法提交偏移量。
    • 示例代码:
    @KafkaListener(topics = "test - topic", groupId = "test - group")
    public void listen(String message, Acknowledgment ack) {
        try {
            // 处理消息逻辑
            System.out.println("接收到消息: " + message);
            // 手动提交偏移量
            ack.acknowledge();
        } catch (Exception e) {
            // 处理异常,比如可以选择不提交偏移量,让消息重新消费
        }
    }
    

Kafka集群副本机制的作用

  1. 数据冗余:Kafka 集群通过副本机制将数据复制到多个 broker 节点上。每个分区可以有多个副本,其中一个为 leader 副本,其他为 follower 副本。当 leader 副本所在的 broker 节点出现故障时,follower 副本中的一个会被选举为新的 leader 副本,保证数据不丢失,从而提高数据的可靠性。
  2. 提高可用性:副本机制使得 Kafka 集群在部分节点故障时仍能正常工作。生产者可以继续向新的 leader 副本发送消息,消费者也可以从新的 leader 副本消费消息,整个集群不会因为某个节点的故障而停止服务,增强了系统的可用性。
  3. 负载均衡:follower 副本可以分担 leader 副本的部分读请求,特别是在大规模数据读取场景下,通过将读请求分散到多个副本上,提高了系统的整体性能和吞吐量。