机制原理
- 持久化:
- Redis支持RDB和AOF两种持久化方式。RDB会定期将内存中的数据快照到磁盘,AOF则是将写命令追加到日志文件中。在网络波动或订阅者离线时,即使Redis重启,也能通过这些持久化数据恢复发布的消息。
- 发布/订阅模式改进:
- 使用Redis的Stream数据结构,它提供了消息队列的功能。Stream有消费者组的概念,消息发送到Stream后,只有当消费者组中的所有消费者都确认消费了该消息,消息才会被标记为已处理。如果有消费者离线,重新上线后可以继续处理未确认的消息。
- 可靠的发布:
- 在发布消息时,可以使用事务(
MULTI
、EXEC
),确保消息发布操作的原子性。如果在事务执行过程中网络波动,Redis会自动回滚未完成的事务,避免部分发布成功部分失败的情况。
- 心跳检测与重连:
- 订阅者可以定期向Redis发送心跳消息,检测网络连接状态。当检测到网络断开时,立即尝试重连,重连成功后继续接收消息。
关键代码实现思路
- 使用Stream数据结构(Python示例):
import redis
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db = 0)
# 生产者发送消息到Stream
def send_message_to_stream(message):
r.xadd('message_stream', {'message': message})
# 消费者组相关操作
# 创建消费者组
r.xgroup_create('message_stream', 'consumer_group', mkstream = True)
# 消费者从Stream消费消息
def consume_message_from_stream():
while True:
result = r.xreadgroup('consumer_group', 'consumer_1', {'message_stream': '>'}, None, 1)
if result:
_, messages = result[0]
for message_id, message in messages:
print(f"Received message: {message}")
# 处理完消息后确认
r.xack('message_stream', 'consumer_group', message_id)
- 心跳检测与重连:
import time
import redis
def connect_redis():
try:
r = redis.Redis(host='localhost', port=6379, db = 0)
r.ping()
return r
except redis.ConnectionError:
return None
def monitor_connection():
r = connect_redis()
while True:
if r is None:
print("Trying to reconnect...")
r = connect_redis()
else:
try:
r.ping()
time.sleep(5) # 每5秒发送一次心跳
except redis.ConnectionError:
r = None