MST

星途 面试题库

面试题:Redis与MySQL批量数据交互时的一致性处理

当使用Redis辅助MySQL进行批量数据处理时,如何保证Redis缓存数据与MySQL数据库数据的一致性?请详细描述可能用到的策略及实现方式,包括但不限于缓存更新策略、双写一致性等。
19.4万 热度难度
数据库Redis

知识考点

AI 面试

面试题答案

一键面试

缓存更新策略

  1. Cache-Aside Pattern(旁路缓存模式)
    • 读取流程
      • 应用程序首先从Redis缓存中读取数据。
      • 如果缓存命中,直接返回缓存中的数据。
      • 如果缓存未命中,从MySQL数据库中查询数据。
      • 将从数据库查询到的数据写入Redis缓存,然后返回给应用程序。
    • 写入流程
      • 应用程序先更新MySQL数据库中的数据。
      • 然后删除Redis缓存中的对应数据。这样下次读取时就会从数据库重新加载最新数据到缓存。
    • 实现方式(以Python为例,使用redis - pypymysql库)
import redis
import pymysql

# 连接Redis
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
# 连接MySQL
mysql_conn = pymysql.connect(host='localhost', user='root', password='password', database='test')


def get_data_from_db(key):
    with mysql_conn.cursor() as cursor:
        sql = "SELECT data FROM your_table WHERE key = %s"
        cursor.execute(sql, (key,))
        result = cursor.fetchone()
        if result:
            return result[0]
        return None


def get_data(key):
    data = redis_client.get(key)
    if data:
        return data.decode('utf - 8')
    data = get_data_from_db(key)
    if data:
        redis_client.set(key, data)
    return data


def update_data(key, new_data):
    with mysql_conn.cursor() as cursor:
        sql = "UPDATE your_table SET data = %s WHERE key = %s"
        cursor.execute(sql, (new_data, key))
        mysql_conn.commit()
    redis_client.delete(key)


  1. Write - Through Pattern(直写模式)
    • 读取流程:与Cache - Aside Pattern读取流程类似,先从Redis缓存读取,命中则返回,未命中从MySQL读取并写入缓存。
    • 写入流程
      • 应用程序同时更新MySQL数据库和Redis缓存中的数据。
      • 这种方式保证了缓存和数据库数据的强一致性,但由于需要同时操作两个存储,性能相对较低。
    • 实现方式(以Java为例,使用Jedis和JDBC)
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class WriteThroughExample {
    private static final String DB_URL = "jdbc:mysql://localhost:3306/test";
    private static final String DB_USER = "root";
    private static final String DB_PASSWORD = "password";
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;

    public static String getDataFromDB(String key) {
        try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
             PreparedStatement pstmt = conn.prepareStatement("SELECT data FROM your_table WHERE key =?")) {
            pstmt.setString(1, key);
            try (java.sql.ResultSet rs = pstmt.executeQuery()) {
                if (rs.next()) {
                    return rs.getString("data");
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static String getData(String key) {
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {
            String data = jedis.get(key);
            if (data != null) {
                return data;
            }
            data = getDataFromDB(key);
            if (data != null) {
                jedis.set(key, data);
            }
            return data;
        }
    }

    public static void updateData(String key, String newData) {
        try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
             PreparedStatement pstmt = conn.prepareStatement("UPDATE your_table SET data =? WHERE key =?")) {
            pstmt.setString(1, newData);
            pstmt.setString(2, key);
            pstmt.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {
            jedis.set(key, newData);
        }
    }
}
  1. Write - Behind Caching Pattern(异步缓存写入模式)
    • 读取流程:同Cache - Aside Pattern读取流程。
    • 写入流程
      • 应用程序只更新Redis缓存。
      • 后台线程定期将Redis缓存中的更新数据批量写入MySQL数据库。这种方式可以提高写入性能,但存在数据一致性问题,因为从缓存更新到数据库写入之间有延迟。
    • 实现方式(以Python为例,使用redis - pypymysqlthreading模块)
import redis
import pymysql
import threading
import time

# 连接Redis
redis_client = redis.StrictRedis(host='localhost', port=6379, db = 0)
# 连接MySQL
mysql_conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
update_queue = []


def write_to_db_periodically():
    global update_queue
    while True:
        if update_queue:
            with mysql_conn.cursor() as cursor:
                for key, value in update_queue:
                    sql = "UPDATE your_table SET data = %s WHERE key = %s"
                    cursor.execute(sql, (value, key))
                mysql_conn.commit()
            update_queue = []
        time.sleep(5)


t = threading.Thread(target = write_to_db_periodically)
t.start()


def update_data(key, new_data):
    redis_client.set(key, new_data)
    update_queue.append((key, new_data))


双写一致性

  1. 问题场景:在高并发环境下,当两个请求同时对数据进行读写操作时,可能会出现缓存和数据库数据不一致的情况。例如,一个请求先更新数据库,然后删除缓存,另一个请求在缓存删除之前读取了缓存中的旧数据,之后又从数据库读取并更新了缓存,导致缓存中的数据为旧数据。
  2. 解决方案
    • 读写锁:使用读写锁来保证在更新数据时,其他读操作等待。在Java中可以使用ReentrantReadWriteLock。例如:
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExample {
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private static final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    private static final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

    public static String getData(String key) {
        readLock.lock();
        try {
            // 从缓存或数据库读取数据逻辑
            return null;
        } finally {
            readLock.unlock();
        }
    }

    public static void updateData(String key, String newData) {
        writeLock.lock();
        try {
            // 更新数据库和缓存逻辑
        } finally {
            writeLock.unlock();
        }
    }
}
- **队列处理**:将所有的写操作放入一个队列中,顺序执行。这样可以避免并发写操作导致的一致性问题。例如,在Python中可以使用`Queue`模块:
import queue
import threading

update_queue = queue.Queue()


def process_update_queue():
    while True:
        key, value = update_queue.get()
        # 执行更新数据库和缓存的操作
        update_queue.task_done()


t = threading.Thread(target = process_update_queue)
t.start()


def update_data(key, new_data):
    update_queue.put((key, new_data))


- **使用分布式事务(如XA协议)**:如果是分布式系统,可以使用XA协议来保证数据库和缓存操作在同一个事务中。不过这种方式实现复杂,性能开销大。在Java中可以使用`Atomikos`等分布式事务框架来实现。