- 丢弃新请求
- 策略阐述:当漏桶已满,新到来的请求直接被丢弃,不进行处理。这种策略简单直接,适用于对请求处理实时性要求不高,或者请求本身不是非常关键的场景。
- Redis实现:假设我们用Redis的字符串类型来表示漏桶的水量(请求数量),用
key
来标识这个漏桶。例如bucket_key
。我们可以使用INCR
命令来模拟请求进入漏桶。每次有新请求时,先执行INCR bucket_key
,如果返回值大于漏桶容量(假设容量为capacity
),则丢弃该请求,然后执行DECR bucket_key
将计数恢复。如下代码示例(以Python为例,结合redis - py库):
import redis
r = redis.Redis(host='localhost', port = 6379, db = 0)
capacity = 10
# 模拟新请求到来
new_request_count = r.incr('bucket_key')
if new_request_count > capacity:
r.decr('bucket_key')
print('请求被丢弃')
else:
print('请求被处理')
- 阻塞等待
- 策略阐述:当漏桶已满时,新请求进入等待队列,直到漏桶有空间时再处理。这种策略适用于请求不能丢失,但允许一定程度延迟处理的场景。
- Redis实现:我们可以使用Redis的列表(List)数据结构来实现等待队列。假设用
bucket_key
表示漏桶当前水量,用waiting_queue_key
表示等待队列。每次有新请求时,先检查漏桶是否已满(通过GET bucket_key
获取当前水量与容量比较),如果已满,将请求相关信息(比如请求ID等)通过RPUSH waiting_queue_key request_info
命令加入等待队列。当漏桶有空间时(可以通过定期检查或者在处理请求后检查),使用LPOP waiting_queue_key
从队列中取出请求进行处理。如下代码示例(以Python为例,结合redis - py库):
import redis
r = redis.Redis(host='localhost', port = 6379, db = 0)
capacity = 10
# 模拟新请求到来
current_water = int(r.get('bucket_key') or 0)
if current_water >= capacity:
r.rpush('waiting_queue_key','request_id_123')
else:
r.incr('bucket_key')
print('请求被处理')
# 检查漏桶有空间时处理等待队列中的请求
if current_water < capacity:
waiting_request = r.lpop('waiting_queue_key')
if waiting_request:
r.incr('bucket_key')
print('从等待队列中取出请求并处理:', waiting_request)
- 限流后处理
- 策略阐述:当漏桶已满时,对新请求进行特殊处理,比如记录下来,然后在系统负载较低或者漏桶有空闲空间时进行批量处理。这种策略适用于对请求数据完整性有要求,并且可以在后续集中处理的场景。
- Redis实现:可以使用Redis的有序集合(Sorted Set)来记录这些请求。假设用
bucket_key
表示漏桶当前水量,用delayed_requests_key
表示存储延迟处理请求的有序集合。每次有新请求时,先检查漏桶是否已满(通过GET bucket_key
获取当前水量与容量比较),如果已满,将请求相关信息(如请求ID和时间戳)通过ZADD delayed_requests_key timestamp request_id
命令加入有序集合(这里时间戳可以用于后续按照顺序处理)。当漏桶有空间时,通过ZRANGEBYSCORE delayed_requests_key -inf +inf LIMIT 0 1 WITHSCORES
获取最早的请求(根据时间戳),处理后通过ZREM delayed_requests_key request_id
移除该请求。如下代码示例(以Python为例,结合redis - py库):
import redis
import time
r = redis.Redis(host='localhost', port = 6379, db = 0)
capacity = 10
# 模拟新请求到来
current_water = int(r.get('bucket_key') or 0)
if current_water >= capacity:
timestamp = time.time()
r.zadd('delayed_requests_key', {f'request_id_123': timestamp})
else:
r.incr('bucket_key')
print('请求被处理')
# 检查漏桶有空间时处理延迟请求
if current_water < capacity:
result = r.zrangebyscore('delayed_requests_key', '-inf', '+inf', start = 0, num = 1, withscores = True)
if result:
request_id, _ = result[0]
r.zrem('delayed_requests_key', request_id)
r.incr('bucket_key')
print('从延迟集合中取出请求并处理:', request_id)