优化策略
- 线程/进程池大小设置:
- 对于I/O密集型任务,通常使用线程池。线程池大小可以根据CPU核心数和I/O等待时间估算。一般经验法则是设置为CPU核心数的数倍(例如4 - 8倍),假设CPU核心数为
num_cpus
,线程池大小pool_size = num_cpus * 4
。可以通过multiprocessing.cpu_count()
获取CPU核心数。
- 资源管理:
- 尽量复用与外部服务的连接。例如,如果是HTTP请求,可以使用
requests.Session
对象,它会自动管理连接池,减少连接建立和关闭的开销。
- 对于其他资源(如数据库连接等),同样采用连接池等技术进行复用。
- 异常处理:
- 在任务函数中捕获可能的异常,如网络异常(
requests
库中的RequestException
)、外部服务返回错误等。使用try - except
块捕获异常,并记录详细的错误信息,以便调试。
- 在提交任务到线程池后,使用
as_completed
或result
方法获取任务结果时,也需要处理可能的异常,如concurrent.futures.TimeoutError
(任务超时)、concurrent.futures.CancelledError
(任务被取消)等。
- 避免死锁或资源竞争:
- 确保共享资源(如连接池、全局变量等)的访问是线程安全的。对于共享资源的操作,可以使用
threading.Lock
或queue.Queue
等线程安全的数据结构。
- 避免循环依赖资源,在设计任务逻辑时,要仔细规划资源的获取和释放顺序。
代码框架示例
import concurrent.futures
import requests
import multiprocessing
def process_task(task_data, session):
try:
# 与外部服务进行多次交互
response1 = session.get('http://example.com/api1', params=task_data)
response1.raise_for_status()
data1 = response1.json()
response2 = session.post('http://example.com/api2', json=data1)
response2.raise_for_status()
data2 = response2.json()
# 处理响应数据
result = data2['key']
return result
except requests.RequestException as e:
# 记录网络相关异常
print(f"Network error in task: {e}")
except KeyError as e:
# 记录数据解析异常
print(f"Data parsing error in task: {e}")
def main():
num_cpus = multiprocessing.cpu_count()
pool_size = num_cpus * 4
tasks = [{"param1": "value1"}, {"param2": "value2"}] # 假设任务数据列表
with requests.Session() as session:
with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
futures = [executor.submit(process_task, task_data, session) for task_data in tasks]
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print(f"Task result: {result}")
except concurrent.futures.TimeoutError:
print("Task timed out")
except concurrent.futures.CancelledError:
print("Task was cancelled")
if __name__ == '__main__':
main()