MST

星途 面试题库

面试题:消息队列RocketMQ的生产者API中如何实现批量发送消息并保证可靠性?

在RocketMQ的生产者开发中,若要批量发送消息,需要注意哪些API细节以确保消息能够可靠地发送到Broker?请描述具体步骤和关键参数设置。
37.1万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

批量发送消息的API细节

  1. 消息大小限制
    • RocketMQ默认情况下,批量消息大小不能超过4MB。在构建批量消息时,需要确保所有消息的总大小不超过此限制。如果消息大小超过限制,会抛出RemotingTooMuchRequestException异常。
  2. 消息集合类型
    • 使用List<Message>集合来存储要批量发送的消息。例如:
    List<Message> messages = new ArrayList<>();
    Message msg1 = new Message("TopicTest", "TagA", "OrderID001", "Hello world 0".getBytes(RemotingHelper.DEFAULT_CHARSET));
    Message msg2 = new Message("TopicTest", "TagA", "OrderID002", "Hello world 1".getBytes(RemotingHelper.DEFAULT_CHARSET));
    messages.add(msg1);
    messages.add(msg2);
    
  3. 发送方法选择
    • 使用生产者的send(List<Message> msgs)方法来批量发送消息。例如:
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();
    try {
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.shutdown();
    
  4. 分区策略
    • 批量消息会发送到同一个队列。如果希望自定义队列选择策略,可以实现MessageQueueSelector接口,并在send方法中传入。例如:
    producer.send(messages, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            // 简单示例:根据消息的key选择队列
            long id = (long) arg;
            int index = (int) (id % mqs.size());
            return mqs.get(index);
        }
    }, orderId);
    
  5. 异常处理
    • 批量发送消息时,要正确处理可能抛出的异常。常见异常包括:
      • MQClientException:客户端相关异常,例如生产者未正确启动等。
      • RemotingException:网络通信异常,可能是与Broker连接问题等。
      • MQBrokerException:Broker端异常,例如Broker繁忙等。
      • InterruptedException:线程中断异常。 可以使用如下方式捕获和处理异常:
    try {
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);
    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
        e.printStackTrace();
        // 这里可以添加更具体的异常处理逻辑,例如重试等
    }
    

关键参数设置

  1. 生产者组名称
    • DefaultMQProducer构造函数中的producerGroup参数是必须设置的。它用于标识一组生产者,在故障恢复和负载均衡等方面有重要作用。例如:
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    
  2. Name Server地址
    • 可以通过producer.setNamesrvAddr("ip:port")方法设置Name Server的地址。Name Server用于管理Broker的路由信息,生产者需要通过Name Server找到Broker。例如:
    producer.setNamesrvAddr("127.0.0.1:9876");
    
  3. 发送超时时间
    • 可以通过producer.setSendMsgTimeout(int timeout)方法设置发送消息的超时时间,单位是毫秒。默认值是3000(3秒)。如果在这个时间内没有收到Broker的响应,会抛出RemotingException。例如:
    producer.setSendMsgTimeout(5000);
    
  4. 重试次数
    • 可以通过producer.setRetryTimesWhenSendFailed(int times)方法设置消息发送失败时的重试次数。默认是2次。例如:
    producer.setRetryTimesWhenSendFailed(3);
    
    • 注意,重试可能会导致消息重复,在业务上需要考虑幂等性处理。