MST

星途 面试题库

面试题:延时消息在高并发场景下的一致性问题

在高并发的后端系统中使用消息队列的延时消息机制时,如何保证消息处理的一致性?如果出现消息重复消费或丢失的情况,你会如何排查和解决?
48.4万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

保证消息处理一致性

  1. 幂等性设计
    • 定义:对同一操作的多次请求应该产生相同的结果。
    • 实现方式
      • 数据库层面:使用唯一索引。例如,在订单表中对订单号设置唯一索引,当处理创建订单消息时,如果重复消费,由于唯一索引的限制,重复插入相同订单号的记录会失败,应用层可以捕获异常并返回相同的成功结果。
      • 业务逻辑层面:在服务端维护一个已处理消息的记录(可以是内存中的缓存,如 Redis 的 Set 数据结构)。每次处理消息前,先检查该消息是否已处理。例如,对于支付回调消息,将回调消息的唯一标识(如支付单号)存入 Redis Set 中,处理前检查 Set 中是否已存在该标识,若存在则直接返回成功,不再重复处理。
  2. 事务机制
    • 本地事务:如果消息处理涉及到多个本地数据库操作,使用本地事务来保证这些操作要么全部成功,要么全部失败。例如,在处理用户注册消息时,同时向用户表和用户详情表插入数据,使用数据库事务来确保这两个操作的原子性。
    • 分布式事务:当消息处理涉及多个不同服务或数据库时,可采用分布式事务解决方案。如使用两阶段提交(2PC)协议,但它存在单点故障和同步阻塞等问题;也可使用可靠消息最终一致性方案,即消息发送方先将消息发送到本地事务消息表,本地事务提交成功后再将消息发送到消息队列,接收方消费消息并处理业务后,向发送方发送确认消息,发送方根据确认消息清理事务消息表。
  3. 消息顺序性处理
    • 场景:有些业务场景对消息处理顺序有严格要求,如电商中的订单创建、支付、发货等流程消息。
    • 实现方式
      • 分区有序:在消息队列中,通过将相关消息发送到同一个分区,消费者从该分区按顺序消费消息。例如,将同一用户的所有订单相关消息发送到同一个分区,这样就保证了该用户订单相关业务处理的顺序性。
      • 全局有序:较少使用,需要消息队列本身支持全局顺序消息,如 RocketMQ 的顺序消息模式。这种方式会严重影响系统的并发性能,因为所有消息都要按顺序处理。

排查和解决消息重复消费问题

  1. 确认消息重复来源
    • 检查消息队列配置:查看是否开启了消息重试机制且重试策略不合理,导致消息被重复投递。例如,在 RabbitMQ 中,如果设置了不合理的死信队列策略,消息可能会被多次重新入队。
    • 检查消费者端:确认消费者在处理消息时是否由于异常处理不当导致没有正确确认消息。比如在使用 Kafka 时,如果消费者在处理消息后没有及时提交偏移量(offset),当消费者重启或发生故障恢复时,会重新消费之前处理过的消息。
  2. 解决重复消费问题
    • 使用幂等性处理:如上述幂等性设计部分所述,通过幂等性设计来确保重复消费不影响业务结果。
    • 优化消息确认机制
      • RabbitMQ:使用手动确认模式(manual acknowledgment),消费者在成功处理消息后,再向 RabbitMQ 发送确认消息(ack)。这样可以避免因自动确认(auto - acknowledgment)导致的消息还未处理就被确认的问题。
      • Kafka:根据业务需求合理选择偏移量提交方式。如果对消息处理准确性要求较高,可以采用同步提交偏移量的方式,确保消息处理成功后再提交偏移量;如果追求高并发性能,可以采用异步提交偏移量,但要注意处理异步提交失败的情况。

排查和解决消息丢失问题

  1. 确认消息丢失位置
    • 生产端
      • 检查网络问题:网络波动可能导致消息发送失败但未被正确捕获。可以通过在生产端增加日志记录,记录每次消息发送的结果和相关网络状态信息。例如,在发送消息前记录本地网络状态,发送后记录消息队列返回的响应状态码。
      • 检查消息队列返回状态:查看消息队列返回的发送结果,确认是否由于消息格式错误、队列已满等原因导致消息发送失败。比如在 Kafka 中,如果生产者发送消息时返回的是错误的响应(如分区找不到等错误),就需要根据错误信息进行排查和处理。
    • 消费端
      • 检查消费逻辑:确认消费逻辑中是否存在异常导致消息未被正确处理就被丢弃。可以在消费逻辑的关键节点增加日志记录,记录消息处理的过程和结果。例如,在处理订单消息时,记录订单状态更新前后的信息以及处理过程中的异常信息。
      • 检查消息确认机制:同重复消费问题排查中消费端确认机制检查部分,确认是否因确认机制问题导致消息丢失。例如,在 RabbitMQ 中,如果消费者在处理消息时发生异常,但没有正确处理异常并拒绝消息(nack),可能会导致消息丢失。
  2. 解决消息丢失问题
    • 生产端
      • 消息持久化:在消息队列中开启消息持久化功能。例如,在 RabbitMQ 中,将消息设置为持久化(deliveryMode = 2),这样即使 RabbitMQ 服务器重启,消息也不会丢失。
      • 发送确认机制:使用消息队列提供的发送确认机制。如 RabbitMQ 的 confirm 机制,生产者发送消息后,通过监听 confirm 回调函数来确认消息是否成功到达 RabbitMQ 服务器。如果未成功,进行重试发送。
    • 消费端
      • 增加重试机制:在消费逻辑中增加重试机制。当消费消息发生异常时,根据异常类型进行一定次数的重试。例如,可以使用 Guava 的 Retryer 框架来实现重试逻辑,设置重试次数、重试间隔等参数。
      • 消息补偿机制:建立消息补偿机制,对于丢失的消息,可以通过人工介入或系统定时任务等方式进行补偿处理。比如在电商订单处理中,如果发现某个订单消息丢失导致订单状态未更新,可以通过人工查询订单原始数据并手动更新订单状态,或者系统定时扫描未完成订单并进行补偿处理。