MST

星途 面试题库

面试题:消息队列RocketMQ如何确保消息不丢失以保障可靠性投递

在RocketMQ中,从生产者、消息队列服务器、消费者这几个环节分析,采取了哪些措施来确保消息不会丢失,从而保障消息的可靠性投递?
47.4万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

生产者确保消息不丢失的措施

  1. 同步发送并等待确认:采用同步发送方式,生产者会等待MQ服务器的确认响应。只有收到MQ服务器成功接收消息的确认,才认为消息发送成功。例如在Java代码中:
Producer producer = new DefaultMQProducer("group1");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
    // 消息发送成功处理逻辑
} else {
    // 消息发送失败处理逻辑,如重试等
}
producer.shutdown();
  1. 设置重试机制:当消息发送失败时,生产者可按照设定的重试次数进行重试。默认情况下,RocketMQ生产者会重试2次。可以通过代码修改重试次数,例如:
producer.setRetryTimesWhenSendFailed(3);
  1. 事务消息:适用于对消息一致性要求较高的场景。生产者发送半消息到MQ服务器,MQ服务器响应成功后,生产者执行本地事务。根据本地事务执行结果,生产者再向MQ服务器发送Commit或Rollback指令。如果MQ服务器长时间未收到Commit或Rollback指令,会回调生产者的checkLocalTransaction方法来检查本地事务状态。例如:
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        return LocalTransactionState.COMMIT_MESSAGE;
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 检查本地事务状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
producer.shutdown();

消息队列服务器确保消息不丢失的措施

  1. 持久化机制:RocketMQ采用基于磁盘的文件存储方式来持久化消息。消息到达MQ服务器后,会被写入到CommitLog文件中。同时,为了提高读写性能,采用了异步刷盘和同步刷盘两种策略。
    • 同步刷盘:消息写入CommitLog文件后,会等待操作系统将数据真正落盘后才返回成功响应给生产者,保证消息不会因为服务器宕机等原因丢失。配置方式:
<broker>
    <flushDiskType>SYNC_FLUSH</flushDiskType>
</broker>
- **异步刷盘**:消息写入CommitLog文件后,立即返回成功响应给生产者,然后由专门的线程将数据异步刷盘。虽然性能较高,但在服务器突然宕机时可能会丢失少量未刷盘的消息。配置方式:
<broker>
    <flushDiskType>ASYNC_FLUSH</flushDiskType>
</broker>
  1. 主从复制:MQ服务器采用主从架构,主节点负责处理消息的读写,从节点会从主节点同步数据。当主节点出现故障时,从节点可以切换为主节点继续提供服务,确保消息不丢失。例如,在配置文件中可以设置主从关系:
<broker>
    <brokerId>0</brokerId>
    <brokerRole>SYNC_MASTER</brokerRole>
    <masterAddr>192.168.1.100:10911</masterAddr>
</broker>
<broker>
    <brokerId>1</brokerId>
    <brokerRole>SLAVE</brokerRole>
    <masterAddr>192.168.1.100:10911</masterAddr>
</broker>

消费者确保消息不丢失的措施

  1. 手动确认机制:消费者从MQ服务器拉取消息后,需要手动向MQ服务器发送确认消息已消费的指令。只有MQ服务器收到确认后,才会认为该消息已被成功消费。如果消费者在处理消息过程中出现异常,未发送确认消息,MQ服务器会认为消息未被成功消费,会重新将消息投递给其他消费者或再次投递给该消费者。例如在Java代码中:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理消息
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  1. 消费重试:当消费者消费消息失败时,RocketMQ会根据配置的重试策略进行重试。对于顺序消息,会一直重试直到消费成功;对于并发消息,默认重试16次。可以通过配置修改重试次数等策略,例如:
consumer.setMaxReconsumeTimes(20);