MST

星途 面试题库

面试题:Python Redis订阅发布中的消息持久化与可靠性

在Python的Redis订阅发布场景下,如何确保消息不会丢失,尤其是在网络波动或者订阅者暂时离线的情况下。请阐述相关的机制原理,并给出部分关键代码实现思路。
15.7万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

机制原理

  1. 持久化
    • Redis支持RDB和AOF两种持久化方式。RDB会定期将内存中的数据快照到磁盘,AOF则是将写命令追加到日志文件中。在网络波动或订阅者离线时,即使Redis重启,也能通过这些持久化数据恢复发布的消息。
  2. 发布/订阅模式改进
    • 使用Redis的Stream数据结构,它提供了消息队列的功能。Stream有消费者组的概念,消息发送到Stream后,只有当消费者组中的所有消费者都确认消费了该消息,消息才会被标记为已处理。如果有消费者离线,重新上线后可以继续处理未确认的消息。
  3. 可靠的发布
    • 在发布消息时,可以使用事务(MULTIEXEC),确保消息发布操作的原子性。如果在事务执行过程中网络波动,Redis会自动回滚未完成的事务,避免部分发布成功部分失败的情况。
  4. 心跳检测与重连
    • 订阅者可以定期向Redis发送心跳消息,检测网络连接状态。当检测到网络断开时,立即尝试重连,重连成功后继续接收消息。

关键代码实现思路

  1. 使用Stream数据结构(Python示例)
import redis

# 连接Redis
r = redis.Redis(host='localhost', port=6379, db = 0)

# 生产者发送消息到Stream
def send_message_to_stream(message):
    r.xadd('message_stream', {'message': message})

# 消费者组相关操作
# 创建消费者组
r.xgroup_create('message_stream', 'consumer_group', mkstream = True)

# 消费者从Stream消费消息
def consume_message_from_stream():
    while True:
        result = r.xreadgroup('consumer_group', 'consumer_1', {'message_stream': '>'}, None, 1)
        if result:
            _, messages = result[0]
            for message_id, message in messages:
                print(f"Received message: {message}")
                # 处理完消息后确认
                r.xack('message_stream', 'consumer_group', message_id)


  1. 心跳检测与重连
import time
import redis

def connect_redis():
    try:
        r = redis.Redis(host='localhost', port=6379, db = 0)
        r.ping()
        return r
    except redis.ConnectionError:
        return None


def monitor_connection():
    r = connect_redis()
    while True:
        if r is None:
            print("Trying to reconnect...")
            r = connect_redis()
        else:
            try:
                r.ping()
                time.sleep(5)  # 每5秒发送一次心跳
            except redis.ConnectionError:
                r = None