面试题答案
一键面试RocketMQ 延迟消息原理
- 核心组件:
- Broker:RocketMQ的核心组件之一,负责接收、存储和转发消息。在延迟消息处理中,Broker承担了消息的暂存以及按延迟时间调度发送的功能。
- CommitLog:是消息存储的物理文件,所有的消息都顺序写入到CommitLog中。延迟消息也先存储在此,等待调度。
- ConsumeQueue:可以理解为CommitLog的索引文件,它存储了消息在CommitLog中的物理偏移量等信息,方便消费者快速定位消息。延迟消息在ConsumeQueue中的处理有特殊逻辑,它不会立即被消费者消费,而是等待延迟时间到达。
- 消息流转过程:
- 生产者发送延迟消息:生产者在发送消息时,设置消息的延迟级别。消息发送到Broker后,先写入CommitLog。
- Broker处理延迟消息:Broker识别到是延迟消息后,不会立即将其投递到ConsumeQueue供消费者消费。而是根据延迟级别,将消息放入对应的延迟队列(RocketMQ内部使用了一种基于时间轮的结构来管理延迟队列)。
- 延迟队列调度:基于时间轮机制,当延迟时间到达时,消息会被从延迟队列中取出,重新投递到ConsumeQueue中。此时消费者就可以正常消费该消息了。
- 设置不同级别的延迟时间:
- 预定义延迟级别:RocketMQ默认定义了18个延迟级别,分别对应不同的延迟时间。在Broker配置文件(broker.conf)中,可以看到默认的延迟级别设置,例如:
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
。 - 生产者设置:生产者在发送消息时,通过设置
Message
对象的setDelayTimeLevel(int level)
方法来指定延迟级别。这里的level
就是上述预定义延迟级别中的索引值(从1开始)。例如,如果要设置延迟5分钟,根据上述默认延迟级别设置,5分钟对应的是第10个级别,那么生产者代码中设置message.setDelayTimeLevel(10)
即可。如果需要自定义延迟级别,可以修改Broker配置文件中的messageDelayLevel
参数,重新指定延迟时间序列。
- 预定义延迟级别:RocketMQ默认定义了18个延迟级别,分别对应不同的延迟时间。在Broker配置文件(broker.conf)中,可以看到默认的延迟级别设置,例如: