面试题答案
一键面试- 生产者端:
- 生产者在发送消息时,可以通过设置
MessageQueueSelector
来将同一订单相关的消息发送到同一个队列。例如,根据订单ID进行路由,确保相同订单ID的消息都发送到相同的队列。示例代码如下(以Java为例):
DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); producer.setNamesrvAddr("namesrvAddr"); producer.start(); Message msg1 = new Message("OrderTopic", "TagA", "order1".getBytes(RemotingHelper.DEFAULT_CHARSET)); Message msg2 = new Message("OrderTopic", "TagA", "order2".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 1); // 假设订单ID为1 SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 2); // 假设订单ID为2 producer.shutdown();
- 生产者在发送消息时,可以通过设置
- 消费者端:
- RocketMQ消费者采用单线程消费一个队列中的消息。当消费者启动时,在消费同一队列的消息时,按照消息进入队列的顺序依次处理。例如,如果订单创建、订单支付、订单完成的消息都在同一个队列中,消费者会先处理订单创建消息,然后处理订单支付消息,最后处理订单完成消息。
- 消费者可以通过设置
ConsumeFromWhere
来确定从队列的哪个位置开始消费,一般默认ConsumeFromLastOffset
从队列末尾开始消费新消息。示例代码如下(以Java为例):
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr("namesrvAddr"); consumer.subscribe("OrderTopic", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { // 处理订单相关业务逻辑 System.out.println("Consume message: " + new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start();
通过以上生产者端将相关消息发送到同一队列,以及消费者端单线程消费队列消息的机制,RocketMQ可以在局部范围内(即同一个队列)保障订单创建、订单支付、订单完成等消息的顺序消费。