MST

星途 面试题库

面试题:消息队列RocketMQ顺序消息在高并发场景下如何平衡性能与顺序性

在高并发业务场景中,使用RocketMQ顺序消息时可能会面临性能与顺序性之间的矛盾,你会采取哪些策略来解决这个问题,举例说明
22.6万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

1. 合理分区策略

  • 策略:根据业务逻辑将消息进行合理分区,让相关的消息进入同一个分区。例如在电商订单场景中,按订单号的哈希值分配到不同分区,这样同一订单的创建、支付、发货等消息就会在同一分区内,既保证了同一订单消息的顺序性,又可并行处理不同订单的消息,提升性能。
  • 代码示例(Java 中 RocketMQ 生产者设置分区示例):
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest" /* Topic */,
    "TagA" /* Tag */,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 根据订单号 hash 分配分区
msg.setShardingKey("orderId123");
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        String shardingKey = (String) arg;
        int hash = shardingKey.hashCode();
        int index = hash % mqs.size();
        return mqs.get(index);
    }
}, "orderId123");
producer.shutdown();

2. 批量消费

  • 策略:消费者端设置合理的批量消费数量,一次从队列中拉取多个消息进行处理。例如在日志记录场景中,将同一业务模块产生的多条日志消息批量消费并写入文件或数据库,在保证顺序性的同时减少拉取消息的次数,提升消费性能。
  • 代码示例(Java 中 RocketMQ 消费者批量消费示例):
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeMessageBatchMaxSize(10);
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理消息
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

3. 异步处理

  • 策略:在消费者处理消息时,将一些耗时的操作异步化处理。比如在一个用户注册场景,注册成功消息进入顺序队列,消费者接收到消息后,主流程先完成用户信息入库等核心操作,而发送欢迎邮件等耗时操作通过线程池或其他异步框架进行处理,既保证主流程消息处理顺序性,又不影响整体性能。
  • 代码示例(Java 中使用线程池异步处理消息示例):
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group3");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            executorService.submit(() -> {
                // 异步处理耗时操作
                System.out.println(new String(msg.getBody()));
            });
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

4. 优化消息处理逻辑

  • 策略:简化消息处理逻辑,减少不必要的计算和 I/O 操作。例如在数据统计场景中,将复杂的统计计算逻辑优化,避免重复计算,提高单个消息处理速度,从而在保证顺序性的基础上提升整体性能。
  • 示例:原本每次处理消息都要从数据库读取大量历史数据进行复杂计算,优化后将常用数据缓存起来,减少数据库读取次数,提高计算效率。