- Redis服务故障处理
- 重试机制:
- 当检测到Redis服务故障时,应用程序启动重试逻辑。可以设置一个固定的重试次数,比如5次,每次重试间隔一定时间,如从1秒开始,每次重试间隔翻倍(1秒、2秒、4秒、8秒、16秒),以避免过于频繁的重试对系统造成过大压力。
- 代码示例(以Python为例):
import time
retry_count = 0
while retry_count < 5:
try:
# 尝试从Redis队列获取数据
data = redis_client.rpop('my_queue')
if data:
break
except redis.RedisError as e:
sleep_time = 2 ** retry_count
time.sleep(sleep_time)
retry_count += 1
- 数据持久化:
- 采用Redis的AOF(Append - Only File)或RDB(Redis Database)持久化机制。AOF模式下,Redis会将每一个写命令追加到文件末尾,RDB则是定期将内存中的数据快照保存到磁盘。如果Redis故障重启,可通过加载持久化文件恢复数据。
- 配置示例:在
redis.conf
文件中,启用AOF持久化:
appendonly yes
- 备用Redis节点:
- 搭建Redis主从集群或哨兵模式。在主从集群中,从节点复制主节点的数据。当主节点出现故障时,手动或自动将从节点提升为主节点。在哨兵模式下,哨兵会自动监控主节点状态,当主节点故障时,自动进行故障转移,选举新的主节点。
- 以哨兵模式为例,配置
redis - sentinel.conf
文件:
sentinel monitor mymaster 127.0.0.1 6379 2
- MySQL写入失败处理
- 事务回滚:
- 在进行MySQL批量数据写入时,使用事务包裹所有的写入操作。如果其中任何一条数据写入失败,整个事务回滚,确保MySQL数据的一致性。
- 代码示例(以Java JDBC为例):
Connection conn = DriverManager.getConnection(url, username, password);
try {
conn.setAutoCommit(false);
PreparedStatement pstmt = conn.prepareStatement("INSERT INTO my_table (col1, col2) VALUES (?,?)");
for (DataObject data : dataList) {
pstmt.setString(1, data.getCol1());
pstmt.setString(2, data.getCol2());
pstmt.addBatch();
}
pstmt.executeBatch();
conn.commit();
} catch (SQLException e) {
if (conn!= null) {
try {
conn.rollback();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
e.printStackTrace();
} finally {
if (conn!= null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
- 重试机制:
- 与Redis重试类似,当MySQL写入失败时,启动重试逻辑。同样可以设置重试次数和间隔时间。在重试时,要确保重试的数据不会重复写入。可以为每条数据添加唯一标识(如UUID),在重试前先查询MySQL中是否已存在该数据。
- 代码示例(以Python为例,使用
uuid
模块生成唯一标识):
import uuid
import mysql.connector
retry_count = 0
data_id = uuid.uuid4()
while retry_count < 5:
try:
conn = mysql.connector.connect(user='user', password='password', host='127.0.0.1', database='mydb')
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM my_table WHERE data_id = %s", (str(data_id),))
count = cursor.fetchone()[0]
if count == 0:
cursor.execute("INSERT INTO my_table (data_id, col1) VALUES (%s, %s)", (str(data_id), 'data_value'))
conn.commit()
break
except mysql.connector.Error as e:
sleep_time = 2 ** retry_count
time.sleep(sleep_time)
retry_count += 1
finally:
if conn.is_connected():
cursor.close()
conn.close()
- 数据缓存:
- 当MySQL写入失败时,将失败的数据暂时缓存到本地文件或内存队列(如Python的
Queue
模块)中。在故障排除后,重新读取缓存数据进行写入。对于缓存到本地文件的情况,要注意文件的读写操作的原子性和并发控制。
- 代码示例(以Python的
Queue
模块为例):
from queue import Queue
failed_queue = Queue()
try:
# MySQL写入操作
pass
except mysql.connector.Error as e:
failed_data = {'data': 'failed_value', 'id': 'unique_id'}
failed_queue.put(failed_data)
while not failed_queue.empty():
data = failed_queue.get()
# 重新尝试写入MySQL
- 防止重复写入
- 唯一索引:
- 在MySQL表中为关键列(如业务标识列)添加唯一索引。当尝试插入重复数据时,MySQL会抛出唯一约束冲突错误,应用程序捕获该错误并进行相应处理(如重试或跳过)。
- 创建唯一索引的SQL语句:
CREATE UNIQUE INDEX idx_unique_col ON my_table (unique_col);
- 去重缓存:
- 在应用程序层面,维护一个去重缓存(如Python的
set
集合)。在将数据写入MySQL之前,先检查去重缓存中是否已存在该数据。如果存在,则跳过写入操作;如果不存在,则将数据添加到去重缓存并写入MySQL。
- 代码示例(以Python为例):
duplicate_cache = set()
data = {'id': 1, 'value': 'test'}
if data['id'] not in duplicate_cache:
duplicate_cache.add(data['id'])
# 写入MySQL操作