MST

星途 面试题库

面试题:消息队列RocketMQ与Spring Boot集成中的消息发送可靠性保证

在RocketMQ与Spring Boot集成项目里,如何确保消息发送的可靠性?请阐述具体的实现机制和相关配置。
32.1万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

实现机制

  1. 同步发送并确认
    • 使用 DefaultMQProducersend 方法进行同步消息发送。该方法会等待 Broker 返回发送结果,只有当返回状态为 SEND_OK 时,才认为消息发送成功。例如:
    SendResult sendResult = producer.send(message);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
        // 消息发送成功处理逻辑
    } else {
        // 消息发送失败处理逻辑,如重试等
    }
    
  2. 异步发送并确认
    • 通过 DefaultMQProducersend 方法的异步重载形式实现。需要传入一个 SendCallback 回调函数,在 Broker 返回结果时,会调用该回调函数。示例代码如下:
    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            // 消息发送成功处理逻辑
        }
    
        @Override
        public void onException(Throwable e) {
            // 消息发送失败处理逻辑,如重试等
        }
    });
    
  3. 消息重试机制
    • RocketMQ 支持消息发送失败后的重试。默认情况下,同步发送失败会重试2次,异步发送失败不会自动重试(可在回调函数中手动实现重试)。可以通过设置 DefaultMQProducerretryTimesWhenSendFailed 属性来调整同步发送的重试次数。例如:
    producer.setRetryTimesWhenSendFailed(3);
    
  4. 事务消息
    • 适用于需要保证消息发送与本地业务操作一致性的场景。例如在电商下单场景中,下单操作与消息发送需保证原子性。
    • 实现步骤:
      • 定义事务生产者,继承 TransactionMQProducer 类。
      • 实现 LocalTransactionExecuter 接口,在 executeLocalTransaction 方法中执行本地业务逻辑,并返回 LocalTransactionState 状态(COMMIT_MESSAGEROLLBACK_MESSAGEUNKNOW)。
      • checkLocalTransaction 方法中,Broker 回调该方法来检查本地事务状态。

相关配置

  1. Maven 依赖: 在 pom.xml 文件中添加 RocketMQ 与 Spring Boot 集成的依赖:
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rocketmq</artifactId>
    </dependency>
    
  2. 配置文件: 在 application.propertiesapplication.yml 文件中配置 RocketMQ 的相关参数,例如:
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=test-group
    rocketmq.producer.send-message-timeout=3000
    rocketmq.producer.retry-times-when-send-failed=3
    
    若使用 application.yml,配置如下:
    rocketmq:
      name-server: 127.0.0.1:9876
      producer:
        group: test - group
        send - message - timeout: 3000
        retry - times - when - send - failed: 3
    
  3. 生产者配置类: 创建一个配置类来配置 DefaultMQProducerTransactionMQProducer。示例代码如下:
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RocketMQConfig {
    
        @Bean
        public DefaultMQProducer defaultMQProducer() {
            DefaultMQProducer producer = new DefaultMQProducer("test-group");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.setSendMsgTimeout(3000);
            producer.setRetryTimesWhenSendFailed(3);
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            return producer;
        }
    }
    
    若使用事务消息,配置类如下:
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class TransactionMQProducerConfig {
    
        @Bean
        public TransactionMQProducer transactionMQProducer() {
            TransactionMQProducer producer = new TransactionMQProducer("transaction - group");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.setTransactionListener(new TransactionListenerImpl());
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            return producer;
        }
    
        @Bean
        public RocketMQTemplate rocketMQTemplate() {
            return new RocketMQTemplate();
        }
    }
    
    其中 TransactionListenerImpl 是实现 TransactionListener 接口的类,包含 executeLocalTransactioncheckLocalTransaction 方法的具体实现。