面试题答案
一键面试实现思路
- 利用消息属性:在发送订单消息时,为消息设置一个属性来标识订单类型,如“orderType”,值分别为“普通订单”“团购订单”“秒杀订单”。
- 消费组设计:针对不同订单类型,分别创建对应的消费组,如“ordinary_order_group”用于处理普通订单,“group_buy_order_group”用于处理团购订单,“seckill_order_group”用于处理秒杀订单。
- 消息过滤:RocketMQ支持在消费端基于消息属性进行过滤,消费组只接收处理符合自身订单类型的消息。
关键步骤
- 发送端:
- 引入依赖:在Maven项目的
pom.xml
文件中引入RocketMQ的客户端依赖。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.4</version> </dependency>
- 设置消息属性:在发送消息时,根据订单类型设置消息属性。
DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); // 假设order为订单对象,order.getType()获取订单类型 String orderType = order.getType(); Message msg = new Message("OrderTopic", "TagA", ("订单内容").getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.putUserProperty("orderType", orderType); SendResult sendResult = producer.send(msg); producer.shutdown();
- 引入依赖:在Maven项目的
- 消费端:
- 创建消费组:为不同订单类型创建消费组。
DefaultMQPushConsumer ordinaryConsumer = new DefaultMQPushConsumer("ordinary_order_group"); ordinaryConsumer.setNamesrvAddr("localhost:9876"); ordinaryConsumer.subscribe("OrderTopic", "TagA"); ordinaryConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (Message msg : msgs) { String orderType = msg.getUserProperty("orderType"); if ("普通订单".equals(orderType)) { // 处理普通订单逻辑 } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); ordinaryConsumer.start(); DefaultMQPushConsumer groupBuyConsumer = new DefaultMQPushConsumer("group_buy_order_group"); groupBuyConsumer.setNamesrvAddr("localhost:9876"); groupBuyConsumer.subscribe("OrderTopic", "TagA"); groupBuyConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (Message msg : msgs) { String orderType = msg.getUserProperty("orderType"); if ("团购订单".equals(orderType)) { // 处理团购订单逻辑 } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); groupBuyConsumer.start(); DefaultMQPushConsumer seckillConsumer = new DefaultMQPushConsumer("seckill_order_group"); seckillConsumer.setNamesrvAddr("localhost:9876"); seckillConsumer.subscribe("OrderTopic", "TagA"); seckillConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (Message msg : msgs) { String orderType = msg.getUserProperty("orderType"); if ("秒杀订单".equals(orderType)) { // 处理秒杀订单逻辑 } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); seckillConsumer.start();
- 服务器端配置:确保RocketMQ服务器正常运行,并且配置了合适的Topic和相应的权限,使得生产者能够发送消息,消费者能够订阅和接收消息。