MST
星途 面试题库

面试题:如何通过Redis实现漏桶限流以适应多维度业务需求

现有一个社交平台,其业务需求包括按用户ID、IP地址、设备类型等多个维度进行限流。请详细描述如何利用Redis实现漏桶限流,以满足这些不同维度的业务需求,同时要考虑到系统的性能和可扩展性。
28.1万 热度难度
数据库Redis

知识考点

AI 面试

面试题答案

一键面试

1. 漏桶限流原理

漏桶算法的核心概念是有一个固定容量的桶,请求进来就像水倒入桶中,而系统以固定速率从桶中取水(处理请求)。如果桶满了,新进来的水(请求)就会溢出(被拒绝)。

2. Redis 数据结构选择

使用 Redis 的 zset(有序集合) 来记录请求时间戳,string 来记录桶的剩余容量。

3. 按用户ID限流实现

  • 初始化桶: 在 Redis 中为每个用户ID创建一个键值对,键为 user:limit:{user_id} 用于记录桶的剩余容量,值为桶的初始容量。同时创建一个 zset,键为 user:timestamps:{user_id},用于记录用户每次请求的时间戳。
# 假设桶容量为 100,处理速率为每秒 10 个请求
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

def init_user_bucket(user_id, capacity, rate):
    r.set(f'user:limit:{user_id}', capacity)
    r.set(f'user:rate:{user_id}', rate)

  • 处理请求: 每次用户请求时,计算当前桶中剩余容量。从 zset 中移除过期的时间戳(根据处理速率计算),并更新桶的剩余容量。如果剩余容量不足,则拒绝请求。
import time

def user_request(user_id):
    capacity = int(r.get(f'user:limit:{user_id}'))
    rate = int(r.get(f'user:rate:{user_id}'))
    now = time.time()
    # 移除过期的时间戳
    r.zremrangebyscore(f'user:timestamps:{user_id}', 0, now - 1)
    current_count = r.zcard(f'user:timestamps:{user_id}')
    # 计算剩余容量
    available = capacity - current_count
    if available <= 0:
        return False
    r.zadd(f'user:timestamps:{user_id}', {now: now})
    return True

4. 按 IP 地址限流实现

与按用户ID限流类似,只不过键的命名改为 ip:limit:{ip_address}ip:timestamps:{ip_address}

def init_ip_bucket(ip_address, capacity, rate):
    r.set(f'ip:limit:{ip_address}', capacity)
    r.set(f'ip:rate:{ip_address}', rate)

def ip_request(ip_address):
    capacity = int(r.get(f'ip:limit:{ip_address}'))
    rate = int(r.get(f'ip:rate:{ip_address}'))
    now = time.time()
    r.zremrangebyscore(f'ip:timestamps:{ip_address}', 0, now - 1)
    current_count = r.zcard(f'ip:timestamps:{ip_address}')
    available = capacity - current_count
    if available <= 0:
        return False
    r.zadd(f'ip:timestamps:{ip_address}', {now: now})
    return True

5. 按设备类型限流实现

同样的原理,键命名为 device:limit:{device_type}device:timestamps:{device_type}

def init_device_bucket(device_type, capacity, rate):
    r.set(f'device:limit:{device_type}', capacity)
    r.set(f'device:rate:{device_type}', rate)

def device_request(device_type):
    capacity = int(r.get(f'device:limit:{device_type}'))
    rate = int(r.get(f'device:rate:{device_type}'))
    now = time.time()
    r.zremrangebyscore(f'device:timestamps:{device_type}', 0, now - 1)
    current_count = r.zcard(f'device:timestamps:{device_type}')
    available = capacity - current_count
    if available <= 0:
        return False
    r.zadd(f'device:timestamps:{device_type}', {now: now})
    return True

6. 性能与可扩展性考虑

  • 性能
    • Redis 是基于内存的,读写操作非常快,适合高并发场景下的限流操作。
    • 使用 pipeline 技术可以批量执行多个 Redis 命令,减少网络开销。
  • 可扩展性
    • 可以通过 Redis 集群来提高系统的可扩展性,将不同维度的限流数据分布在不同的节点上。
    • 对于大规模的系统,可以考虑使用分布式缓存如 Redis Cluster 来处理海量的限流规则和请求记录。同时,可以使用一致性哈希算法来确保数据在集群中的均匀分布。