MST

星途 面试题库

面试题:消息队列RocketMQ生产者中如何处理消息发送失败的情况

在RocketMQ生产者(Producer)使用API发送消息时,如果遇到消息发送失败,按照最佳实践,应该从哪些方面进行处理,简述处理流程和可能用到的API方法。
26.1万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试
  1. 处理流程
    • 重试发送
      • 立即重试一定次数(例如3次)。这是因为网络抖动等瞬时性问题可能导致消息发送失败,重试可能解决问题。
      • 在每次重试之间可以设置一个短暂的时间间隔(如100 - 500毫秒),避免短时间内大量重试对系统造成压力。
    • 检查配置
      • 检查生产者的配置,如NameServer地址是否正确,网络连接是否正常。如果NameServer地址错误,消息无法正确路由到Broker。
      • 确认Topic是否存在且权限配置正确。如果Topic不存在或没有权限,会导致消息发送失败。
    • 检查消息内容
      • 确认消息大小是否超过限制。RocketMQ对单个消息大小有限制(默认4M),如果超过限制,需要对消息进行拆分处理。
      • 检查消息体是否符合业务逻辑和编码要求,例如消息体是否为正确的序列化格式。
    • 报警与记录
      • 如果经过重试和检查配置、消息内容后仍然发送失败,记录详细的错误日志,包括错误信息、消息内容、发送时间等,方便后续排查问题。
      • 同时触发报警,通知相关运维或开发人员进行处理。
  2. 可能用到的API方法
    • 重试发送:在RocketMQ的Java客户端中,DefaultMQProducer类的send方法本身支持重试机制。可以通过设置DefaultMQProducerretryTimesWhenSendFailed属性来设置重试次数,例如:
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setRetryTimesWhenSendFailed(3);
- **检查配置**:
    - 可以通过`DefaultMQProducer`的`setNamesrvAddr`方法来设置和检查NameServer地址,例如:
producer.setNamesrvAddr("127.0.0.1:9876");
    - 关于Topic的权限和存在性检查,需要借助RocketMQ的管理工具或者相关的Admin API(如`DefaultMQAdminExt`类)来进行检查。
- **检查消息内容**:
    - 可以在发送消息前获取消息大小,判断是否超过限制,例如:
Message msg = new Message("TopicTest" /* Topic */,
                          "TagA" /* Tag */,
                          ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
if (msg.getBody().length > 4 * 1024 * 1024) {
    // 处理消息过大的情况,如拆分消息
}
- **报警与记录**:使用日志框架(如Log4j、SLF4J等)记录错误日志,例如使用SLF4J记录发送消息失败的日志:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

private static final Logger logger = LoggerFactory.getLogger(Producer.class);

try {
    SendResult sendResult = producer.send(msg);
} catch (Exception e) {
    logger.error("Send message failed", e);
}

报警部分通常会使用一些监控报警系统的SDK,如钉钉机器人报警SDK等,这里暂不详细展开具体代码。