面试题答案
一键面试实现思路与技术手段
-
缓存读取和写入性能优化
- 批量操作:利用Redis的pipeline技术,将多个命令打包发送,减少网络往返次数。
- 合理设置数据结构:根据数据特点选择合适的Redis数据结构,如字符串、哈希表等。例如,对于对象类型的数据,可以使用哈希表存储,减少内存占用。
- 异步操作:在Python中使用异步库(如
asyncio
)结合aioredis
进行异步的缓存读取和写入,提高并发性能。
-
避免缓存穿透
- 布隆过滤器:在缓存之前使用布隆过滤器,先判断数据是否存在。如果布隆过滤器判断数据不存在,则直接返回,不再查询数据库,从而避免无效查询穿透到数据库。
- 空值缓存:当查询数据库发现数据不存在时,也将空值存入缓存,并设置较短的过期时间,防止后续相同的无效查询穿透。
-
避免缓存雪崩
- 随机过期时间:给缓存设置的过期时间加上一个随机值,避免大量缓存同时过期。
- 缓存预热:系统启动时,提前将部分热点数据加载到缓存中,避免在高并发请求下,所有数据都从数据库读取。
-
避免缓存击穿
- 互斥锁:在查询数据库前,先获取一个分布式锁(如使用Redis的SETNX命令实现),只有获取到锁的请求才能查询数据库并更新缓存,其他请求等待。这样可以防止高并发下单个热点数据失效时,大量请求同时查询数据库。
- 永不过期:对于热点数据设置永不过期,然后使用后台线程定期更新缓存数据,保证数据的实时性。
-
处理网络异常和连接超时
- 重试机制:在遇到网络异常或连接超时后,进行一定次数的重试。可以使用
tenacity
库来实现重试逻辑。 - 连接池:使用Redis连接池管理连接,保证连接的复用,减少连接建立和销毁的开销。同时,连接池可以对连接进行健康检查,及时发现并处理异常连接。
- 重试机制:在遇到网络异常或连接超时后,进行一定次数的重试。可以使用
代码示例
- 基本缓存操作与性能优化
import redis
from redis.client import Pipeline
# 创建Redis连接池
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
redis_client = redis.Redis(connection_pool=pool)
def set_cache(key, value, ex=None):
with redis_client.pipeline() as pipe:
pipe.set(key, value, ex=ex)
pipe.execute()
def get_cache(key):
return redis_client.get(key)
- 避免缓存穿透(布隆过滤器示例)
import bitarray
import math
import mmh3
class BloomFilter:
def __init__(self, n, p):
self.n = n
self.p = p
self.m = - (n * math.log(p)) / (math.log(2) ** 2)
self.m = int(self.m)
self.k = (self.m / n) * math.log(2)
self.k = int(self.k)
self.bit_array = bitarray.bitarray(self.m)
self.bit_array.setall(0)
def add(self, item):
for i in range(self.k):
index = mmh3.hash(item, i) % self.m
self.bit_array[index] = 1
def __contains__(self, item):
for i in range(self.k):
index = mmh3.hash(item, i) % self.m
if not self.bit_array[index]:
return False
return True
# 使用示例
bloom_filter = BloomFilter(n=10000, p=0.01)
def get_data_without_penetration(key):
if key in bloom_filter:
value = get_cache(key)
if value is not None:
return value
else:
# 从数据库查询数据
data = get_from_database(key)
if data:
set_cache(key, data)
bloom_filter.add(key)
return data
else:
# 空值缓存
set_cache(key, None, ex=60)
return None
else:
return None
- 避免缓存雪崩
import random
def set_cache_with_random_expiry(key, value, base_expiry=3600):
random_expiry = base_expiry + random.randint(0, 600)
set_cache(key, value, ex=random_expiry)
- 避免缓存击穿(互斥锁示例)
import time
def get_data_without_breakdown(key):
value = get_cache(key)
if value is not None:
return value
lock_key = f"lock:{key}"
lock_acquired = redis_client.set(lock_key, "locked", nx=True, ex=10)
if lock_acquired:
try:
value = get_cache(key)
if value is None:
data = get_from_database(key)
set_cache(key, data)
return data
else:
return value
finally:
redis_client.delete(lock_key)
else:
# 等待一段时间后重试
time.sleep(0.1)
return get_data_without_breakdown(key)
- 处理网络异常和连接超时(重试机制示例)
from tenacity import retry, stop_after_attempt, wait_fixed
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def get_cache_with_retry(key):
return redis_client.get(key)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def set_cache_with_retry(key, value, ex=None):
with redis_client.pipeline() as pipe:
pipe.set(key, value, ex=ex)
pipe.execute()
上述代码展示了在高并发场景下使用Python和Redis实现数据缓存与过期策略,并处理各种问题的方法。具体实现可根据实际业务场景进行调整和优化。