异常处理深度定制
- 消息发送异常
- 技术点:在发送消息时,使用
SendCallback
接口来处理异步发送的结果。对于同步发送,可以捕获MQClientException
、RemotingException
、MQBrokerException
、InterruptedException
等异常。例如:
DefaultMQProducer producer = new DefaultMQProducer("groupName");
producer.start();
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
try {
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
// 进行详细的异常日志记录
log.error("消息发送异常", e);
// 进行重试逻辑
if (shouldRetry(e)) {
retrySendMessage(msg, producer);
}
}
- 挑战:重试次数和间隔时间的合理设置是个挑战。重试次数过多可能导致资源浪费,间隔时间过长可能影响实时性,过短可能无法解决临时故障。需要通过业务场景和压测来确定合适的值。
- 消息消费异常
- 技术点:实现
MessageListenerConcurrently
或MessageListenerOrderly
接口,在consumeMessage
方法中捕获异常。对于消费失败的消息,可以选择重试或发送到死信队列。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupName");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
// 业务处理逻辑
processMessage(msg);
} catch (Exception e) {
// 记录消费异常日志
log.error("消息消费异常", e);
// 重试逻辑
if (shouldRetry(e)) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} else {
// 发送到死信队列
sendToDLQ(msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
- 挑战:区分可重试异常和不可重试异常比较困难,可能需要深入了解业务逻辑。同时,死信队列的管理和监控也需要额外关注,以避免死信队列堆积过多影响系统性能。
容错机制深度定制
- Broker容错
- 技术点:RocketMQ采用主从架构,通过配置多副本(
brokerRole=SYNC_MASTER
或ASYNC_MASTER
)来提高容错能力。可以通过调整刷盘策略(flushDiskType=SYNC_FLUSH
或ASYNC_FLUSH
)来平衡性能和数据可靠性。例如,在broker.conf
文件中配置:
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
- 挑战:同步刷盘和同步主从复制虽然提高了数据可靠性,但会降低系统的写入性能。需要在性能和可靠性之间找到平衡点,并且在主从切换时,可能会存在短暂的数据不一致问题,需要应用层进行一定的补偿处理。
- 网络容错
- 技术点:RocketMQ客户端内置了自动重连机制。可以通过配置
clientChannelMaxIdleTimeSeconds
等参数来优化网络连接的管理。例如,适当减小clientChannelMaxIdleTimeSeconds
的值,使客户端在网络空闲时更快地检测到连接异常并进行重连。
DefaultMQProducer producer = new DefaultMQProducer("groupName");
producer.setClientChannelMaxIdleTimeSeconds(120);
producer.start();
- 挑战:频繁的重连可能会消耗系统资源,影响系统的稳定性。需要根据网络环境的实际情况来调整参数,同时,在重连过程中,消息的发送和消费可能会出现短暂中断,需要在业务层面进行适当的处理,以满足实时性要求。