1. 读写策略
- 读操作:
- 先从缓存(如Redis)读取数据。如果缓存命中,直接返回数据,这样可以大大提高响应速度,减少对数据库的压力。例如,在一个用户信息查询的场景中,首先尝试从Redis中获取用户信息:
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
user_id = 123
user_info = r.get(f'user:{user_id}')
if user_info:
return user_info.decode('utf-8')
- 如果缓存未命中,则从数据库读取数据。读取到数据后,将数据写入缓存,以便后续相同请求可以直接从缓存获取。例如:
import sqlite3
conn = sqlite3.connect('users.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE id =?', (user_id,))
user_info = cursor.fetchone()
if user_info:
r.set(f'user:{user_id}', str(user_info))
return str(user_info)
- 写操作:
- 先写数据库,再删缓存:这是一种较为常用的策略。在更新数据库成功后,删除对应的缓存数据。这样当下次读取时,缓存未命中,会从数据库重新加载最新数据到缓存。例如,在更新用户信息时:
try:
conn.execute('UPDATE users SET name =? WHERE id =?', ('new_name', user_id))
conn.commit()
r.delete(f'user:{user_id}')
except Exception as e:
# 处理异常,如数据库更新失败回滚等
pass
- **先删缓存,再写数据库**:这种策略存在一定风险。如果删除缓存后,写数据库操作失败,可能导致一段时间内缓存与数据库数据不一致。不过可以通过一些补偿机制来解决,如使用消息队列记录删除缓存的操作,当写数据库失败时,通过消息队列重试删除缓存操作。例如:
r.delete(f'user:{user_id}')
try:
conn.execute('UPDATE users SET name =? WHERE id =?', ('new_name', user_id))
conn.commit()
except Exception as e:
# 记录写数据库失败,通过消息队列重试删除缓存等操作
pass
2. 缓存过期策略
- 设置合理的缓存过期时间:对于一些变化不频繁的数据,可以设置较长的过期时间,减少缓存失效后对数据库的压力。而对于变化频繁的数据,设置较短的过期时间,以确保数据的一致性。例如,商品的基本信息(如名称、描述等)变化不频繁,可以设置几天的过期时间:
r.setex(f'product:{product_id}', 60 * 60 * 24 * 3, product_info) # 3天过期
- 缓存永不过期:对于一些几乎不会变化的数据,可以设置缓存永不过期。但需要在数据发生变化时,手动更新缓存。例如系统的配置信息:
r.set(f'system_config', system_config_info)
3. 分布式缓存一致性问题(如果使用分布式缓存)
- 使用分布式锁:在更新缓存或数据库时,先获取分布式锁(如Redis的SETNX命令实现的锁),确保同一时间只有一个线程进行写操作,避免并发写导致的数据不一致。例如:
lock_key = 'lock:user_update'
lock_value = 'unique_value'
if r.set(lock_key, lock_value, nx=True, ex=10): # 10秒锁过期时间
try:
# 进行写数据库和删缓存操作
pass
finally:
r.delete(lock_key)
- 缓存版本号:为缓存数据设置版本号。每次数据更新时,版本号递增。读取数据时,不仅读取数据,还读取版本号。如果发现缓存版本号与数据库中的版本号不一致,说明数据已更新,需要从数据库重新加载数据并更新缓存。例如:
# 写操作
version = r.incr('user_version')
r.set(f'user:{user_id}:version', version)
# 读操作
cached_version = r.get(f'user:{user_id}:version')
if cached_version and int(cached_version) < get_database_version(user_id):
# 从数据库重新加载数据并更新缓存
pass
4. 缓存预热
- 系统启动时预热:在系统启动阶段,预先将一些常用的数据加载到缓存中,避免系统刚上线时大量缓存未命中导致数据库压力过大。可以通过批量查询数据库并写入缓存的方式实现。例如:
import itertools
user_ids = range(1, 100) # 假设预加载1 - 100的用户信息
user_info_list = []
for user_id in user_ids:
cursor.execute('SELECT * FROM users WHERE id =?', (user_id,))
user_info_list.append(cursor.fetchone())
pipe = r.pipeline()
for user_id, user_info in zip(user_ids, user_info_list):
pipe.set(f'user:{user_id}', str(user_info))
pipe.execute()
- 定时任务预热:通过定时任务,定期更新或加载缓存数据,确保缓存中的数据始终是最新的或常用的。例如,每天凌晨更新商品的热门推荐缓存:
import schedule
import time
def update_hot_products_cache():
hot_products = get_hot_products_from_db()
r.set('hot_products', str(hot_products))
schedule.every().day.at("00:00").do(update_hot_products_cache)
while True:
schedule.run_pending()
time.sleep(1)