MST

星途 面试题库

面试题:消息队列在异步通知系统中的消息可靠性保障

在一个使用消息队列构建的异步通知系统里,如何确保消息不丢失、不重复,并且按照顺序被处理?请结合你熟悉的消息队列产品(如 Kafka、RabbitMQ 等)说明具体的实现机制。
22.2万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

Kafka

  1. 消息不丢失
    • 生产者
      • 设置 acks = all,生产者会等待 ISR(In - Sync Replicas,与 leader 保持同步的副本集合)中的所有副本都确认收到消息后才认为消息发送成功。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
Producer<String, String> producer = new KafkaProducer<>(props);
 - 启用重试机制,当消息发送失败时,生产者会自动重试。如上述代码中 `retries` 设置为 3 表示最多重试 3 次。
  • 消费者
    • 采用手动提交位移的方式,在消息处理完成后再提交位移。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test - group");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test - topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息
        System.out.println("Received message: " + record.value());
    }
    consumer.commitSync();
}
  1. 消息不重复
    • Kafka 0.11 版本引入了幂等性生产者。设置 enable.idempotence = true,生产者会自动为每条消息生成唯一的标识符,Kafka 内部会确保相同标识符的消息只被写入一次。例如:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", "true");
Producer<String, String> producer = new KafkaProducer<>(props);
  1. 消息顺序处理
    • 将需要顺序处理的消息发送到同一个分区,消费者按分区顺序消费。例如,假设订单消息需要按订单号顺序处理,可以将订单号作为分区键:
ProducerRecord<String, String> record = new ProducerRecord<>("order - topic", orderId, orderMessage);
producer.send(record);
  • 单个消费者从一个分区消费消息,就能保证消息按顺序处理。

RabbitMQ

  1. 消息不丢失
    • 生产者
      • 开启事务机制,生产者发送消息前开启事务 channel.txSelect(),消息发送成功后提交事务 channel.txCommit(),如果发送失败则回滚事务 channel.txRollback()。例如:
Channel channel = connection.createChannel();
channel.txSelect();
channel.basicPublish("", "test - queue", null, "Hello World!".getBytes("UTF - 8"));
channel.txCommit();
 - 采用 confirm 机制,生产者设置 `channel.confirmSelect()`,然后通过 `addConfirmListener` 监听消息的确认结果。例如:
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        // 消息确认成功
    }
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        // 消息确认失败
    }
});
channel.basicPublish("", "test - queue", null, "Hello World!".getBytes("UTF - 8"));
  • 消费者
    • 设置 autoAck = false,消费者手动确认消息,在消息处理完成后调用 channel.basicAck(deliveryTag, false) 确认消息。例如:
Channel channel = connection.createChannel();
channel.basicConsume("test - queue", false, "my - consumer - tag",
        (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF - 8");
            // 处理消息
            System.out.println("Received message: " + message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        },
        consumerTag -> { });
  1. 消息不重复
    • RabbitMQ 本身不保证消息绝对不重复,但消费者可以通过在消息中添加唯一标识符,在处理消息前先检查是否已经处理过该标识符对应的消息,来避免重复处理。例如,可以在消息头中添加 UUID:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
      .headers(Collections.singletonMap("message - id", UUID.randomUUID().toString()))
      .build();
channel.basicPublish("", "test - queue", props, "Hello World!".getBytes("UTF - 8"));

在消费者端:

Channel channel = connection.createChannel();
channel.basicConsume("test - queue", false, "my - consumer - tag",
        (consumerTag, delivery) -> {
            String messageId = (String) delivery.getProperties().getHeaders().get("message - id");
            // 检查 messageId 是否已处理
            if (!isProcessed(messageId)) {
                String message = new String(delivery.getBody(), "UTF - 8");
                // 处理消息
                System.out.println("Received message: " + message);
                markAsProcessed(messageId);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        },
        consumerTag -> { });
  1. 消息顺序处理
    • 确保消息发送到同一个队列,并且使用单个消费者从该队列消费消息。例如,在声明队列时:
Channel channel = connection.createChannel();
channel.queueDeclare("order - queue", false, false, false, null);

然后使用单个消费者消费:

channel.basicConsume("order - queue", false, "my - consumer - tag",
        (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF - 8");
            // 处理消息
            System.out.println("Received message: " + message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        },
        consumerTag -> { });