消息补偿机制
- 设计思路:
- 记录消息处理状态:每个服务在处理消息时,记录消息处理结果到本地持久化存储(如数据库),状态可分为“处理中”“处理成功”“处理失败”等。
- 定时任务检查:通过定时任务,定期扫描处理状态为“处理中”或“处理失败”的消息记录。对于“处理中”的消息,判断是否超时,若超时则认为处理失败;对于“处理失败”的消息,准备进行补偿。
- 重试策略:根据不同的失败原因设置不同的重试策略,如固定间隔重试、指数退避重试等。对于因网络波动等临时性问题导致的失败,可采用多次重试的方式;对于业务逻辑错误导致的失败,可人工介入修正后再重试。
- 具体实现方式:
- 数据库表设计:创建一张消息处理记录表,包含消息ID、消息内容、所属业务标识、处理状态、重试次数、最后处理时间等字段。
- 定时任务:使用Quartz等定时任务框架,定时执行消息检查逻辑。在任务中查询处理状态为“处理中”且超过一定时间(如5分钟)的消息,将其状态更新为“处理失败”;查询处理状态为“处理失败”且重试次数未超过上限(如3次)的消息,重新发送到RocketMQ队列进行处理。
- 重试逻辑:在消息处理服务中,捕获处理消息过程中的异常,根据异常类型判断是否可重试。若可重试,增加重试次数,按照重试策略进行延迟后重新发送消息。例如,使用指数退避重试时,每次重试间隔时间翻倍。
分布式事务处理
- 设计思路:
- 基于可靠消息最终一致性方案:
- 本地消息表:在每个Dubbo服务本地数据库创建消息表,服务在执行本地业务逻辑前,先向本地消息表插入一条待发送消息记录,状态为“待发送”。业务逻辑执行成功后,将消息状态更新为“已发送”,同时发送消息到RocketMQ。若业务逻辑执行失败,删除本地消息表记录。
- RocketMQ事务消息:利用RocketMQ的事务消息机制,发送半消息(Prepared消息),RocketMQ返回消息ID。服务执行本地业务逻辑,根据业务结果向RocketMQ发送Commit或Rollback指令。若服务崩溃等原因导致未发送Commit或Rollback指令,RocketMQ会回调服务的事务状态回查接口,服务根据本地业务执行结果返回相应状态。
- TCC(Try - Confirm - Cancel)模式:
- Try阶段:每个服务对资源进行检测和预留操作,如扣减库存时,先检查库存是否足够并冻结库存,此阶段操作需具有幂等性。
- Confirm阶段:当所有服务的Try阶段都成功后,依次执行Confirm操作,正式提交业务操作,如解冻并扣减库存。若某个服务的Try阶段失败,则进入Cancel阶段。
- Cancel阶段:对Try阶段预留的资源进行释放,如解冻库存。
- 具体实现方式:
- 基于可靠消息最终一致性方案:
- 本地消息表实现:在每个服务的数据库中创建本地消息表,包含消息ID、消息内容、业务关联ID、消息状态、创建时间、更新时间等字段。在业务逻辑代码中,通过数据库事务控制本地业务操作和消息表操作的一致性。例如,在Spring Boot项目中,使用
@Transactional
注解。
- RocketMQ事务消息实现:引入RocketMQ客户端依赖,在发送事务消息时,使用
TransactionMQProducer
。示例代码如下:
TransactionMQProducer producer = new TransactionMQProducer("group_name");
producer.setNamesrvAddr("namesrv_addr");
producer.start();
Message msg = new Message("topic", "tags", "key", "message_body".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
// 执行本地业务逻辑
try {
// 业务操作
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}, null);
producer.registerTransactionCheckListener(new TransactionCheckListener() {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
// 回查本地业务执行状态
// 根据业务逻辑返回相应状态
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.shutdown();
- TCC模式实现:
- 定义TCC接口:每个服务定义Try、Confirm、Cancel接口。例如,库存服务定义
InventoryServiceTCC
接口,包含tryDeductStock
、confirmDeductStock
、cancelDeductStock
方法。
- TCC框架集成:可以使用Hmily等TCC框架来管理分布式事务。在项目中引入Hmily依赖,配置相关参数。在服务接口方法上使用
@Hmily
注解,框架会自动管理Try、Confirm、Cancel阶段的调用和异常处理。例如:
@Service
public class InventoryServiceImpl implements InventoryServiceTCC {
@Override
@Hmily
public void deductStock(String productId, int count) {
// Try阶段逻辑
tryDeductStock(productId, count);
// Confirm阶段逻辑会由框架自动调用
confirmDeductStock(productId, count);
}
@Override
public void tryDeductStock(String productId, int count) {
// 检查并冻结库存
}
@Override
public void confirmDeductStock(String productId, int count) {
// 正式扣减库存
}
@Override
public void cancelDeductStock(String productId, int count) {
// 解冻库存
}
}
其他辅助机制
- 幂等性设计:
- 设计思路:确保相同消息多次处理的结果与单次处理结果一致。对于消息处理接口,通过判断消息ID是否已处理过,避免重复处理。
- 具体实现方式:在数据库中创建幂等记录表,记录已处理的消息ID。每次处理消息前,先查询幂等记录表,若消息ID已存在则直接返回成功;否则处理消息并插入消息ID到幂等记录表。
- 监控与报警:
- 设计思路:实时监控消息处理状态、重试次数、分布式事务状态等指标,当出现异常情况(如消息处理失败次数过多、重试次数达到上限等)及时报警。
- 具体实现方式:使用Prometheus + Grafana搭建监控系统,在服务中埋点采集相关指标数据。当指标超过阈值时,通过Alertmanager发送报警信息到钉钉、邮件等通知渠道。