MST

星途 面试题库

面试题:缓存设计之Redis发布订阅在复杂实时通知场景下的优化

在一个电商系统中,有多种类型的实时通知需求,如订单状态变更通知、商品促销通知等。每种通知可能有不同的优先级和目标受众。使用Redis发布订阅模式,你将如何设计系统以满足这些复杂需求,包括如何管理不同类型通知的队列、确保高优先级通知优先处理以及如何优化性能以应对大量并发通知请求。
28.4万 热度难度
后端开发缓存设计

知识考点

AI 面试

面试题答案

一键面试

1. 设计不同类型通知的队列

  • 使用不同频道:利用Redis的发布订阅机制,为每种类型的通知创建独立的频道。例如,order_status_channel用于订单状态变更通知,product_promotion_channel用于商品促销通知。发布者根据通知类型向对应的频道发布消息。
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
# 发布订单状态变更通知
r.publish('order_status_channel', '订单已发货')
  • 消息结构:对于每个通知消息,设计一个包含通知类型、优先级、目标受众等信息的结构体。例如,使用JSON格式:
{
    "type": "order_status",
    "priority": "high",
    "target_audience": "all",
    "content": "订单已发货"
}

2. 确保高优先级通知优先处理

  • 优先级队列:为每个频道设置一个基于优先级的队列。可以使用Redis的Sorted Set数据结构,以优先级为分数,通知消息为成员。例如,高优先级的通知分数设为100,中等优先级为50,低优先级为10。
# 添加高优先级订单状态通知到队列
r.zadd('order_status_priority_queue', {'订单已发货': 100})
  • 消费者处理:消费者从优先级队列中获取消息时,按照分数(优先级)从高到低获取。例如,在Python中:
while True:
    result = r.zrange('order_status_priority_queue', 0, 0, withscores=True)
    if result:
        message, score = result[0]
        # 处理消息
        print(f'处理消息: {message}, 优先级: {score}')
        r.zrem('order_status_priority_queue', message)

3. 优化性能以应对大量并发通知请求

  • 缓存处理:对于一些频繁发送且内容不经常变化的通知,如长期有效的商品促销通知,可以在Redis中设置缓存。当有新的通知请求时,先检查缓存,如果缓存中有则直接使用缓存内容进行发布,减少处理时间。
# 检查商品促销通知缓存
cached_promotion = r.get('cached_product_promotion')
if cached_promotion:
    r.publish('product_promotion_channel', cached_promotion)
else:
    new_promotion = '新的商品促销活动!'
    r.set('cached_product_promotion', new_promotion)
    r.publish('product_promotion_channel', new_promotion)
  • 批量处理:在发布和订阅端都可以采用批量处理的方式。发布者可以批量收集通知消息,然后一次性发布到频道。订阅者可以批量获取消息并进行处理,减少Redis的交互次数,提高效率。
# 批量发布通知
messages = ['通知1', '通知2', '通知3']
for message in messages:
    r.publish('general_notification_channel', message)
  • 多线程/多进程:在消费者端,可以使用多线程或多进程来并行处理通知消息,提高处理速度。例如,在Python中使用multiprocessing模块:
import multiprocessing

def process_message(message):
    # 处理消息逻辑
    print(f'处理消息: {message}')

if __name__ == '__main__':
    messages = ['消息1', '消息2', '消息3']
    pool = multiprocessing.Pool(processes=3)
    pool.map(process_message, messages)
    pool.close()
    pool.join()