MST
星途 面试题库

面试题:Java中ThreadPoolExecutor的线程复用优化及原理

ThreadPoolExecutor是如何实现线程复用的?这种线程复用机制在高并发场景下可能会遇到哪些问题,又该如何优化?请从源码层面深入分析。
45.5万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

ThreadPoolExecutor线程复用实现原理

  1. 核心流程
    • ThreadPoolExecutor维护了一个线程池,其中的线程可以被复用。当任务提交到线程池时,首先会判断核心线程数是否已满。如果核心线程数未达到设定值(corePoolSize),则创建新的线程来执行任务。
    • 若核心线程已满,任务会被放入阻塞队列(workQueue)。如果阻塞队列未满,任务在队列中等待,已有线程从队列中取出任务执行,从而实现线程复用。
    • 当阻塞队列已满,且当前线程数小于最大线程数(maximumPoolSize)时,会创建新的非核心线程来执行任务。
  2. 关键源码
    • execute方法中,有如下关键逻辑:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
  • addWorker方法用于创建新的Worker线程,Worker类实现了Runnable接口,其run方法如下:
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
  • getTask方法从阻塞队列中获取任务,使得线程不断从队列中取任务执行,实现线程复用:
private Runnable getTask() {
    boolean timedOut = false; 
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

高并发场景下可能遇到的问题

  1. 队列溢出:在高并发场景下,如果任务提交速度过快,而线程处理速度跟不上,阻塞队列可能会被填满,导致后续任务无法入队,触发拒绝策略。
  2. 线程过多:如果任务持续大量涌入,且阻塞队列已满,线程池可能会创建过多的线程(达到maximumPoolSize),这会消耗大量系统资源,如内存和CPU,甚至可能导致系统崩溃。
  3. 线程饥饿:当核心线程一直处于忙碌状态,新提交的任务只能在队列中等待,可能导致某些任务长时间得不到执行,出现线程饥饿现象。

优化方法

  1. 合理调整线程池参数
    • 核心线程数:根据任务的性质和系统资源,合理设置corePoolSize。对于CPU密集型任务,核心线程数可设置为CPU核心数;对于I/O密集型任务,核心线程数可适当增大,例如CPU核心数的2倍左右。
    • 最大线程数:根据系统能够承受的最大线程数来设置maximumPoolSize,避免创建过多线程导致系统资源耗尽。
    • 队列容量:根据任务的预计流量和处理速度,设置合适的阻塞队列容量。如果任务流量波动较大,可以考虑使用无界队列(如LinkedBlockingQueue),但要注意可能导致内存耗尽的风险;如果流量相对稳定,有界队列(如ArrayBlockingQueue)能更好地控制资源。
  2. 优化任务处理逻辑
    • 减少任务执行时间:对任务的业务逻辑进行优化,例如减少不必要的I/O操作、优化算法等,提高任务处理效率,从而降低线程池的负载。
    • 异步处理:将一些非关键的任务异步化处理,避免这些任务阻塞核心业务流程,减少任务对线程池的占用时间。
  3. 采用更灵活的拒绝策略
    • 除了默认的拒绝策略(如AbortPolicyCallerRunsPolicyDiscardPolicyDiscardOldestPolicy),可以根据业务需求自定义拒绝策略,例如将任务写入缓存或消息队列,后续再进行处理。