面试题答案
一键面试关键设计点
- 监控机制:使用Python的
ping
或相关网络库检测数据库实例的网络连接状态,定期检查实例是否存活。 - 故障转移:当主数据库出现故障时,自动切换到从数据库进行备份。可以通过记录主从数据库的状态,使用配置文件指定备用的从库列表。
- 数据一致性:在备份前,使用MySQL的
FLUSH TABLES WITH READ LOCK
语句锁定所有表,确保数据在备份过程中不会发生变化。备份完成后,使用UNLOCK TABLES
解锁。 - 故障恢复:当主库或从库恢复后,重新检测其状态,根据配置重新将其纳入备份体系,并自动同步之前未完成的备份任务。
核心代码框架
import mysql.connector
import time
import os
import subprocess
# 数据库配置
db_configs = {
'primary': {
'host': 'primary_host',
'user': 'user',
'password': 'password',
'database': 'database'
},
'slave1': {
'host':'slave1_host',
'user': 'user',
'password': 'password',
'database': 'database'
},
'slave2': {
'host':'slave2_host',
'user': 'user',
'password': 'password',
'database': 'database'
}
}
def check_db_connection(config):
try:
conn = mysql.connector.connect(**config)
conn.close()
return True
except mysql.connector.Error as err:
print(f"连接数据库错误: {err}")
return False
def backup_db(config, backup_dir):
host = config['host']
user = config['user']
password = config['password']
database = config['database']
backup_file = os.path.join(backup_dir, f"{database}_{int(time.time())}.sql")
command = f"mysqldump -h {host} -u {user} -p{password} {database} > {backup_file}"
try:
subprocess.run(command, shell=True, check=True)
print(f"成功备份到 {backup_file}")
except subprocess.CalledProcessError as err:
print(f"备份失败: {err}")
def main():
backup_dir = 'backup_directory'
if not os.path.exists(backup_dir):
os.makedirs(backup_dir)
primary_config = db_configs['primary']
slave_configs = [db_configs['slave1'], db_configs['slave2']]
if check_db_connection(primary_config):
backup_db(primary_config, backup_dir)
else:
for slave_config in slave_configs:
if check_db_connection(slave_config):
backup_db(slave_config, backup_dir)
break
else:
print("所有数据库实例均不可用")
if __name__ == "__main__":
while True:
main()
time.sleep(3600) # 每小时备份一次
数据一致性处理
在backup_db
函数中,可以在执行mysqldump
备份命令前添加锁定表和解锁表的操作:
def backup_db(config, backup_dir):
host = config['host']
user = config['user']
password = config['password']
database = config['database']
backup_file = os.path.join(backup_dir, f"{database}_{int(time.time())}.sql")
conn = mysql.connector.connect(**config)
cursor = conn.cursor()
try:
cursor.execute("FLUSH TABLES WITH READ LOCK")
command = f"mysqldump -h {host} -u {user} -p{password} {database} > {backup_file}"
subprocess.run(command, shell=True, check=True)
cursor.execute("UNLOCK TABLES")
print(f"成功备份到 {backup_file}")
except subprocess.CalledProcessError as err:
print(f"备份失败: {err}")
cursor.execute("UNLOCK TABLES")
finally:
cursor.close()
conn.close()
故障恢复与任务同步
- 记录备份状态:可以使用日志文件或数据库表记录每次备份的时间、状态(成功/失败)、备份的数据库实例等信息。
- 故障检测与恢复:在每次执行备份任务前,重新检测所有数据库实例的状态,若之前不可用的实例恢复,则将其重新纳入备份候选列表。
- 任务同步:根据备份状态记录,判断哪些备份任务需要重新执行,特别是在故障期间未成功备份的任务。可以通过在备份文件名或记录中添加版本号等方式,确保数据的一致性和完整性。例如,在备份文件名中添加时间戳和版本号,在故障恢复后重新备份时更新版本号。