面试题答案
一键面试Kafka
- 消息不丢失:
- 生产者:
- 设置
acks = all
,生产者会等待 ISR(In - Sync Replicas,与 leader 保持同步的副本集合)中的所有副本都确认收到消息后才认为消息发送成功。例如:
- 设置
- 生产者:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
Producer<String, String> producer = new KafkaProducer<>(props);
- 启用重试机制,当消息发送失败时,生产者会自动重试。如上述代码中 `retries` 设置为 3 表示最多重试 3 次。
- 消费者:
- 采用手动提交位移的方式,在消息处理完成后再提交位移。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test - group");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test - topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println("Received message: " + record.value());
}
consumer.commitSync();
}
- 消息不重复:
- Kafka 0.11 版本引入了幂等性生产者。设置
enable.idempotence = true
,生产者会自动为每条消息生成唯一的标识符,Kafka 内部会确保相同标识符的消息只被写入一次。例如:
- Kafka 0.11 版本引入了幂等性生产者。设置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", "true");
Producer<String, String> producer = new KafkaProducer<>(props);
- 消息顺序处理:
- 将需要顺序处理的消息发送到同一个分区,消费者按分区顺序消费。例如,假设订单消息需要按订单号顺序处理,可以将订单号作为分区键:
ProducerRecord<String, String> record = new ProducerRecord<>("order - topic", orderId, orderMessage);
producer.send(record);
- 单个消费者从一个分区消费消息,就能保证消息按顺序处理。
RabbitMQ
- 消息不丢失:
- 生产者:
- 开启事务机制,生产者发送消息前开启事务
channel.txSelect()
,消息发送成功后提交事务channel.txCommit()
,如果发送失败则回滚事务channel.txRollback()
。例如:
- 开启事务机制,生产者发送消息前开启事务
- 生产者:
Channel channel = connection.createChannel();
channel.txSelect();
channel.basicPublish("", "test - queue", null, "Hello World!".getBytes("UTF - 8"));
channel.txCommit();
- 采用 confirm 机制,生产者设置 `channel.confirmSelect()`,然后通过 `addConfirmListener` 监听消息的确认结果。例如:
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 消息确认成功
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 消息确认失败
}
});
channel.basicPublish("", "test - queue", null, "Hello World!".getBytes("UTF - 8"));
- 消费者:
- 设置
autoAck = false
,消费者手动确认消息,在消息处理完成后调用channel.basicAck(deliveryTag, false)
确认消息。例如:
- 设置
Channel channel = connection.createChannel();
channel.basicConsume("test - queue", false, "my - consumer - tag",
(consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF - 8");
// 处理消息
System.out.println("Received message: " + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> { });
- 消息不重复:
- RabbitMQ 本身不保证消息绝对不重复,但消费者可以通过在消息中添加唯一标识符,在处理消息前先检查是否已经处理过该标识符对应的消息,来避免重复处理。例如,可以在消息头中添加 UUID:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(Collections.singletonMap("message - id", UUID.randomUUID().toString()))
.build();
channel.basicPublish("", "test - queue", props, "Hello World!".getBytes("UTF - 8"));
在消费者端:
Channel channel = connection.createChannel();
channel.basicConsume("test - queue", false, "my - consumer - tag",
(consumerTag, delivery) -> {
String messageId = (String) delivery.getProperties().getHeaders().get("message - id");
// 检查 messageId 是否已处理
if (!isProcessed(messageId)) {
String message = new String(delivery.getBody(), "UTF - 8");
// 处理消息
System.out.println("Received message: " + message);
markAsProcessed(messageId);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} else {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
},
consumerTag -> { });
- 消息顺序处理:
- 确保消息发送到同一个队列,并且使用单个消费者从该队列消费消息。例如,在声明队列时:
Channel channel = connection.createChannel();
channel.queueDeclare("order - queue", false, false, false, null);
然后使用单个消费者消费:
channel.basicConsume("order - queue", false, "my - consumer - tag",
(consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF - 8");
// 处理消息
System.out.println("Received message: " + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> { });