MST

星途 面试题库

面试题:消息队列之RocketMQ如何在生产者端实现消息幂等性

在RocketMQ中,生产者发送消息时,如何通过相关机制实现消息幂等性?请阐述具体的实现方法和涉及到的关键概念。
26.9万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试
  1. 关键概念
    • 消息唯一标识:为每条消息生成一个唯一的标识,在RocketMQ中,Producer在发送消息时可以为消息设置 keys 字段,这个 keys 通常可作为消息的唯一标识。例如订单号等具有业务唯一性的字段。
    • 去重表:在业务系统中维护一张去重表,表中记录已经处理过的消息的唯一标识。当新消息到达时,先查询去重表,如果该消息标识已存在,表示该消息已处理过,不再重复处理。
  2. 具体实现方法
    • 使用 TransactionMQProducer
      • 发送半消息:通过 TransactionMQProducer 发送半消息(Half Message),半消息是指暂不能被Consumer消费的消息。例如:
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest",
        "TagA",
        "OrderID001",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
 - **执行本地事务**:发送半消息成功后,RocketMQ会回调 `executeLocalTransaction` 方法来执行本地事务。在这个方法中,根据业务逻辑执行本地事务,并根据事务执行结果返回相应的状态(`LocalTransactionState.COMMIT_MESSAGE`、`LocalTransactionState.ROLLBACK_MESSAGE`、`LocalTransactionState.UNKNOW`)。例如:
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地业务逻辑,如更新数据库等
        // 假设业务执行成功
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 检查本地事务状态,返回相应的状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
 - **事务状态回查**:如果RocketMQ没有收到本地事务执行状态(返回 `LocalTransactionState.UNKNOW`),会通过 `checkLocalTransaction` 方法回调来检查本地事务状态,确保消息最终的一致性和幂等性。
  • 基于业务端实现幂等
    • 生成唯一消息标识:在生产者端为每个消息生成唯一标识,如使用UUID或业务中的唯一键(如订单号)。
    • 消息发送与去重:发送消息前,先查询去重表。如果去重表中不存在该消息标识,则发送消息,并在发送成功后将该标识插入去重表;如果去重表中已存在该标识,则不再发送消息,认为该消息已被处理过。例如在数据库的订单表中,可增加一个字段用于记录消息处理状态,每次处理订单消息时,先查询该订单是否已处理,若已处理则跳过。