MST

星途 面试题库

面试题:Java的Spring Boot集成Kafka时如何处理消息的幂等性

在Spring Boot与Kafka集成场景下,消息幂等性非常重要。请说明在生产者和消费者端分别有哪些策略来保证消息的幂等性?并给出相应的代码示例及原理分析。
32.9万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

生产者端保证消息幂等性

  1. 策略
    • 使用Kafka生产者的幂等性特性。Kafka从0.11.0.0版本开始引入了生产者幂等性,通过给每个Producer在每个Topic - Partition上分配一个唯一的PID(Producer ID),并使用单调递增的序列号(Sequence Number)来保证消息的幂等性。
  2. 代码示例
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class IdempotentProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 开启幂等性
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key1", "value1");
        try {
            producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
  1. 原理分析
    • 当生产者发送消息时,Kafka会为每个消息分配一个序列号,这个序列号在PID和Topic - Partition的范围内是唯一的。
    • Kafka Broker端会缓存最近的序列号,如果接收到的消息序列号小于或等于缓存中的序列号,则会丢弃该消息,从而保证了消息在Broker端的幂等性。即使生产者因为网络问题等原因重复发送消息,Broker也只会处理一次。

消费者端保证消息幂等性

  1. 策略
    • 使用唯一标识和去重表:在消息中添加一个唯一标识(如UUID),消费者消费消息时,先查询去重表(可以是数据库表等),如果该唯一标识已经存在,则不处理该消息;如果不存在,则处理消息并将唯一标识插入去重表。
    • 事务性处理:对于支持事务的存储系统(如JDBC数据库),消费者在处理消息时,将消息处理逻辑和去重逻辑放在一个事务中,保证原子性。
  2. 代码示例(以使用唯一标识和去重表为例,基于Spring Boot和JPA)
    • 定义消息实体
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class MessageRecord {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String messageId;
    // 其他消息相关字段

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }
}
  • 定义Repository
import org.springframework.data.jpa.repository.JpaRepository;

public interface MessageRecordRepository extends JpaRepository<MessageRecord, Long> {
    boolean existsByMessageId(String messageId);
}
  • 消费者逻辑
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {
    @Autowired
    private MessageRecordRepository messageRecordRepository;

    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void consume(String message) {
        String messageId = "这里假设从消息中解析出唯一标识";
        if (!messageRecordRepository.existsByMessageId(messageId)) {
            // 处理消息逻辑
            System.out.println("Consumed message: " + message);
            MessageRecord record = new MessageRecord();
            record.setMessageId(messageId);
            messageRecordRepository.save(record);
        }
    }
}
  1. 原理分析
    • 使用唯一标识和去重表:通过唯一标识,消费者可以识别重复消息。去重表起到了缓存已处理消息标识的作用,避免对重复消息进行重复处理。
    • 事务性处理:将消息处理和去重操作放在事务中,保证了要么都成功(处理消息并记录已处理标识),要么都失败,避免部分处理的情况,从而实现幂等性。