生产者批量发送消息确保一致性和事务性
- 实现原理:
- Kafka通过引入事务ID(Transaction ID)来管理事务。生产者在开始事务前,会向Kafka集群请求一个事务ID。事务ID与生产者的唯一标识(PID,Producer ID)绑定,并且PID在生产者重启后会发生变化,但事务ID保持不变。
- 生产者使用
initTransactions
方法初始化事务,之后可以使用beginTransaction
开启事务,使用send
方法批量发送消息,最后使用commitTransaction
提交事务或者abortTransaction
中止事务。
- 当生产者批量发送消息时,Kafka会将这些消息作为一个事务的一部分进行处理。如果部分消息发送失败,整个事务将不会提交,已成功发送的消息也不会对消费者可见,从而保证了消息的一致性。
- 具体操作方法:
Properties props = new Properties();
props.put("bootstrap.servers", "your - kafka - brokers");
props.put("transactional.id", "my - transaction - id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("my - topic", "key" + i, "value" + i));
}
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.abortTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
e.printStackTrace();
}
消费者批量消费消息确保原子性
- 实现原理:
- Kafka消费者可以使用自动提交偏移量(Auto - commit offsets)或手动提交偏移量(Manual - commit offsets)。在事务性消费场景下,手动提交偏移量结合事务机制来确保原子性。
- 消费者在开启事务后,批量拉取消息并进行处理。处理完成后,消费者将偏移量作为事务的一部分提交到Kafka。如果事务提交成功,偏移量被持久化,Kafka认为这批消息已被成功处理;如果事务失败,偏移量不会提交,下次消费时会再次获取这批消息,从而避免消息丢失。
- 为了避免重复消费,Kafka引入了幂等性生产者(Idempotent Producer)和事务机制结合。幂等性生产者会自动为每条消息生成唯一的标识符,Kafka会根据这些标识符来判断消息是否重复,丢弃重复消息。
- 具体操作方法:
Properties props = new Properties();
props.put("bootstrap.servers", "your - kafka - brokers");
props.put("group.id", "my - group - id");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my - topic"));
try {
consumer.beginTransaction();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println("Received message: " + record.value());
}
consumer.commitTransaction();
} catch (KafkaException e) {
consumer.abortTransaction();
e.printStackTrace();
}