设计思路
- 使用上下文管理器管理文件资源:Python 的
with
语句会自动处理文件的打开和关闭,确保在操作完成后文件资源被正确释放,避免资源泄漏。例如 with open('file.txt', 'r') as f:
这样的语句块结束时,文件 f
会自动关闭。
- 处理并发访问冲突:
- 锁机制:使用
threading.Lock
(对于多线程场景)或 multiprocessing.Lock
(对于多进程场景)来保证同一时间只有一个线程或进程可以访问文件。在进入文件操作前获取锁,操作完成后释放锁。
- 队列:可以使用
queue.Queue
来缓冲文件操作请求,将文件操作任务放入队列,由一个或多个专门的线程/进程从队列中取出任务并按顺序执行,从而避免并发冲突。
- 性能监控和调优:
- 使用
cProfile
模块:cProfile
可以帮助分析代码性能,找出耗时较长的函数和操作。例如 import cProfile; cProfile.run('your_function()')
,根据分析结果优化瓶颈代码。
- 调整并发数:通过实验不同的线程数或进程数,找到系统在当前硬件条件下的最佳并发度,避免过多的上下文切换带来的性能损耗。
关键部分代码示例
多线程场景
import threading
import queue
import time
class FileProcessor:
def __init__(self, num_threads):
self.lock = threading.Lock()
self.task_queue = queue.Queue()
self.threads = []
for _ in range(num_threads):
t = threading.Thread(target=self.process_task)
t.start()
self.threads.append(t)
def add_task(self, file_path, operation):
self.task_queue.put((file_path, operation))
def process_task(self):
while True:
try:
file_path, operation = self.task_queue.get(timeout=1)
with self.lock:
if operation == 'read':
with open(file_path, 'r') as f:
content = f.read()
print(f"Read {file_path}: {content}")
elif operation == 'write':
with open(file_path, 'w') as f:
f.write('Some content')
print(f"Wrote to {file_path}")
self.task_queue.task_done()
except queue.Empty:
break
# 使用示例
processor = FileProcessor(5)
for i in range(10):
processor.add_task(f'file_{i}.txt','read')
processor.add_task(f'file_{i + 10}.txt', 'write')
processor.task_queue.join()
for t in processor.threads:
t.join()
多进程场景
import multiprocessing
import queue
import time
class FileProcessor:
def __init__(self, num_processes):
self.lock = multiprocessing.Lock()
self.task_queue = multiprocessing.JoinableQueue()
self.processes = []
for _ in range(num_processes):
p = multiprocessing.Process(target=self.process_task)
p.start()
self.processes.append(p)
def add_task(self, file_path, operation):
self.task_queue.put((file_path, operation))
def process_task(self):
while True:
try:
file_path, operation = self.task_queue.get(timeout=1)
with self.lock:
if operation == 'read':
with open(file_path, 'r') as f:
content = f.read()
print(f"Read {file_path}: {content}")
elif operation == 'write':
with open(file_path, 'w') as f:
f.write('Some content')
print(f"Wrote to {file_path}")
self.task_queue.task_done()
except queue.Empty:
break
# 使用示例
processor = FileProcessor(5)
for i in range(10):
processor.add_task(f'file_{i}.txt','read')
processor.add_task(f'file_{i + 10}.txt', 'write')
processor.task_queue.join()
for p in processor.processes:
p.join()