面试题答案
一键面试实现机制
- 同步发送并确认:
- 使用
DefaultMQProducer
的send
方法进行同步消息发送。该方法会等待 Broker 返回发送结果,只有当返回状态为SEND_OK
时,才认为消息发送成功。例如:
SendResult sendResult = producer.send(message); if (sendResult.getSendStatus() == SendStatus.SEND_OK) { // 消息发送成功处理逻辑 } else { // 消息发送失败处理逻辑,如重试等 }
- 使用
- 异步发送并确认:
- 通过
DefaultMQProducer
的send
方法的异步重载形式实现。需要传入一个SendCallback
回调函数,在 Broker 返回结果时,会调用该回调函数。示例代码如下:
producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 消息发送成功处理逻辑 } @Override public void onException(Throwable e) { // 消息发送失败处理逻辑,如重试等 } });
- 通过
- 消息重试机制:
- RocketMQ 支持消息发送失败后的重试。默认情况下,同步发送失败会重试2次,异步发送失败不会自动重试(可在回调函数中手动实现重试)。可以通过设置
DefaultMQProducer
的retryTimesWhenSendFailed
属性来调整同步发送的重试次数。例如:
producer.setRetryTimesWhenSendFailed(3);
- RocketMQ 支持消息发送失败后的重试。默认情况下,同步发送失败会重试2次,异步发送失败不会自动重试(可在回调函数中手动实现重试)。可以通过设置
- 事务消息:
- 适用于需要保证消息发送与本地业务操作一致性的场景。例如在电商下单场景中,下单操作与消息发送需保证原子性。
- 实现步骤:
- 定义事务生产者,继承
TransactionMQProducer
类。 - 实现
LocalTransactionExecuter
接口,在executeLocalTransaction
方法中执行本地业务逻辑,并返回LocalTransactionState
状态(COMMIT_MESSAGE
、ROLLBACK_MESSAGE
、UNKNOW
)。 - 在
checkLocalTransaction
方法中,Broker 回调该方法来检查本地事务状态。
- 定义事务生产者,继承
相关配置
- Maven 依赖:
在
pom.xml
文件中添加 RocketMQ 与 Spring Boot 集成的依赖:<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-rocketmq</artifactId> </dependency>
- 配置文件:
在
application.properties
或application.yml
文件中配置 RocketMQ 的相关参数,例如:
若使用rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=test-group rocketmq.producer.send-message-timeout=3000 rocketmq.producer.retry-times-when-send-failed=3
application.yml
,配置如下:rocketmq: name-server: 127.0.0.1:9876 producer: group: test - group send - message - timeout: 3000 retry - times - when - send - failed: 3
- 生产者配置类:
创建一个配置类来配置
DefaultMQProducer
或TransactionMQProducer
。示例代码如下:
若使用事务消息,配置类如下:import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RocketMQConfig { @Bean public DefaultMQProducer defaultMQProducer() { DefaultMQProducer producer = new DefaultMQProducer("test-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setSendMsgTimeout(3000); producer.setRetryTimesWhenSendFailed(3); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } return producer; } }
其中import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TransactionMQProducerConfig { @Bean public TransactionMQProducer transactionMQProducer() { TransactionMQProducer producer = new TransactionMQProducer("transaction - group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setTransactionListener(new TransactionListenerImpl()); try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } return producer; } @Bean public RocketMQTemplate rocketMQTemplate() { return new RocketMQTemplate(); } }
TransactionListenerImpl
是实现TransactionListener
接口的类,包含executeLocalTransaction
和checkLocalTransaction
方法的具体实现。