MST

星途 面试题库

面试题:消息队列RocketMQ如何实现流控机制

请阐述RocketMQ在后端开发中,实现流控机制的具体方式,包括对生产者和消费者的流控策略分别是怎样的。
37.8万 热度难度
后端开发消息队列

知识考点

AI 面试

面试题答案

一键面试

生产者流控机制

  1. 发送限流
    • RocketMQ 生产者在发送消息时,会根据当前 Broker 的负载情况以及生产者自身的一些配置来进行流控。例如,在默认情况下,生产者会通过 DefaultMQProducer 类中的 sendLatencyFaultEnable 参数(默认值为 true)来动态感知 Broker 的可用性和延迟情况。
    • 如果某个 Broker 的延迟过高或者不可用,生产者会在一段时间内(由 latencyMaxnotAvailableDuration 等参数控制)不再向该 Broker 发送消息,从而避免因为向不可靠的 Broker 发送消息而导致消息发送失败,间接起到流控作用。
    • 生产者还可以通过设置 DefaultMQProducermaxMessageSize 参数来限制每次发送消息的最大大小。如果消息大小超过该值,生产者会抛出异常,从而控制发送的消息流量,避免过大消息对系统造成压力。
  2. 并发流控
    • 生产者在多线程环境下发送消息时,DefaultMQProducer 内部会通过 Semaphore 来控制并发发送的线程数。DefaultMQProducer 构造函数中有一个 sendMessageThreadPoolQueueCapacity 参数(默认值为 10000),表示发送消息线程池队列的容量。
    • 当有大量消息需要发送时,如果线程池队列已满,新的发送任务会被阻塞,直到队列中有空闲位置,这就限制了并发发送消息的数量,达到流控目的。

消费者流控机制

  1. 消息拉取限流
    • RocketMQ 消费者在拉取消息时,会根据 Broker 端的负载和自身配置进行流控。Broker 端会统计每个消费者组的消息堆积情况等信息。
    • 消费者端通过 PullThresholdForQueue 参数(默认值为 1000)来控制单个队列拉取消息的阈值。如果该队列上堆积的消息数量超过这个阈值,消费者会减少拉取频率或者暂停拉取,以防止消费端因为消息过多处理不过来而导致性能问题。
    • 消费者还可以通过 DefaultMQPushConsumerpullBatchSize 参数(默认值为 32)来控制每次从 Broker 拉取消息的数量。通过调整这个值,可以控制拉取消息的速率,避免一次性拉取过多消息导致消费端压力过大。
  2. 消费并发流控
    • 对于 DefaultMQPushConsumer,可以通过 setConsumeThreadMinsetConsumeThreadMax 参数来设置消费线程池的最小和最大线程数。默认情况下,ConsumeThreadMin 为 20,ConsumeThreadMax 为 64。
    • 通过调整这两个参数,可以控制并发消费消息的线程数量,从而控制消费速度。如果消费任务比较轻量级,可以适当增加线程数提高消费并发度;如果消费任务较重,为避免系统资源耗尽,可适当减少线程数。
    • 此外,消费者在消费消息时,如果遇到消费失败等情况,会根据 maxReconsumeTimes 参数(默认值为 16)进行重试。超过这个重试次数后,消息会被发送到死信队列。这个机制也间接起到了流控作用,避免因为某条消息一直消费失败而占用过多系统资源。