面试题答案
一键面试生产者流控机制
- 发送限流:
- RocketMQ 生产者在发送消息时,会根据当前 Broker 的负载情况以及生产者自身的一些配置来进行流控。例如,在默认情况下,生产者会通过
DefaultMQProducer
类中的sendLatencyFaultEnable
参数(默认值为true
)来动态感知 Broker 的可用性和延迟情况。 - 如果某个 Broker 的延迟过高或者不可用,生产者会在一段时间内(由
latencyMax
和notAvailableDuration
等参数控制)不再向该 Broker 发送消息,从而避免因为向不可靠的 Broker 发送消息而导致消息发送失败,间接起到流控作用。 - 生产者还可以通过设置
DefaultMQProducer
的maxMessageSize
参数来限制每次发送消息的最大大小。如果消息大小超过该值,生产者会抛出异常,从而控制发送的消息流量,避免过大消息对系统造成压力。
- RocketMQ 生产者在发送消息时,会根据当前 Broker 的负载情况以及生产者自身的一些配置来进行流控。例如,在默认情况下,生产者会通过
- 并发流控:
- 生产者在多线程环境下发送消息时,
DefaultMQProducer
内部会通过Semaphore
来控制并发发送的线程数。DefaultMQProducer
构造函数中有一个sendMessageThreadPoolQueueCapacity
参数(默认值为 10000),表示发送消息线程池队列的容量。 - 当有大量消息需要发送时,如果线程池队列已满,新的发送任务会被阻塞,直到队列中有空闲位置,这就限制了并发发送消息的数量,达到流控目的。
- 生产者在多线程环境下发送消息时,
消费者流控机制
- 消息拉取限流:
- RocketMQ 消费者在拉取消息时,会根据 Broker 端的负载和自身配置进行流控。Broker 端会统计每个消费者组的消息堆积情况等信息。
- 消费者端通过
PullThresholdForQueue
参数(默认值为 1000)来控制单个队列拉取消息的阈值。如果该队列上堆积的消息数量超过这个阈值,消费者会减少拉取频率或者暂停拉取,以防止消费端因为消息过多处理不过来而导致性能问题。 - 消费者还可以通过
DefaultMQPushConsumer
的pullBatchSize
参数(默认值为 32)来控制每次从 Broker 拉取消息的数量。通过调整这个值,可以控制拉取消息的速率,避免一次性拉取过多消息导致消费端压力过大。
- 消费并发流控:
- 对于
DefaultMQPushConsumer
,可以通过setConsumeThreadMin
和setConsumeThreadMax
参数来设置消费线程池的最小和最大线程数。默认情况下,ConsumeThreadMin
为 20,ConsumeThreadMax
为 64。 - 通过调整这两个参数,可以控制并发消费消息的线程数量,从而控制消费速度。如果消费任务比较轻量级,可以适当增加线程数提高消费并发度;如果消费任务较重,为避免系统资源耗尽,可适当减少线程数。
- 此外,消费者在消费消息时,如果遇到消费失败等情况,会根据
maxReconsumeTimes
参数(默认值为 16)进行重试。超过这个重试次数后,消息会被发送到死信队列。这个机制也间接起到了流控作用,避免因为某条消息一直消费失败而占用过多系统资源。
- 对于