MST

星途 面试题库

面试题:如何在 Kafka 消费者端设计一个幂等性的消费逻辑来解决消息重复消费问题?

假设你正在开发一个 Kafka 消费者应用,要求设计一套完整的幂等性消费逻辑,以确保即使出现消息重复消费,业务结果也不会受到影响。请描述具体的实现思路、涉及的数据结构和关键代码片段。
38.2万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

实现思路

  1. 消息唯一标识:为每个消息分配一个唯一标识符。Kafka 中的消息本身就有 offset,但为了业务层面的幂等性,通常在消息体中添加一个业务唯一 ID,如订单号等。
  2. 消费记录存储:使用外部存储(如数据库、Redis 等)来记录已经消费成功的消息 ID。每次消费消息时,先检查该消息 ID 是否已在存储中。
  3. 业务操作原子性:将业务操作设计成原子操作,确保在重复消费时,即使检查到消息已消费,业务操作也不会出现不一致情况。例如,在数据库操作中使用事务。

涉及的数据结构

  1. 数据库表
    • 若使用关系型数据库,创建一张表,例如 consumed_messages,包含字段 message_id(消息唯一标识,作为主键),consume_time(消费时间,可用于统计等)。
    • 若使用 Redis,可以使用 SET 数据结构,SET 中的每个元素是已消费消息的 ID。

关键代码片段(以 Java 结合 Kafka 和 Redis 为例)

  1. 引入依赖
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring - kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring - boot - starter - data - redis</artifactId>
    </dependency>
    
  2. Kafka 消费者配置
    @Configuration
    public class KafkaConsumerConfig {
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my - group");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return new DefaultKafkaConsumerFactory<>(props);
        }
    
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
    
  3. 幂等性消费逻辑
    @Service
    public class KafkaConsumerService {
    
        @Autowired
        private RedisTemplate<String, String> redisTemplate;
    
        @KafkaListener(topics = "my - topic", groupId = "my - group")
        public void consume(String message) {
            // 假设消息格式为 json,解析出消息唯一 ID
            JSONObject jsonObject = JSON.parseObject(message);
            String messageId = jsonObject.getString("message_id");
    
            // 检查 Redis 中是否已消费该消息
            Boolean isConsumed = redisTemplate.opsForSet().isMember("consumed_messages", messageId);
            if (isConsumed) {
                // 已消费,直接返回
                return;
            }
    
            try {
                // 业务逻辑,如保存到数据库
                saveToDatabase(message);
                // 将消息 ID 存入 Redis
                redisTemplate.opsForSet().add("consumed_messages", messageId);
            } catch (Exception e) {
                // 处理异常,如记录日志
                e.printStackTrace();
            }
        }
    
        private void saveToDatabase(String message) {
            // 实际数据库保存逻辑
            System.out.println("Saving message to database: " + message);
        }
    }