生产者端保证消息幂等性
- 策略:
- 使用Kafka生产者的幂等性特性。Kafka从0.11.0.0版本开始引入了生产者幂等性,通过给每个Producer在每个Topic - Partition上分配一个唯一的PID(Producer ID),并使用单调递增的序列号(Sequence Number)来保证消息的幂等性。
- 代码示例:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class IdempotentProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 开启幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key1", "value1");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
- 原理分析:
- 当生产者发送消息时,Kafka会为每个消息分配一个序列号,这个序列号在PID和Topic - Partition的范围内是唯一的。
- Kafka Broker端会缓存最近的序列号,如果接收到的消息序列号小于或等于缓存中的序列号,则会丢弃该消息,从而保证了消息在Broker端的幂等性。即使生产者因为网络问题等原因重复发送消息,Broker也只会处理一次。
消费者端保证消息幂等性
- 策略:
- 使用唯一标识和去重表:在消息中添加一个唯一标识(如UUID),消费者消费消息时,先查询去重表(可以是数据库表等),如果该唯一标识已经存在,则不处理该消息;如果不存在,则处理消息并将唯一标识插入去重表。
- 事务性处理:对于支持事务的存储系统(如JDBC数据库),消费者在处理消息时,将消息处理逻辑和去重逻辑放在一个事务中,保证原子性。
- 代码示例(以使用唯一标识和去重表为例,基于Spring Boot和JPA):
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@Entity
public class MessageRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String messageId;
// 其他消息相关字段
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
}
import org.springframework.data.jpa.repository.JpaRepository;
public interface MessageRecordRepository extends JpaRepository<MessageRecord, Long> {
boolean existsByMessageId(String messageId);
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@Autowired
private MessageRecordRepository messageRecordRepository;
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(String message) {
String messageId = "这里假设从消息中解析出唯一标识";
if (!messageRecordRepository.existsByMessageId(messageId)) {
// 处理消息逻辑
System.out.println("Consumed message: " + message);
MessageRecord record = new MessageRecord();
record.setMessageId(messageId);
messageRecordRepository.save(record);
}
}
}
- 原理分析:
- 使用唯一标识和去重表:通过唯一标识,消费者可以识别重复消息。去重表起到了缓存已处理消息标识的作用,避免对重复消息进行重复处理。
- 事务性处理:将消息处理和去重操作放在事务中,保证了要么都成功(处理消息并记录已处理标识),要么都失败,避免部分处理的情况,从而实现幂等性。