1. 合理配置线程池或进程池大小
- I/O密集型任务:对于I/O密集型任务,由于任务大部分时间处于等待I/O操作完成的状态,线程上下文切换开销相对较小,因此可以使用线程池,并且线程池大小可以设置得相对较大。例如,可以根据经验将线程池大小设置为CPU核心数的 5 - 10 倍。在Python中使用
concurrent.futures.ThreadPoolExecutor
创建线程池,如下代码示例:
import concurrent.futures
# 设置线程池大小为CPU核心数的8倍
import multiprocessing
num_threads = multiprocessing.cpu_count() * 8
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
# 提交任务
future = executor.submit(io_bound_task, args)
- CPU密集型任务:CPU密集型任务需要大量的CPU计算资源,过多的进程或线程会导致频繁的上下文切换,反而降低性能。因此,进程池大小一般设置为CPU核心数。使用
concurrent.futures.ProcessPoolExecutor
创建进程池,示例如下:
import concurrent.futures
# 设置进程池大小为CPU核心数
num_processes = multiprocessing.cpu_count()
with concurrent.futures.ProcessPoolExecutor(max_workers=num_processes) as executor:
# 提交任务
future = executor.submit(cpu_bound_task, args)
- 混合任务:对于混合任务,可以将任务分类,分别提交到线程池和进程池处理。例如,将I/O密集型任务提交到线程池,CPU密集型任务提交到进程池。
2. 处理任务间的依赖关系
- 使用
Future
对象:concurrent.futures
模块中的Future
对象可以用于处理任务间的依赖关系。当一个任务的执行依赖于另一个任务的结果时,可以通过获取前一个任务的Future
对象的结果来启动后续任务。例如:
import concurrent.futures
def task1():
return 10
def task2(result1):
return result1 * 2
with concurrent.futures.ThreadPoolExecutor() as executor:
future1 = executor.submit(task1)
future2 = executor.submit(task2, future1.result())
print(future2.result())
- 使用
as_completed
函数:如果有多个任务,并且部分任务之间有依赖关系,可以使用concurrent.futures.as_completed
函数按任务完成顺序获取结果,并根据结果启动依赖的任务。示例如下:
import concurrent.futures
def task_a():
return "A result"
def task_b(result_a):
return "B result with " + result_a
def task_c(result_a):
return "C result with " + result_a
tasks = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_a = executor.submit(task_a)
tasks.append(future_a)
future_b = executor.submit(task_b, future_a.result())
tasks.append(future_b)
future_c = executor.submit(task_c, future_a.result())
tasks.append(future_c)
for future in concurrent.futures.as_completed(tasks):
print(future.result())
3. 避免资源耗尽
- 监控资源使用:在实际项目中,可以使用系统监控工具(如
psutil
库)实时监控内存、CPU等资源的使用情况。在任务执行过程中,定期检查资源使用状态,当资源使用接近限制时,暂停提交新任务,直到资源使用下降。示例代码如下:
import psutil
import time
while True:
memory = psutil.virtual_memory()
cpu_percent = psutil.cpu_percent()
if memory.percent > 80 or cpu_percent > 80:
# 资源紧张,暂停提交任务
time.sleep(5)
else:
# 提交新任务
pass
- 设置任务队列:为了避免一次性提交过多任务导致资源耗尽,可以设置任务队列。将任务先放入队列中,然后根据资源使用情况从队列中取出任务提交到线程池或进程池。例如,使用
queue.Queue
实现简单的任务队列:
import queue
import concurrent.futures
import time
task_queue = queue.Queue()
# 将任务放入队列
for i in range(100):
task_queue.put(i)
with concurrent.futures.ThreadPoolExecutor() as executor:
while not task_queue.empty():
task = task_queue.get()
future = executor.submit(process_task, task)
# 检查资源使用情况,如资源紧张则暂停
if psutil.virtual_memory().percent > 80 or psutil.cpu_percent() > 80:
time.sleep(5)
- 及时释放资源:在任务完成后,及时释放任务占用的资源,如关闭文件句柄、数据库连接等。确保每个任务在结束时不会遗留未释放的资源,以免随着任务的不断执行导致资源耗尽。例如,在使用文件操作的任务中,使用
with
语句确保文件在使用后自动关闭:
def file_io_task():
with open('example.txt', 'r') as f:
data = f.read()
# 处理数据
return processed_data