MST

星途 面试题库

面试题:Java消息队列在分布式并发系统中的一致性保证

在分布式并发系统中,使用Java消息队列(如RocketMQ)来实现分布式事务,保证数据一致性是个复杂的问题。请详细阐述RocketMQ实现分布式事务的原理,分析可能出现的一致性问题及应对策略。并且给出一个在分布式电商系统中,使用RocketMQ实现订单创建与库存扣减一致性的详细设计方案及核心代码片段。
21.6万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

RocketMQ实现分布式事务的原理

  1. 半消息(Half Message)
    • 生产者首先向RocketMQ发送半消息,这类消息对消费者不可见。例如,在订单创建与库存扣减场景中,订单服务先向RocketMQ发送一条表示要创建订单的半消息。
    • 半消息的作用是预占一个消息位置,为后续的事务处理做准备。
  2. 本地事务执行
    • 发送半消息成功后,生产者执行本地事务,比如订单服务在本地数据库中插入订单记录。
  3. 事务状态回查
    • RocketMQ会定时回查生产者本地事务状态。如果生产者在执行本地事务时因为某些原因(如网络抖动)未能及时向RocketMQ反馈事务状态,RocketMQ会主动询问。
    • 生产者根据本地事务执行结果向RocketMQ返回事务状态(提交、回滚或未知)。如果是提交,消息就会对消费者可见;如果是回滚,消息会被删除;如果是未知,RocketMQ会继续回查。
  4. 消息消费
    • 当消息对消费者可见后,消费者(如库存服务)消费消息并执行相应的业务逻辑,比如扣减库存。

可能出现的一致性问题及应对策略

  1. 消息丢失
    • 问题描述:在事务执行过程中,半消息可能因为网络问题等原因丢失,导致后续事务无法正常进行。
    • 应对策略
      • RocketMQ本身有高可用机制,通过多副本(Master - Slave结构)保证消息不丢失。如果Master节点出现故障,Slave节点可以接替工作。
      • 生产者在发送半消息时,可以设置重试机制,确保消息发送成功。
  2. 事务状态回查失败
    • 问题描述:RocketMQ回查生产者本地事务状态时,可能因为生产者服务不可用等原因导致回查失败,无法确定事务最终状态。
    • 应对策略
      • 生产者服务需要保证高可用性,例如采用集群部署方式。
      • 可以在本地事务执行时,记录详细的日志,以便在回查时能够准确判断事务状态。
  3. 消息重复消费
    • 问题描述:消费者在消费消息时,可能因为网络等原因导致消费确认未及时返回,RocketMQ会重新投递消息,造成重复消费。
    • 应对策略
      • 消费者业务逻辑要实现幂等性。例如,在库存扣减时,可以先查询库存是否已经扣减过,避免重复扣减。
      • 可以使用数据库的唯一约束,在执行库存扣减操作前,通过数据库层面保证操作的唯一性。

分布式电商系统中订单创建与库存扣减一致性的详细设计方案

  1. 系统架构
    • 订单服务:负责接收用户下单请求,创建订单并向RocketMQ发送半消息。执行本地订单创建事务后,向RocketMQ反馈事务状态。
    • 库存服务:监听RocketMQ消息,消费订单创建成功的消息后执行库存扣减操作。
    • RocketMQ:负责存储半消息,回查事务状态,以及向库存服务投递消息。
  2. 流程设计
    • 用户下单,订单服务接收到请求。
    • 订单服务向RocketMQ发送创建订单的半消息。
    • 订单服务执行本地事务,在本地数据库中插入订单记录。
    • 订单服务向RocketMQ反馈事务状态。
    • RocketMQ根据事务状态处理消息,如果是提交,将消息发送给库存服务。
    • 库存服务消费消息,查询库存是否足够,足够则扣减库存,更新库存数据库。

核心代码片段

  1. 订单服务发送半消息及处理事务
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.*;

public class OrderService {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));
        producer.setExecutorService(executorService);
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地订单创建事务
                try {
                    // 模拟订单创建操作,如插入数据库
                    System.out.println("执行本地订单创建事务");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 事务状态回查
                System.out.println("事务状态回查");
                // 根据本地事务记录判断事务状态
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();
        Message message = new Message("transaction_topic", "创建订单".getBytes());
        producer.sendMessageInTransaction(message, null);
        // 防止主线程退出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}
  1. 库存服务消费消息扣减库存
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class StockService {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("stock_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("transaction_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // 模拟库存扣减操作
                    System.out.println("收到消息,执行库存扣减操作:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("库存服务消费者启动");
    }
}