MST

星途 面试题库

面试题:Redis缓存与MySQL数据一致性:高并发下的处理

假设在高并发场景下,多个请求同时读取Redis缓存并发现缓存过期,随后都尝试从MySQL加载数据并更新Redis缓存,这种情况可能会导致“缓存击穿”问题。请描述如何设计一个解决方案来避免这个问题,并确保数据一致性。
42.2万 热度难度
数据库Redis

知识考点

AI 面试

面试题答案

一键面试
  1. 加锁机制
    • 互斥锁:在从MySQL加载数据前,使用分布式锁(如Redis的SETNX命令实现)。只有获取到锁的请求才能从MySQL加载数据并更新Redis缓存。其他未获取到锁的请求等待一段时间后重试读取Redis缓存。示例代码(以Python和Redis为例):
import redis
import time

redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)

def get_data(key):
    data = redis_client.get(key)
    if data is None:
        lock_key = f'lock:{key}'
        if redis_client.setnx(lock_key, 1):
            try:
                # 从MySQL加载数据
                data_from_mysql = load_data_from_mysql(key)
                # 更新Redis缓存
                redis_client.set(key, data_from_mysql)
                return data_from_mysql
            finally:
                # 释放锁
                redis_client.delete(lock_key)
        else:
            # 等待一段时间后重试
            time.sleep(0.1)
            return get_data(key)
    else:
        return data.decode('utf - 8')


def load_data_from_mysql(key):
    # 模拟从MySQL加载数据
    return f'data for {key}'
  1. 设置过期时间的随机值
    • 为Redis缓存设置过期时间时,使用一个随机值。例如,原本过期时间为60秒,可以设置为55到65秒之间的随机值。这样可以避免大量缓存同时过期导致的缓存击穿问题。示例代码(以Python和Redis为例):
import redis
import random

redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)

def set_data_with_random_expiry(key, value):
    base_expiry = 60
    random_expiry = random.randint(base_expiry - 5, base_expiry + 5)
    redis_client.setex(key, random_expiry, value)
  1. 二级缓存
    • 本地缓存:在应用服务器中添加本地缓存(如Python的functools.lru_cache,或Guava Cache for Java)。当请求读取Redis缓存发现过期时,先检查本地缓存。如果本地缓存中有数据,则直接返回。如果本地缓存中没有数据,再获取分布式锁从MySQL加载数据并更新Redis和本地缓存。示例代码(以Python和Redis为例,使用functools.lru_cache):
import redis
import functools

redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)

@functools.lru_cache(maxsize = 128)
def get_data_from_local_cache(key):
    return redis_client.get(key)


def get_data(key):
    data = get_data_from_local_cache(key)
    if data is None:
        data = redis_client.get(key)
        if data is None:
            # 加锁逻辑
            lock_key = f'lock:{key}'
            if redis_client.setnx(lock_key, 1):
                try:
                    data_from_mysql = load_data_from_mysql(key)
                    redis_client.set(key, data_from_mysql)
                    # 更新本地缓存
                    get_data_from_local_cache.cache_clear()
                    return data_from_mysql
                finally:
                    redis_client.delete(lock_key)
            else:
                # 等待重试
                time.sleep(0.1)
                return get_data(key)
        else:
            # 更新本地缓存
            get_data_from_local_cache.cache_clear()
            return data.decode('utf - 8')
    else:
        return data.decode('utf - 8')


def load_data_from_mysql(key):
    # 模拟从MySQL加载数据
    return f'data for {key}'
  1. 缓存预热
    • 在系统启动时,预先将一些热点数据加载到Redis缓存中,并设置较长的过期时间。这样可以减少在高并发场景下缓存过期的概率。可以通过脚本或者定时任务来实现缓存预热。例如,在Python中可以使用APScheduler库来定时更新热点数据缓存:
from apscheduler.schedulers.background import BackgroundScheduler
import redis

redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)

def warm_up_cache():
    keys = ['key1', 'key2', 'key3'] # 热点数据的键
    for key in keys:
        data = load_data_from_mysql(key)
        redis_client.set(key, data)


scheduler = BackgroundScheduler()
scheduler.add_job(warm_up_cache, 'interval', minutes = 60)
scheduler.start()


def load_data_from_mysql(key):
    # 模拟从MySQL加载数据
    return f'data for {key}'