MST

星途 面试题库

面试题:Redis队列实现MySQL批量写入时的异常处理与数据一致性保障

在基于Redis队列实现MySQL批量数据写入过程中,如果出现Redis服务故障、MySQL写入失败等异常情况,如何设计一套机制确保数据的一致性,保证不会丢失数据且不会重复写入,详细描述整体的异常处理流程和涉及到的技术手段。
10.2万 热度难度
数据库Redis

知识考点

AI 面试

面试题答案

一键面试
  1. Redis服务故障处理
    • 重试机制
      • 当检测到Redis服务故障时,应用程序启动重试逻辑。可以设置一个固定的重试次数,比如5次,每次重试间隔一定时间,如从1秒开始,每次重试间隔翻倍(1秒、2秒、4秒、8秒、16秒),以避免过于频繁的重试对系统造成过大压力。
      • 代码示例(以Python为例):
import time
retry_count = 0
while retry_count < 5:
    try:
        # 尝试从Redis队列获取数据
        data = redis_client.rpop('my_queue')
        if data:
            break
    except redis.RedisError as e:
        sleep_time = 2 ** retry_count
        time.sleep(sleep_time)
        retry_count += 1
  • 数据持久化
    • 采用Redis的AOF(Append - Only File)或RDB(Redis Database)持久化机制。AOF模式下,Redis会将每一个写命令追加到文件末尾,RDB则是定期将内存中的数据快照保存到磁盘。如果Redis故障重启,可通过加载持久化文件恢复数据。
    • 配置示例:在redis.conf文件中,启用AOF持久化:
appendonly yes
  • 备用Redis节点
    • 搭建Redis主从集群或哨兵模式。在主从集群中,从节点复制主节点的数据。当主节点出现故障时,手动或自动将从节点提升为主节点。在哨兵模式下,哨兵会自动监控主节点状态,当主节点故障时,自动进行故障转移,选举新的主节点。
    • 以哨兵模式为例,配置redis - sentinel.conf文件:
sentinel monitor mymaster 127.0.0.1 6379 2
  1. MySQL写入失败处理
    • 事务回滚
      • 在进行MySQL批量数据写入时,使用事务包裹所有的写入操作。如果其中任何一条数据写入失败,整个事务回滚,确保MySQL数据的一致性。
      • 代码示例(以Java JDBC为例):
Connection conn = DriverManager.getConnection(url, username, password);
try {
    conn.setAutoCommit(false);
    PreparedStatement pstmt = conn.prepareStatement("INSERT INTO my_table (col1, col2) VALUES (?,?)");
    for (DataObject data : dataList) {
        pstmt.setString(1, data.getCol1());
        pstmt.setString(2, data.getCol2());
        pstmt.addBatch();
    }
    pstmt.executeBatch();
    conn.commit();
} catch (SQLException e) {
    if (conn!= null) {
        try {
            conn.rollback();
        } catch (SQLException ex) {
            ex.printStackTrace();
        }
    }
    e.printStackTrace();
} finally {
    if (conn!= null) {
        try {
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
  • 重试机制
    • 与Redis重试类似,当MySQL写入失败时,启动重试逻辑。同样可以设置重试次数和间隔时间。在重试时,要确保重试的数据不会重复写入。可以为每条数据添加唯一标识(如UUID),在重试前先查询MySQL中是否已存在该数据。
    • 代码示例(以Python为例,使用uuid模块生成唯一标识):
import uuid
import mysql.connector
retry_count = 0
data_id = uuid.uuid4()
while retry_count < 5:
    try:
        conn = mysql.connector.connect(user='user', password='password', host='127.0.0.1', database='mydb')
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM my_table WHERE data_id = %s", (str(data_id),))
        count = cursor.fetchone()[0]
        if count == 0:
            cursor.execute("INSERT INTO my_table (data_id, col1) VALUES (%s, %s)", (str(data_id), 'data_value'))
            conn.commit()
            break
    except mysql.connector.Error as e:
        sleep_time = 2 ** retry_count
        time.sleep(sleep_time)
        retry_count += 1
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()
  • 数据缓存
    • 当MySQL写入失败时,将失败的数据暂时缓存到本地文件或内存队列(如Python的Queue模块)中。在故障排除后,重新读取缓存数据进行写入。对于缓存到本地文件的情况,要注意文件的读写操作的原子性和并发控制。
    • 代码示例(以Python的Queue模块为例):
from queue import Queue
failed_queue = Queue()
try:
    # MySQL写入操作
    pass
except mysql.connector.Error as e:
    failed_data = {'data': 'failed_value', 'id': 'unique_id'}
    failed_queue.put(failed_data)
while not failed_queue.empty():
    data = failed_queue.get()
    # 重新尝试写入MySQL
  1. 防止重复写入
    • 唯一索引
      • 在MySQL表中为关键列(如业务标识列)添加唯一索引。当尝试插入重复数据时,MySQL会抛出唯一约束冲突错误,应用程序捕获该错误并进行相应处理(如重试或跳过)。
      • 创建唯一索引的SQL语句:
CREATE UNIQUE INDEX idx_unique_col ON my_table (unique_col);
  • 去重缓存
    • 在应用程序层面,维护一个去重缓存(如Python的set集合)。在将数据写入MySQL之前,先检查去重缓存中是否已存在该数据。如果存在,则跳过写入操作;如果不存在,则将数据添加到去重缓存并写入MySQL。
    • 代码示例(以Python为例):
duplicate_cache = set()
data = {'id': 1, 'value': 'test'}
if data['id'] not in duplicate_cache:
    duplicate_cache.add(data['id'])
    # 写入MySQL操作