面试题答案
一键面试RocketMQ生产者实现事务消息的原理
- 半消息发送:生产者先向RocketMQ发送一条半消息(Half Message),此时消息对消费者不可见。MQ会响应消息发送的状态,判断半消息是否发送成功。
- 本地事务执行:如果半消息发送成功,生产者执行本地事务。本地事务可以是数据库操作、文件操作等业务逻辑。
- 事务状态回查:MQ会定时回查生产者,询问本地事务的执行状态。这是因为可能存在生产者发送事务状态确认消息失败的情况。生产者根据回查结果,再次告知MQ本地事务的最终状态。
- 消息投递:MQ根据生产者返回的本地事务状态来决定是否将半消息标记为可投递状态。如果本地事务执行成功,MQ将半消息标记为可投递,消费者可以消费该消息;如果本地事务执行失败,MQ将丢弃该半消息,消费者不会收到。
具体步骤
- 发送半消息:
TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null);
- 执行本地事务:
producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务,例如数据库操作 try { // 模拟数据库插入操作 // 这里应该是实际的业务逻辑代码 return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 这里进行事务状态回查逻辑 // 根据消息ID等信息查询本地事务状态 return LocalTransactionState.COMMIT_MESSAGE; } });
- 事务状态回查及处理:
在
checkLocalTransaction
方法中,根据消息的唯一标识(如msg.getMsgId())查询本地事务状态,并返回对应的LocalTransactionState
。MQ根据返回状态决定消息是否投递。
实际后端开发中的应用场景
- 电商订单系统:用户下单后,需要同时扣减库存和生成订单。使用事务消息,先发送半消息,然后在本地事务中扣减库存并生成订单。如果库存扣减成功则提交事务消息,订单生成成功,消费者可以处理订单;如果库存扣减失败则回滚事务消息,订单不会生成,保证了数据一致性。
- 银行转账:从一个账户扣款,同时向另一个账户加款。发送半消息后,在本地事务中执行账户扣款和加款操作。若扣款成功加款也成功则提交事务消息,若有一方失败则回滚事务消息,确保转账过程中资金的一致性。
事务消息保障数据一致性的方式
- 原子性:将本地事务和消息发送作为一个整体,要么都成功,要么都失败。通过半消息机制,在本地事务未确认成功前,消息对消费者不可见,避免了部分数据操作成功,而消息未发送成功导致的数据不一致。
- 事务状态回查:即使本地事务执行结果在发送确认状态给MQ时出现网络问题等异常,MQ通过回查机制,能够获取本地事务的最终状态,从而保证消息的投递与本地事务状态一致,确保数据的一致性。