面试题答案
一键面试实现思路
- 消息唯一标识:为每个消息分配一个唯一标识符。Kafka 中的消息本身就有 offset,但为了业务层面的幂等性,通常在消息体中添加一个业务唯一 ID,如订单号等。
- 消费记录存储:使用外部存储(如数据库、Redis 等)来记录已经消费成功的消息 ID。每次消费消息时,先检查该消息 ID 是否已在存储中。
- 业务操作原子性:将业务操作设计成原子操作,确保在重复消费时,即使检查到消息已消费,业务操作也不会出现不一致情况。例如,在数据库操作中使用事务。
涉及的数据结构
- 数据库表:
- 若使用关系型数据库,创建一张表,例如
consumed_messages
,包含字段message_id
(消息唯一标识,作为主键),consume_time
(消费时间,可用于统计等)。 - 若使用 Redis,可以使用
SET
数据结构,SET
中的每个元素是已消费消息的 ID。
- 若使用关系型数据库,创建一张表,例如
关键代码片段(以 Java 结合 Kafka 和 Redis 为例)
- 引入依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring - kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring - boot - starter - data - redis</artifactId> </dependency>
- Kafka 消费者配置:
@Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "my - group"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new DefaultKafkaConsumerFactory<>(props); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
- 幂等性消费逻辑:
@Service public class KafkaConsumerService { @Autowired private RedisTemplate<String, String> redisTemplate; @KafkaListener(topics = "my - topic", groupId = "my - group") public void consume(String message) { // 假设消息格式为 json,解析出消息唯一 ID JSONObject jsonObject = JSON.parseObject(message); String messageId = jsonObject.getString("message_id"); // 检查 Redis 中是否已消费该消息 Boolean isConsumed = redisTemplate.opsForSet().isMember("consumed_messages", messageId); if (isConsumed) { // 已消费,直接返回 return; } try { // 业务逻辑,如保存到数据库 saveToDatabase(message); // 将消息 ID 存入 Redis redisTemplate.opsForSet().add("consumed_messages", messageId); } catch (Exception e) { // 处理异常,如记录日志 e.printStackTrace(); } } private void saveToDatabase(String message) { // 实际数据库保存逻辑 System.out.println("Saving message to database: " + message); } }