面试题答案
一键面试策略和技术手段
- 单队列单消费者:使用单个消息队列,只配置一个消费者。这样所有消息按顺序进入队列,消费者按顺序消费,能确保严格顺序。但缺点是无法利用多消费者并行处理提高效率,适用于并发量不是特别高的场景。
- 分区和路由:对消息进行分区,将相关联的消息路由到同一个分区,每个分区配置一个消费者。例如,根据订单ID取模将同一订单相关的创建、支付、发货等消息发送到固定分区。在Kafka中,通过自定义分区器实现,消费者组内每个消费者负责消费一个或多个分区,保证每个分区内消息顺序消费。
- 消息标记和重试:为消息添加顺序标记,如序列号。消费者消费消息时检查序列号连续性,若出现乱序,记录缺失消息并进行重试。可结合死信队列,将无法按序消费的消息发送到死信队列,后续人工处理或定时重试。例如RabbitMQ可通过设置消息属性和死信队列相关参数实现。
- 事务机制:部分消息队列支持事务,如RocketMQ。在发送一组相关消息时,使用事务确保要么所有消息都成功发送并按顺序进入队列,要么都失败回滚。消费者在事务内按顺序消费消息,保证业务逻辑一致性。
常见消息队列产品具体实现思路
- Kafka
- 分区和路由:实现自定义分区器,根据订单ID等业务标识计算分区号,将同一订单的所有消息发送到同一分区。
public class OrderIdPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int numPartitions = cluster.partitionsForTopic(topic).size(); return Math.abs(key.hashCode()) % numPartitions; } @Override public void close() {} @Override public void configure(Map<String, ?> configs) {} }
- 消费者组:配置消费者组,每个消费者组内消费者数量与分区数量相同,且每个消费者只负责消费一个分区的消息。
- RabbitMQ
- 单队列单消费者:创建单个队列,只启动一个消费者监听该队列。
- 消息标记和重试:在消息属性中添加序列号,消费者消费时校验序列号。利用RabbitMQ的死信队列,将乱序消息发送到死信队列。
// 生产者发送消息设置序列号 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .headers(Collections.singletonMap("sequenceId", sequenceId)) .build(); channel.basicPublish("", queueName, properties, message.getBytes("UTF-8")); // 消费者校验序列号 String sequenceId = (String) delivery.getProperties().getHeaders().get("sequenceId"); // 校验逻辑
- RocketMQ
- 事务机制:生产者使用事务消息发送,确保一组相关消息顺序进入队列。
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group"); producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 本地事务执行逻辑 return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 事务状态检查逻辑 return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); Message message = new Message("topic", "tags", "keys", "message body".getBytes()); SendResult sendResult = producer.sendMessageInTransaction(message, null);
- 顺序消费:消费者通过设置
MessageListenerOrderly
实现顺序消费。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.subscribe("topic", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 按顺序消费消息逻辑 return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start();