MST
星途 面试题库

面试题:Python中Redis数据缓存与过期策略的性能优化及异常处理

在高并发场景下,使用Python和Redis实现数据缓存与过期策略。如何优化缓存读取和写入的性能,避免缓存穿透、缓存雪崩和缓存击穿问题?请详细说明实现思路、使用到的技术手段以及相应的代码示例。另外,在操作Redis过程中,如何优雅地处理可能出现的网络异常、连接超时等问题?
47.0万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

实现思路与技术手段

  1. 缓存读取和写入性能优化

    • 批量操作:利用Redis的pipeline技术,将多个命令打包发送,减少网络往返次数。
    • 合理设置数据结构:根据数据特点选择合适的Redis数据结构,如字符串、哈希表等。例如,对于对象类型的数据,可以使用哈希表存储,减少内存占用。
    • 异步操作:在Python中使用异步库(如asyncio)结合aioredis进行异步的缓存读取和写入,提高并发性能。
  2. 避免缓存穿透

    • 布隆过滤器:在缓存之前使用布隆过滤器,先判断数据是否存在。如果布隆过滤器判断数据不存在,则直接返回,不再查询数据库,从而避免无效查询穿透到数据库。
    • 空值缓存:当查询数据库发现数据不存在时,也将空值存入缓存,并设置较短的过期时间,防止后续相同的无效查询穿透。
  3. 避免缓存雪崩

    • 随机过期时间:给缓存设置的过期时间加上一个随机值,避免大量缓存同时过期。
    • 缓存预热:系统启动时,提前将部分热点数据加载到缓存中,避免在高并发请求下,所有数据都从数据库读取。
  4. 避免缓存击穿

    • 互斥锁:在查询数据库前,先获取一个分布式锁(如使用Redis的SETNX命令实现),只有获取到锁的请求才能查询数据库并更新缓存,其他请求等待。这样可以防止高并发下单个热点数据失效时,大量请求同时查询数据库。
    • 永不过期:对于热点数据设置永不过期,然后使用后台线程定期更新缓存数据,保证数据的实时性。
  5. 处理网络异常和连接超时

    • 重试机制:在遇到网络异常或连接超时后,进行一定次数的重试。可以使用tenacity库来实现重试逻辑。
    • 连接池:使用Redis连接池管理连接,保证连接的复用,减少连接建立和销毁的开销。同时,连接池可以对连接进行健康检查,及时发现并处理异常连接。

代码示例

  1. 基本缓存操作与性能优化
import redis
from redis.client import Pipeline

# 创建Redis连接池
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
redis_client = redis.Redis(connection_pool=pool)


def set_cache(key, value, ex=None):
    with redis_client.pipeline() as pipe:
        pipe.set(key, value, ex=ex)
        pipe.execute()


def get_cache(key):
    return redis_client.get(key)


  1. 避免缓存穿透(布隆过滤器示例)
import bitarray
import math
import mmh3


class BloomFilter:
    def __init__(self, n, p):
        self.n = n
        self.p = p
        self.m = - (n * math.log(p)) / (math.log(2) ** 2)
        self.m = int(self.m)
        self.k = (self.m / n) * math.log(2)
        self.k = int(self.k)
        self.bit_array = bitarray.bitarray(self.m)
        self.bit_array.setall(0)

    def add(self, item):
        for i in range(self.k):
            index = mmh3.hash(item, i) % self.m
            self.bit_array[index] = 1

    def __contains__(self, item):
        for i in range(self.k):
            index = mmh3.hash(item, i) % self.m
            if not self.bit_array[index]:
                return False
        return True


# 使用示例
bloom_filter = BloomFilter(n=10000, p=0.01)


def get_data_without_penetration(key):
    if key in bloom_filter:
        value = get_cache(key)
        if value is not None:
            return value
        else:
            # 从数据库查询数据
            data = get_from_database(key)
            if data:
                set_cache(key, data)
                bloom_filter.add(key)
                return data
            else:
                # 空值缓存
                set_cache(key, None, ex=60)
                return None
    else:
        return None


  1. 避免缓存雪崩
import random


def set_cache_with_random_expiry(key, value, base_expiry=3600):
    random_expiry = base_expiry + random.randint(0, 600)
    set_cache(key, value, ex=random_expiry)


  1. 避免缓存击穿(互斥锁示例)
import time


def get_data_without_breakdown(key):
    value = get_cache(key)
    if value is not None:
        return value
    lock_key = f"lock:{key}"
    lock_acquired = redis_client.set(lock_key, "locked", nx=True, ex=10)
    if lock_acquired:
        try:
            value = get_cache(key)
            if value is None:
                data = get_from_database(key)
                set_cache(key, data)
                return data
            else:
                return value
        finally:
            redis_client.delete(lock_key)
    else:
        # 等待一段时间后重试
        time.sleep(0.1)
        return get_data_without_breakdown(key)


  1. 处理网络异常和连接超时(重试机制示例)
from tenacity import retry, stop_after_attempt, wait_fixed


@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def get_cache_with_retry(key):
    return redis_client.get(key)


@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def set_cache_with_retry(key, value, ex=None):
    with redis_client.pipeline() as pipe:
        pipe.set(key, value, ex=ex)
        pipe.execute()


上述代码展示了在高并发场景下使用Python和Redis实现数据缓存与过期策略,并处理各种问题的方法。具体实现可根据实际业务场景进行调整和优化。