生产者确保消息不丢失的措施
- 同步发送并等待确认:采用同步发送方式,生产者会等待MQ服务器的确认响应。只有收到MQ服务器成功接收消息的确认,才认为消息发送成功。例如在Java代码中:
Producer producer = new DefaultMQProducer("group1");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// 消息发送成功处理逻辑
} else {
// 消息发送失败处理逻辑,如重试等
}
producer.shutdown();
- 设置重试机制:当消息发送失败时,生产者可按照设定的重试次数进行重试。默认情况下,RocketMQ生产者会重试2次。可以通过代码修改重试次数,例如:
producer.setRetryTimesWhenSendFailed(3);
- 事务消息:适用于对消息一致性要求较高的场景。生产者发送半消息到MQ服务器,MQ服务器响应成功后,生产者执行本地事务。根据本地事务执行结果,生产者再向MQ服务器发送Commit或Rollback指令。如果MQ服务器长时间未收到Commit或Rollback指令,会回调生产者的checkLocalTransaction方法来检查本地事务状态。例如:
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
producer.shutdown();
消息队列服务器确保消息不丢失的措施
- 持久化机制:RocketMQ采用基于磁盘的文件存储方式来持久化消息。消息到达MQ服务器后,会被写入到CommitLog文件中。同时,为了提高读写性能,采用了异步刷盘和同步刷盘两种策略。
- 同步刷盘:消息写入CommitLog文件后,会等待操作系统将数据真正落盘后才返回成功响应给生产者,保证消息不会因为服务器宕机等原因丢失。配置方式:
<broker>
<flushDiskType>SYNC_FLUSH</flushDiskType>
</broker>
- **异步刷盘**:消息写入CommitLog文件后,立即返回成功响应给生产者,然后由专门的线程将数据异步刷盘。虽然性能较高,但在服务器突然宕机时可能会丢失少量未刷盘的消息。配置方式:
<broker>
<flushDiskType>ASYNC_FLUSH</flushDiskType>
</broker>
- 主从复制:MQ服务器采用主从架构,主节点负责处理消息的读写,从节点会从主节点同步数据。当主节点出现故障时,从节点可以切换为主节点继续提供服务,确保消息不丢失。例如,在配置文件中可以设置主从关系:
<broker>
<brokerId>0</brokerId>
<brokerRole>SYNC_MASTER</brokerRole>
<masterAddr>192.168.1.100:10911</masterAddr>
</broker>
<broker>
<brokerId>1</brokerId>
<brokerRole>SLAVE</brokerRole>
<masterAddr>192.168.1.100:10911</masterAddr>
</broker>
消费者确保消息不丢失的措施
- 手动确认机制:消费者从MQ服务器拉取消息后,需要手动向MQ服务器发送确认消息已消费的指令。只有MQ服务器收到确认后,才会认为该消息已被成功消费。如果消费者在处理消息过程中出现异常,未发送确认消息,MQ服务器会认为消息未被成功消费,会重新将消息投递给其他消费者或再次投递给该消费者。例如在Java代码中:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 处理消息
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
- 消费重试:当消费者消费消息失败时,RocketMQ会根据配置的重试策略进行重试。对于顺序消息,会一直重试直到消费成功;对于并发消息,默认重试16次。可以通过配置修改重试次数等策略,例如:
consumer.setMaxReconsumeTimes(20);