面试题答案
一键面试生产者端
- 实现机制:
- 同步发送确认:使用同步发送方式,发送消息后等待MQ返回确认响应,确保消息已成功发送到MQ服务器。例如:
SendResult sendResult = rocketMQTemplate.syncSend(topic, message); if (sendResult.getSendStatus() == SendStatus.SEND_OK) { // 消息发送成功处理逻辑 } else { // 消息发送失败处理逻辑,如重试等 }
- 事务消息:对于一些需要保证数据一致性的场景,使用事务消息。生产者先发送半消息到MQ,MQ返回成功后,生产者执行本地事务,再根据本地事务执行结果向MQ提交或回滚事务消息。例如:
并实现rocketMQTemplate.sendMessageInTransaction(topic, message, null);
RocketMQLocalTransactionListener
接口来处理本地事务和事务状态回查。 - 关键配置:
- 生产者组:通过
spring.rocketmq.producer.group
配置生产者组,同一组内的生产者发送消息的语义相同,方便进行故障容错和消息重试等管理。 - 发送超时时间:通过
spring.rocketmq.producer.send - timeout
配置发送消息的超时时间,单位为毫秒,合理设置该值可避免长时间等待响应。 - 最大重试次数:通过
spring.rocketmq.producer.retry - times - when - send - failed
配置消息发送失败时的最大重试次数,确保在网络等临时性故障下消息有机会成功发送。
- 生产者组:通过
消费者端
- 实现机制:
- 消费确认:消费者成功处理消息后,向MQ发送消费确认。默认情况下,Spring Cloud RocketMQ使用自动确认机制。也可以设置为手动确认,例如:
@RocketMQMessageListener(topic = "yourTopic", consumerGroup = "yourGroup", messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.ORDERLY, ackMode = AckMode.MANUAL) public class YourConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { try { // 处理消息逻辑 rocketMQTemplate.sendMessageStatus("yourTopic", message.getMsgId(), SendStatus.SEND_OK); } catch (Exception e) { // 处理失败,可进行重试等操作 rocketMQTemplate.sendMessageStatus("yourTopic", message.getMsgId(), SendStatus.SEND_FAIL); } } }
- 重试机制:当消费消息失败时,MQ会根据配置进行重试。对于顺序消息,会在同一个队列中按顺序重试;对于并发消息,会投递到重试队列进行重试。
- 关键配置:
- 消费者组:通过
spring.rocketmq.consumer.group
配置消费者组,同一组内的消费者共同消费主题下的消息。 - 消费模式:通过
spring.rocketmq.consumer.message - model
配置消费模式,有CLUSTERING
(集群消费)和BROADCASTING
(广播消费)两种模式。集群消费模式下,组内消费者分摊消费消息;广播消费模式下,组内每个消费者都消费全量消息。 - 最大重试次数:通过
spring.rocketmq.consumer.max - retry - times
配置消费失败时的最大重试次数,超过该次数后,消息会被投递到死信队列(如果配置了死信队列相关参数)。 - 消费线程数:通过
spring.rocketmq.consumer.consume - thread - min
和spring.rocketmq.consumer.consume - thread - max
配置最小和最大消费线程数,合理调整线程数可优化消费性能。
- 消费者组:通过