MST

星途 面试题库

面试题:消息队列:在复杂业务场景下,RocketMQ与Dubbo集成时如何保证数据的最终一致性

在实际的复杂业务场景中,涉及多个Dubbo服务通过RocketMQ进行消息交互,可能存在部分消息处理失败、网络波动等情况。请详细阐述在这种情况下,如何设计和实现一套机制,确保数据在多个服务之间能够达到最终一致性,包括但不限于消息补偿机制、分布式事务处理等方面的设计思路和具体实现方式。
25.5万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

消息补偿机制

  1. 设计思路
    • 记录消息处理状态:每个服务在处理消息时,记录消息处理结果到本地持久化存储(如数据库),状态可分为“处理中”“处理成功”“处理失败”等。
    • 定时任务检查:通过定时任务,定期扫描处理状态为“处理中”或“处理失败”的消息记录。对于“处理中”的消息,判断是否超时,若超时则认为处理失败;对于“处理失败”的消息,准备进行补偿。
    • 重试策略:根据不同的失败原因设置不同的重试策略,如固定间隔重试、指数退避重试等。对于因网络波动等临时性问题导致的失败,可采用多次重试的方式;对于业务逻辑错误导致的失败,可人工介入修正后再重试。
  2. 具体实现方式
    • 数据库表设计:创建一张消息处理记录表,包含消息ID、消息内容、所属业务标识、处理状态、重试次数、最后处理时间等字段。
    • 定时任务:使用Quartz等定时任务框架,定时执行消息检查逻辑。在任务中查询处理状态为“处理中”且超过一定时间(如5分钟)的消息,将其状态更新为“处理失败”;查询处理状态为“处理失败”且重试次数未超过上限(如3次)的消息,重新发送到RocketMQ队列进行处理。
    • 重试逻辑:在消息处理服务中,捕获处理消息过程中的异常,根据异常类型判断是否可重试。若可重试,增加重试次数,按照重试策略进行延迟后重新发送消息。例如,使用指数退避重试时,每次重试间隔时间翻倍。

分布式事务处理

  1. 设计思路
    • 基于可靠消息最终一致性方案
      • 本地消息表:在每个Dubbo服务本地数据库创建消息表,服务在执行本地业务逻辑前,先向本地消息表插入一条待发送消息记录,状态为“待发送”。业务逻辑执行成功后,将消息状态更新为“已发送”,同时发送消息到RocketMQ。若业务逻辑执行失败,删除本地消息表记录。
      • RocketMQ事务消息:利用RocketMQ的事务消息机制,发送半消息(Prepared消息),RocketMQ返回消息ID。服务执行本地业务逻辑,根据业务结果向RocketMQ发送Commit或Rollback指令。若服务崩溃等原因导致未发送Commit或Rollback指令,RocketMQ会回调服务的事务状态回查接口,服务根据本地业务执行结果返回相应状态。
    • TCC(Try - Confirm - Cancel)模式
      • Try阶段:每个服务对资源进行检测和预留操作,如扣减库存时,先检查库存是否足够并冻结库存,此阶段操作需具有幂等性。
      • Confirm阶段:当所有服务的Try阶段都成功后,依次执行Confirm操作,正式提交业务操作,如解冻并扣减库存。若某个服务的Try阶段失败,则进入Cancel阶段。
      • Cancel阶段:对Try阶段预留的资源进行释放,如解冻库存。
  2. 具体实现方式
    • 基于可靠消息最终一致性方案
      • 本地消息表实现:在每个服务的数据库中创建本地消息表,包含消息ID、消息内容、业务关联ID、消息状态、创建时间、更新时间等字段。在业务逻辑代码中,通过数据库事务控制本地业务操作和消息表操作的一致性。例如,在Spring Boot项目中,使用@Transactional注解。
      • RocketMQ事务消息实现:引入RocketMQ客户端依赖,在发送事务消息时,使用TransactionMQProducer。示例代码如下:
TransactionMQProducer producer = new TransactionMQProducer("group_name");
producer.setNamesrvAddr("namesrv_addr");
producer.start();
Message msg = new Message("topic", "tags", "key", "message_body".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
        // 执行本地业务逻辑
        try {
            // 业务操作
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}, null);
producer.registerTransactionCheckListener(new TransactionCheckListener() {
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        // 回查本地业务执行状态
        // 根据业务逻辑返回相应状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
producer.shutdown();
  • TCC模式实现
    • 定义TCC接口:每个服务定义Try、Confirm、Cancel接口。例如,库存服务定义InventoryServiceTCC接口,包含tryDeductStockconfirmDeductStockcancelDeductStock方法。
    • TCC框架集成:可以使用Hmily等TCC框架来管理分布式事务。在项目中引入Hmily依赖,配置相关参数。在服务接口方法上使用@Hmily注解,框架会自动管理Try、Confirm、Cancel阶段的调用和异常处理。例如:
@Service
public class InventoryServiceImpl implements InventoryServiceTCC {
    @Override
    @Hmily
    public void deductStock(String productId, int count) {
        // Try阶段逻辑
        tryDeductStock(productId, count);
        // Confirm阶段逻辑会由框架自动调用
        confirmDeductStock(productId, count);
    }

    @Override
    public void tryDeductStock(String productId, int count) {
        // 检查并冻结库存
    }

    @Override
    public void confirmDeductStock(String productId, int count) {
        // 正式扣减库存
    }

    @Override
    public void cancelDeductStock(String productId, int count) {
        // 解冻库存
    }
}

其他辅助机制

  1. 幂等性设计
    • 设计思路:确保相同消息多次处理的结果与单次处理结果一致。对于消息处理接口,通过判断消息ID是否已处理过,避免重复处理。
    • 具体实现方式:在数据库中创建幂等记录表,记录已处理的消息ID。每次处理消息前,先查询幂等记录表,若消息ID已存在则直接返回成功;否则处理消息并插入消息ID到幂等记录表。
  2. 监控与报警
    • 设计思路:实时监控消息处理状态、重试次数、分布式事务状态等指标,当出现异常情况(如消息处理失败次数过多、重试次数达到上限等)及时报警。
    • 具体实现方式:使用Prometheus + Grafana搭建监控系统,在服务中埋点采集相关指标数据。当指标超过阈值时,通过Alertmanager发送报警信息到钉钉、邮件等通知渠道。