MST

星途 面试题库

面试题:Redis作为MySQL缓存层,如何保证缓存与数据库数据的一致性

在使用Redis作为MySQL缓存层进行读写分离时,确保缓存与数据库数据的一致性至关重要。请详细说明在高并发读写场景下,可能导致缓存与数据库数据不一致的情况,并给出相应的解决方案及具体实现思路。
28.9万 热度难度
数据库Redis

知识考点

AI 面试

面试题答案

一键面试

可能导致缓存与数据库数据不一致的情况

  1. 先写数据库,再删缓存,删缓存失败: 在高并发场景下,写操作先成功更新数据库,但是删除缓存操作失败。此时,缓存中的数据仍是旧数据,后续读请求会读到旧数据,导致数据不一致。
  2. 先删缓存,再写数据库,写数据库失败: 先删除缓存后,写数据库操作失败。此时缓存已被删除,而数据库未更新,后续读请求会从数据库读取旧数据并更新到缓存,同样导致数据不一致。
  3. 并发读写: 在写操作删除缓存后,还未来得及更新数据库时,读操作过来,发现缓存为空,从数据库读取旧数据并更新到缓存,然后写操作更新数据库,此时缓存与数据库数据不一致。

解决方案及实现思路

  1. 重试机制
    • 思路:当删除缓存操作失败时,通过重试机制,多次尝试删除缓存。例如,使用定时任务或者消息队列来实现重试逻辑。如果第一次删除缓存失败,将删除缓存的任务发送到消息队列,由消息队列的消费者进行重试,直到缓存删除成功。
    • 示例代码(Python + Redis + Celery实现重试,假设使用Flask框架)
from flask import Flask
import redis
from celery import Celery

app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db = 0)

# 初始化Celery
celery = Celery('tasks', broker='redis://localhost:6379/1')

@celery.task(bind=True, default_retry_delay = 5 * 60, max_retries = 5)
def delete_cache(self, key):
    try:
        redis_client.delete(key)
    except Exception as exc:
        self.retry(exc=exc)


# 假设这里是写数据库的函数
def write_to_db(data):
    # 模拟写数据库操作
    pass


@app.route('/write_data/<data>')
def write(data):
    write_to_db(data)
    try:
        redis_client.delete('cache_key')
    except Exception:
        delete_cache.delay('cache_key')
    return 'Data written successfully'
  1. 读写锁
    • 思路:在读写操作前获取读写锁。写操作获取写锁,读操作获取读锁。写锁具有排他性,当有写操作时,其他读写操作都需要等待。读锁可以共享,多个读操作可以同时进行。这样可以避免并发读写导致的数据不一致问题。
    • 示例代码(Python + Redis实现读写锁,使用redlock - py库)
from redlock import Redlock
import redis

redis_client = redis.Redis(host='localhost', port=6379, db = 0)
redlock = Redlock([{
    "host": "localhost",
    "port": 6379,
    "db": 0
}])

def read_data():
    lock = redlock.lock('read_lock', 10000)
    if lock:
        try:
            data = redis_client.get('data_key')
            if not data:
                data = get_from_db()
                redis_client.set('data_key', data)
            return data
        finally:
            redlock.unlock(lock)
    else:
        return 'Failed to acquire read lock'


def write_data(data):
    lock = redlock.lock('write_lock', 10000)
    if lock:
        try:
            write_to_db(data)
            redis_client.delete('data_key')
        finally:
            redlock.unlock(lock)
    else:
        return 'Failed to acquire write lock'


# 假设这里是从数据库获取数据的函数
def get_from_db():
    pass


# 假设这里是写数据库的函数
def write_to_db(data):
    pass
  1. 缓存失效时间
    • 思路:为缓存设置合理的失效时间。即使缓存与数据库数据出现短暂不一致,在缓存失效后,后续读请求会重新从数据库读取数据并更新缓存,从而保证数据的最终一致性。失效时间的设置需要根据业务场景进行权衡,既要避免缓存频繁失效导致过多的数据库读压力,又要保证数据不会长时间不一致。
    • 示例代码(Python + Redis设置缓存失效时间)
import redis

redis_client = redis.Redis(host='localhost', port=6379, db = 0)

def read_data():
    data = redis_client.get('data_key')
    if not data:
        data = get_from_db()
        redis_client.setex('data_key', 3600, data) # 设置缓存有效期为1小时
    return data


def write_data(data):
    write_to_db(data)
    redis_client.delete('data_key')


# 假设这里是从数据库获取数据的函数
def get_from_db():
    pass


# 假设这里是写数据库的函数
def write_to_db(data):
    pass
  1. 异步更新缓存
    • 思路:使用消息队列(如Kafka、RabbitMQ等),将写操作发送到消息队列,由消费者异步处理更新缓存。这样可以减少写操作的响应时间,并且通过消息队列的可靠性保证缓存更新操作的执行。
    • 示例代码(Python + RabbitMQ + Redis实现异步更新缓存,使用pika库连接RabbitMQ)
import pika
import redis

redis_client = redis.Redis(host='localhost', port=6379, db = 0)

# 连接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='cache_update')

def callback(ch, method, properties, body):
    key = body.decode('utf - 8')
    redis_client.delete(key)


channel.basic_consume(queue='cache_update', on_message_callback = callback, auto_ack = True)


# 假设这里是写数据库的函数
def write_to_db(data):
    pass


def write_data(data):
    write_to_db(data)
    channel.basic_publish(exchange='', routing_key='cache_update', body='cache_key')


if __name__ == '__main__':
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()