面试题答案
一键面试保证消息不丢失
- 生产者端
- 开启事务:在发送消息前开启事务,发送成功后提交事务,失败则回滚。例如在Spring Boot中,配置
DataSourceTransactionManager
,在发送消息的业务方法上添加@Transactional
注解。
@Service public class MessageProducer { @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private PlatformTransactionManager transactionManager; public void sendMessage(String topic, String message) { TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager); transactionTemplate.execute(status -> { try { rocketMQTemplate.send(topic, MessageBuilder.withPayload(message).build()); return TransactionSynchronizationManager.STATUS_COMMITTED; } catch (Exception e) { status.setRollbackOnly(); return TransactionSynchronizationManager.STATUS_ROLLED_BACK; } }); } }
- 同步发送并确认:使用同步发送方式,等待MQ返回发送结果确认。例如
RocketMQTemplate
的syncSend
方法。
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build()); if (sendResult.getSendStatus() != SendStatus.SEND_OK) { // 处理发送失败逻辑,如重试 }
- 使用消息轨迹:RocketMQ提供消息轨迹功能,可记录消息从生产者到消费者的全链路过程,方便排查消息丢失问题。在生产者配置中开启消息轨迹:
rocketmq: producer: group: my-producer-group enable-message-trace: true customize-trace-topic: my-trace-topic
- 开启事务:在发送消息前开启事务,发送成功后提交事务,失败则回滚。例如在Spring Boot中,配置
- MQ Broker端
- 刷盘策略:采用同步刷盘策略,确保消息写入磁盘后才返回成功响应给生产者。在RocketMQ的
broker.conf
配置文件中设置:
flushDiskType = SYNC_FLUSH
- 高可用配置:搭建主从集群,采用2M - 2S模式(2个Master和2个Slave),Master将消息同步给Slave,提高消息存储的可靠性。
- 刷盘策略:采用同步刷盘策略,确保消息写入磁盘后才返回成功响应给生产者。在RocketMQ的
- 消费者端
- 手动确认消费:采用手动ACK模式,消费者处理完消息后,向MQ发送确认消息。例如在RocketMQ的消费者配置中:
@RocketMQMessageListener(topic = "my - topic", consumerGroup = "my - consumer - group",ackMode = AckMode.MANUAL) @Component public class MyConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { try { // 处理消息逻辑 System.out.println("Received message: " + message); // 手动确认消费 RocketMQPushConsumerLocalContext context = RocketMQPushConsumerLocalContext.get(); context.getAckCallback().ack(); } catch (Exception e) { // 处理异常,可选择重试或记录日志等 RocketMQPushConsumerLocalContext context = RocketMQPushConsumerLocalContext.get(); context.getAckCallback().nack(); } } }
保证消息不重复消费
- 消费者幂等性处理
- 数据库唯一约束:如果消息处理逻辑涉及数据库操作,对关键业务字段添加唯一约束。例如,订单表中的订单编号设为唯一,当重复消息处理同一订单时,数据库会抛出唯一约束冲突异常,消费者捕获异常并忽略,认为消息已处理成功。
- 状态机控制:为消息处理对象设计状态机。以订单为例,订单状态有“未支付”“已支付”“已发货”等,当处理支付消息时,只有订单状态为“未支付”时才进行支付操作并更新状态为“已支付”,重复收到支付消息时,若订单状态已是“已支付”,则直接忽略。
- 消息唯一标识:生产者为每条消息生成唯一ID,消费者收到消息后,先根据唯一ID查询本地记录(如Redis)判断是否已处理过该消息。例如:
@Autowired private StringRedisTemplate stringRedisTemplate; @Override public void onMessage(String message) { String messageId = getMessageIdFromMessage(message); if (stringRedisTemplate.hasKey(messageId)) { // 已处理过,忽略 return; } try { // 处理消息逻辑 stringRedisTemplate.opsForValue().set(messageId, "processed", 3600, TimeUnit.SECONDS); } catch (Exception e) { // 处理异常 } }
- MQ层面保证
- 使用事务消息:生产者发送事务消息,MQ保证消息要么成功写入且被消费者处理,要么不写入,从而避免重复消费。在RocketMQ中,生产者发送事务消息示例:
rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), null);
- 合理配置消息重试机制:避免因重试次数过多导致消息重复消费。在消费者配置中合理设置最大重试次数,如:
rocketmq: consumer: group: my - consumer - group max - retry - times: 3