捕获并处理ThreadPoolExecutor任务异常
- 使用
submit
方法和Future
对象:
- 使用
submit
方法提交任务,该方法返回一个Future
对象。
- 可以在主线程中通过
Future.get()
方法获取任务执行结果,在获取结果时捕获ExecutionException
和InterruptedException
异常,ExecutionException
内部包装了任务实际抛出的异常。示例代码如下:
import concurrent.futures
def task():
raise ValueError('This is a test exception')
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(task)
try:
result = future.get()
except concurrent.futures.ExecutionException as e:
print(f"捕获到任务异常: {e}")
except concurrent.futures.InterruptedException:
print("线程被中断")
- 重写
ThreadPoolExecutor
的after_execute
方法:
- 继承
ThreadPoolExecutor
类并覆盖after_execute
方法。
- 在
after_execute
方法中捕获任务执行过程中未处理的异常。示例代码如下:
import concurrent.futures
class CustomThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
def after_execute(self, future, exc):
if exc:
print(f"捕获到任务异常: {exc}")
def task():
raise ValueError('This is a test exception')
with CustomThreadPoolExecutor() as executor:
executor.submit(task)
线程池参数优化(任务数量大时)
- 确定合适的最大线程数:
- CPU密集型任务:最大线程数一般设置为CPU核心数。可以通过
multiprocessing.cpu_count()
获取CPU核心数。例如在Python中:
import multiprocessing
max_threads = multiprocessing.cpu_count()
- I/O密集型任务:最大线程数可以设置得比CPU核心数大很多,具体数值需要通过性能测试确定。例如可以先尝试设置为CPU核心数的2 - 4倍,然后根据性能指标(如响应时间、吞吐量)进行调整。
- 调整队列容量:
- 当任务数量大时,使用有界队列可以防止任务无限制堆积,导致内存耗尽。如果任务提交速度快,处理速度慢,可以适当增大队列容量,但也不能过大。例如,对于一些流量较大且处理相对稳定的系统,可以将队列容量设置为几百甚至上千。
- 如果队列容量过小,可能导致任务被拒绝,此时需要合理调整拒绝策略,如使用
ThreadPoolExecutor.CallerRunsPolicy
,该策略会在提交任务的线程中直接执行被拒绝的任务,这样可以保证任务不会丢失,但可能会影响主线程的性能。
- 预热线程池:
- 在任务大量提交之前,先向线程池提交少量任务,让线程池创建一定数量的线程并处于就绪状态,减少任务提交后的线程创建开销。例如在Java中可以通过
executor.prestartAllCoreThreads()
方法(在ThreadPoolExecutor
类中)进行线程预热。