MST

星途 面试题库

面试题:Python中Redis缓存机制实现之高并发优化

在高并发场景下,使用Python的Redis缓存机制可能会遇到缓存击穿、缓存雪崩和缓存穿透问题。请分别阐述这三个问题产生的原因,并给出在Python中如何利用Redis自身特性或结合其他技术手段来解决这些问题的详细方案。
23.0万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

缓存击穿

  • 产生原因:大量请求同时访问一个过期的热点数据,瞬间大量请求绕过缓存直接访问数据库,可能导致数据库压力过大甚至崩溃。
  • 解决方案
    • 互斥锁:在Python中使用redis.lock模块实现互斥锁。当缓存过期时,只有一个线程能获取到锁去查询数据库并更新缓存,其他线程等待。例如:
import redis
r = redis.Redis()
lock = r.lock('my_lock')
try:
    if not r.exists('key'):
        if lock.acquire(blocking=True, timeout=10):
            value = get_from_db('key')
            r.set('key', value)
            lock.release()
        else:
            time.sleep(0.1)
            # 重试获取锁
except Exception as e:
    print(e)
    if lock.locked():
        lock.release()
- **永不过期**:对热点数据设置永不过期,使用额外的逻辑定期更新数据。例如:
import redis
r = redis.Redis()
value = r.get('key')
if not value:
    new_value = get_from_db('key')
    r.set('key', new_value)
    # 异步更新数据
    threading.Thread(target=update_data_periodically, args=(r, 'key')).start()

缓存雪崩

  • 产生原因:大量缓存数据在同一时间过期,导致大量请求同时访问数据库,使数据库压力剧增,甚至宕机。
  • 解决方案
    • 随机过期时间:在设置缓存过期时间时,给每个缓存数据加上一个随机的过期时间偏移量,避免大量数据同时过期。例如:
import redis
import random
r = redis.Redis()
expire_time = 3600 + random.randint(0, 600)
r.setex('key', expire_time, value)
- **二级缓存**:使用两层缓存,第一层缓存失效后,先从第二层缓存获取数据,减少对数据库的直接访问。例如:
import redis
r1 = redis.Redis(db=0)
r2 = redis.Redis(db=1)
value = r1.get('key')
if not value:
    value = r2.get('key')
    if not value:
        value = get_from_db('key')
        r1.set('key', value, ex=3600)
        r2.set('key', value, ex=7200)

缓存穿透

  • 产生原因:查询一个不存在的数据,每次都绕过缓存直接查询数据库,若有大量这种请求,会给数据库造成巨大压力。
  • 解决方案
    • 布隆过滤器:在Python中使用bitarraymmh3模块实现布隆过滤器。在写入数据到缓存时,同时将数据的key加入布隆过滤器。查询时,先通过布隆过滤器判断key是否存在,若不存在则直接返回,不再查询数据库。例如:
import bitarray
import mmh3
import redis
r = redis.Redis()
bf = bitarray.bitarray(1000000)
bf.setall(0)
def add_to_bloom(key):
    hashes = [mmh3.hash(key, seed) % len(bf) for seed in range(3)]
    for h in hashes:
        bf[h] = 1
def might_contain(key):
    hashes = [mmh3.hash(key, seed) % len(bf) for seed in range(3)]
    return all(bf[h] for h in hashes)
# 写入数据时
value = get_from_db('key')
if value:
    r.set('key', value)
    add_to_bloom('key')
# 查询数据时
if not might_contain('key'):
    return None
value = r.get('key')
if not value:
    value = get_from_db('key')
    if value:
        r.set('key', value)
- **空值缓存**:当查询数据库发现数据不存在时,也将这个空值缓存起来,并设置一个较短的过期时间,避免后续相同的查询继续穿透到数据库。例如:
import redis
r = redis.Redis()
value = r.get('key')
if not value:
    value = get_from_db('key')
    if value:
        r.set('key', value)
    else:
        r.setex('key', 60, '')  # 空值缓存60秒