ThreadPoolExecutor线程复用实现原理
- 核心流程
ThreadPoolExecutor
维护了一个线程池,其中的线程可以被复用。当任务提交到线程池时,首先会判断核心线程数是否已满。如果核心线程数未达到设定值(corePoolSize
),则创建新的线程来执行任务。
- 若核心线程已满,任务会被放入阻塞队列(
workQueue
)。如果阻塞队列未满,任务在队列中等待,已有线程从队列中取出任务执行,从而实现线程复用。
- 当阻塞队列已满,且当前线程数小于最大线程数(
maximumPoolSize
)时,会创建新的非核心线程来执行任务。
- 关键源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker
方法用于创建新的Worker
线程,Worker
类实现了Runnable
接口,其run
方法如下:
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask
方法从阻塞队列中获取任务,使得线程不断从队列中取任务执行,实现线程复用:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
高并发场景下可能遇到的问题
- 队列溢出:在高并发场景下,如果任务提交速度过快,而线程处理速度跟不上,阻塞队列可能会被填满,导致后续任务无法入队,触发拒绝策略。
- 线程过多:如果任务持续大量涌入,且阻塞队列已满,线程池可能会创建过多的线程(达到
maximumPoolSize
),这会消耗大量系统资源,如内存和CPU,甚至可能导致系统崩溃。
- 线程饥饿:当核心线程一直处于忙碌状态,新提交的任务只能在队列中等待,可能导致某些任务长时间得不到执行,出现线程饥饿现象。
优化方法
- 合理调整线程池参数
- 核心线程数:根据任务的性质和系统资源,合理设置
corePoolSize
。对于CPU密集型任务,核心线程数可设置为CPU核心数;对于I/O密集型任务,核心线程数可适当增大,例如CPU核心数的2倍左右。
- 最大线程数:根据系统能够承受的最大线程数来设置
maximumPoolSize
,避免创建过多线程导致系统资源耗尽。
- 队列容量:根据任务的预计流量和处理速度,设置合适的阻塞队列容量。如果任务流量波动较大,可以考虑使用无界队列(如
LinkedBlockingQueue
),但要注意可能导致内存耗尽的风险;如果流量相对稳定,有界队列(如ArrayBlockingQueue
)能更好地控制资源。
- 优化任务处理逻辑
- 减少任务执行时间:对任务的业务逻辑进行优化,例如减少不必要的I/O操作、优化算法等,提高任务处理效率,从而降低线程池的负载。
- 异步处理:将一些非关键的任务异步化处理,避免这些任务阻塞核心业务流程,减少任务对线程池的占用时间。
- 采用更灵活的拒绝策略
- 除了默认的拒绝策略(如
AbortPolicy
、CallerRunsPolicy
、DiscardPolicy
、DiscardOldestPolicy
),可以根据业务需求自定义拒绝策略,例如将任务写入缓存或消息队列,后续再进行处理。