网络故障处理
- 心跳机制
- 在数据采集节点、边缘计算设备与中心处理平台之间,通过RocketMQ的自定义属性定期发送心跳消息。例如,每30秒发送一次心跳。若接收方在一定时间(如90秒)内未收到心跳,则判定网络连接可能出现问题。
- 心跳消息可设置特殊的Topic,如
heartbeat_topic
,并且在消息体中包含发送方的唯一标识,如节点ID。
- 自动重连
- 当检测到网络故障后,RocketMQ客户端利用其内置的重连机制进行自动重连。在客户端配置文件中设置合适的重连参数,如
retryTimesWhenSendFailed = 5
(表示发送失败时尝试重连5次),retryInterval = 1000
(每次重连间隔1秒)。
- 若重连失败,将故障信息记录到本地日志文件,包括故障时间、目标地址等。同时,启动备用网络连接(若有),例如通过VPN切换到备用网络线路。
节点故障处理
- 节点冗余
- 在数据采集节点和边缘计算设备层面,采用冗余部署。例如,对于关键的数据采集区域,部署多个采集节点,这些节点可以是热备(实时同步数据)或冷备(定期同步数据)方式。
- 在中心处理平台,使用集群方式部署,如RocketMQ的Master - Slave架构。Master节点负责处理主要的消息读写,Slave节点实时同步Master节点的数据,当Master节点故障时,Slave节点能够快速切换为主节点继续提供服务。
- 故障检测与切换
- 通过心跳机制和节点健康检查脚本结合,实时监控节点状态。例如,每5分钟运行一次健康检查脚本,检查节点的CPU、内存、磁盘空间等指标。若发现节点指标异常或心跳丢失,标记该节点为故障节点。
- 对于数据采集节点和边缘计算设备,故障发生时,相邻节点(通过网络拓扑发现)或管理节点会自动将故障节点的任务接管。在中心处理平台,RocketMQ的NameServer会感知到Master节点故障,通过选举算法将Slave节点提升为Master节点,客户端能够自动切换到新的Master节点进行消息交互。
消息丢失处理
- 消息持久化
- RocketMQ采用刷盘机制保证消息持久化。在Broker配置文件中设置
flushDiskType = SYNC_FLUSH
,确保消息在写入内存后立即同步到磁盘,防止因Broker重启等原因导致消息丢失。
- 同时,设置合适的文件存储路径和文件大小限制,如
storePathRootDir = /data/rocketmq/store
,maxMessageSize = 4194304
(4MB,根据实际需求调整)。
- 消息确认与补偿
- 生产者发送消息时,采用同步发送方式并等待Broker的确认。若未收到确认,根据重连和重发机制进行处理。例如:
// 生产者代码示例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("namesrv:9876");
producer.start();
Message msg = new Message("topic", "tag", "key", "message body".getBytes());
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 重发处理
}
producer.shutdown();
- 消费者在处理完消息后,向Broker发送确认消息。若Broker未收到确认,会根据配置的重试策略重新投递消息。消费者端采用幂等性处理,确保重复消费消息不会产生副作用。例如,在处理消息前,先根据消息的唯一标识(如MessageId)查询数据库,判断该消息是否已处理过。
- 对于长时间未确认的消息(可设置超时时间,如10分钟),通过补偿机制重新发送或人工介入处理。可以创建一个专门的补偿任务,定期扫描未确认消息队列,对超时消息进行重新投递或记录异常日志以便后续排查。
系统故障恢复
- 数据恢复
- 对于因节点故障或网络故障导致的数据未及时处理,在故障恢复后,利用RocketMQ的消息回溯功能。例如,设置消息保留时间为7天(
messageRetentionTime = 604800000
),在故障恢复后,消费者可以从故障时间点开始重新消费消息。
- 同时,结合本地缓存和日志记录,恢复故障期间丢失的临时数据。例如,数据采集节点在本地缓存最近100条未发送成功的消息,故障恢复后重新发送。
- 服务恢复
- 故障恢复后,节点按照预定的启动顺序重新启动服务。先启动中心处理平台的RocketMQ集群,确保NameServer、Broker等组件正常运行。然后依次启动边缘计算设备和数据采集节点的相关服务。
- 启动过程中,各节点重新建立与RocketMQ的连接,并根据配置重新订阅相应的Topic。同时,检查节点间的同步状态,对于数据采集节点,若有数据积压,逐步将积压数据发送到中心处理平台,避免瞬间大量数据涌入导致系统再次故障。