MST

星途 面试题库

面试题:消息队列之RocketMQ在微服务架构中的基本应用

在微服务架构中,RocketMQ如何实现解耦和异步通信?请举例说明。
30.5万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

解耦实现

  1. 生产者与消费者分离
    • 在微服务架构中,假设存在订单微服务和库存微服务。订单微服务作为生产者,当有新订单生成时,它只需将订单相关消息发送到RocketMQ的指定主题(如“order - topic”),而无需关心哪个微服务(库存微服务)会消费这些消息以及如何消费。库存微服务作为消费者,从“order - topic”主题订阅消息并进行库存扣减等操作。这样订单微服务和库存微服务之间就实现了解耦,即使库存微服务出现故障或进行升级改造,订单微服务依然可以正常生成订单并发送消息。
  2. 消息队列缓冲
    • RocketMQ的队列机制起到缓冲作用。例如,在电商大促期间,订单生成量剧增,订单微服务持续向RocketMQ发送订单消息。库存微服务由于处理能力有限,不能立即处理完所有消息。RocketMQ会将这些消息暂存在队列中,库存微服务按照自身处理能力从队列中逐步拉取消息进行处理,避免订单微服务因库存微服务处理不及时而阻塞。

异步通信实现

  1. 异步发送消息
    • 订单微服务可以采用异步方式向RocketMQ发送订单消息。例如在Java代码中,可以使用如下方式:
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("namesrvAddr");
producer.start();
Message msg = new Message("order - topic", "TagA", "Order1".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("异步发送成功: " + sendResult);
    }

    @Override
    public void onException(Throwable e) {
        System.out.println("异步发送失败: " + e);
    }
});
producer.shutdown();
  • 订单微服务发送消息后无需等待消息处理结果,可继续执行其他业务逻辑,如返回订单生成成功给用户等操作。库存微服务在消息到达队列后,异步地从队列中拉取消息并处理,实现了两个微服务之间的异步通信。
  1. 消费者异步处理
    • 库存微服务从RocketMQ拉取到订单消息后,也可以异步处理。比如库存微服务内部使用线程池来处理消息,避免单个消息处理时间过长阻塞后续消息的拉取。示例代码如下(伪代码):
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("namesrvAddr");
consumer.subscribe("order - topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            executorService.submit(() -> {
                // 处理库存扣减等业务逻辑
            });
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  • 这样库存微服务在处理消息时也是异步的,提高了整体系统的并发处理能力和响应速度。