面试题答案
一键面试架构设计
- 生产者:在电商订单处理系统中,订单生成模块作为生产者,将包含订单金额、下单时间、商品类别等完整信息的订单消息发送到RocketMQ集群。每个订单消息是一个完整的业务数据单元。
- RocketMQ集群:负责接收、存储和转发消息。可以采用多Broker部署,提高系统的可靠性和消息处理能力。同时,配置合适的Topic和Tag来初步区分不同类型的消息。
- 消费者:各个业务模块作为消费者,根据自身需求从RocketMQ中订阅特定的消息。例如,财务模块可能只关心订单金额大于一定数值的消息;库存模块可能关注特定商品类别的订单消息。
消息模型
- Topic:定义一个通用的订单Topic,比如 “order - topic”,用于接收所有订单相关消息。
- Tag:根据商品类别定义Tag,如 “tag - electronics”(电子产品)、“tag - clothing”(服装)等。同时,也可以结合其他维度定义Tag,如 “tag - high - amount”(高金额订单)等。
- 消息体:消息体中封装完整的订单信息,包括订单金额、下单时间、商品类别、订单ID等详细数据。
代码实现思路
- 生产者代码
// 引入RocketMQ依赖 import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class OrderProducer { public static void main(String[] args) throws Exception { // 创建生产者实例 DefaultMQProducer producer = new DefaultMQProducer("group - order - producer"); // 设置NameServer地址 producer.setNamesrvAddr("localhost:9876"); // 启动生产者 producer.start(); // 构造订单消息 String orderInfo = "{\"orderId\":\"123456\",\"orderAmount\":100.0,\"orderTime\":\"2023 - 01 - 01 12:00:00\",\"productCategory\":\"electronics\"}"; Message message = new Message("order - topic", "tag - electronics", orderInfo.getBytes("UTF - 8")); // 发送消息 SendResult sendResult = producer.send(message); System.out.println("Send result: " + sendResult); // 关闭生产者 producer.shutdown(); } }
- 消费者代码
// 引入RocketMQ依赖 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class OrderConsumer { public static void main(String[] args) throws Exception { // 创建消费者实例 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group - order - consumer"); // 设置NameServer地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅Topic和Tag consumer.subscribe("order - topic", "tag - electronics"); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { String messageBody = new String(msg.getBody()); System.out.println("Received message: " + messageBody); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } }
可能面临的挑战和应对策略
- 复杂过滤条件
- 挑战:实际业务中过滤条件可能非常复杂,仅靠Tag无法满足,比如需要同时根据订单金额和下单时间过滤。
- 策略:可以使用SQL92表达式进行消息过滤。在RocketMQ配置中开启SQL过滤功能,生产者发送消息时设置相关属性,消费者订阅时使用SQL表达式。例如:
// 生产者设置属性 message.putUserProperty("orderAmount", "100.0"); message.putUserProperty("orderTime", "2023 - 01 - 01 12:00:00"); // 消费者订阅SQL表达式 consumer.subscribe("order - topic", MessageSelector.bySql("orderAmount > 50 AND orderTime > '2023 - 01 - 01 10:00:00'"));
- 性能问题
- 挑战:复杂过滤和大量订阅可能导致RocketMQ性能下降。
- 策略:合理设计Topic和Tag,避免过度复杂的过滤逻辑。对热点消息进行拆分,采用分布式消费方式提高处理速度。同时,定期监控RocketMQ集群性能指标,进行调优。
- 数据一致性
- 挑战:订单处理过程中可能涉及多个业务模块,消息处理顺序和结果可能影响数据一致性。
- 策略:使用事务消息确保订单消息在多个业务模块间的一致性处理。生产者发送事务消息,RocketMQ保证消息要么全部被消费,要么全部不被消费。同时,在业务模块中实现幂等性处理,避免重复消费导致的数据不一致。