MST

星途 面试题库

面试题:Java ThreadPoolExecutor在复杂业务场景下的深度调优

在一个涉及多个不同优先级任务、任务执行时间差异大且对资源限制严格的复杂业务系统中,如何定制化地调优ThreadPoolExecutor,包括不仅限于自定义拒绝策略、线程工厂,以及对参数的动态调整,以确保系统的高效稳定运行?请详细阐述思路和具体实现方法。
45.2万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

自定义拒绝策略

  1. 思路
    • 当线程池无法处理新任务时(队列已满且线程数达到最大线程数),需要一个合理的拒绝策略。默认的拒绝策略如 AbortPolicy(抛出异常)、CallerRunsPolicy(在调用者线程中执行任务)、DiscardPolicy(丢弃任务)、DiscardOldestPolicy(丢弃队列中最老的任务)可能不满足复杂业务系统的需求。我们可以根据业务逻辑自定义拒绝策略,例如记录日志、将任务放入其他备用队列等。
  2. 具体实现方法
    • 实现 RejectedExecutionHandler 接口。假设我们要将被拒绝的任务记录到日志中,并尝试将其放入一个备用队列。
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    private static final Logger logger = LoggerFactory.getLogger(CustomRejectedExecutionHandler.class);
    private final MyBackupQueue backupQueue;

    public CustomRejectedExecutionHandler(MyBackupQueue backupQueue) {
        this.backupQueue = backupQueue;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        logger.warn("Task {} rejected from thread pool {}", r, executor);
        backupQueue.addTask(r);
    }
}
  • 使用自定义拒绝策略创建 ThreadPoolExecutor
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        MyBackupQueue backupQueue = new MyBackupQueue();
        RejectedExecutionHandler handler = new CustomRejectedExecutionHandler(backupQueue);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                10,
                10,
                TimeUnit.SECONDS,
                workQueue,
                handler
        );
    }
}

自定义线程工厂

  1. 思路
    • 自定义线程工厂可以对线程进行统一的命名、设置优先级、绑定上下文等操作。这有助于在复杂系统中更好地识别和管理线程,特别是在涉及多个不同优先级任务的场景下,可以根据任务优先级设置线程优先级。
  2. 具体实现方法
    • 实现 ThreadFactory 接口。假设我们要根据任务优先级设置线程优先级。
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public CustomThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        // 假设任务实现了PriorityTask接口,根据任务优先级设置线程优先级
        if (r instanceof PriorityTask) {
            PriorityTask task = (PriorityTask) r;
            t.setPriority(task.getPriority());
        }
        return t;
    }
}
  • 使用自定义线程工厂创建 ThreadPoolExecutor
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        ThreadFactory threadFactory = new CustomThreadFactory("CustomThread");
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                10,
                10,
                TimeUnit.SECONDS,
                workQueue,
                threadFactory
        );
    }
}

参数的动态调整

  1. 思路
    • 在系统运行过程中,任务的负载情况可能会发生变化。通过动态调整 ThreadPoolExecutor 的核心线程数、最大线程数等参数,可以更好地适应不同的业务负载,避免资源浪费或任务堆积。可以通过监控系统指标(如队列长度、任务执行时间等)来触发参数调整。
  2. 具体实现方法
    • 例如,使用定时任务或事件驱动机制来监控队列长度。当队列长度超过一定阈值时,增加最大线程数;当队列长度低于一定阈值且线程数大于核心线程数时,减少线程数。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 10;
    private static final long KEEP_ALIVE_TIME = 10;
    private static final AtomicInteger queueLength = new AtomicInteger(0);
    private static final AtomicLong lastAdjustTime = new AtomicLong(System.currentTimeMillis());

    public static void main(String[] args) {
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                workQueue
        );

        // 模拟任务提交
        for (int i = 0; i < 20; i++) {
            executor.submit(() -> {
                try {
                    queueLength.decrementAndGet();
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            queueLength.incrementAndGet();
        }

        // 定时监控并调整参数
        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(5);
                    int currentQueueLength = queueLength.get();
                    if (currentQueueLength > QUEUE_CAPACITY * 0.8 && executor.getPoolSize() < MAX_POOL_SIZE) {
                        executor.setMaximumPoolSize(executor.getPoolSize() + 1);
                        lastAdjustTime.set(System.currentTimeMillis());
                    } else if (currentQueueLength < QUEUE_CAPACITY * 0.2 && executor.getPoolSize() > CORE_POOL_SIZE && System.currentTimeMillis() - lastAdjustTime.get() > 60 * 1000) {
                        executor.setMaximumPoolSize(executor.getPoolSize() - 1);
                        lastAdjustTime.set(System.currentTimeMillis());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

通过以上对 ThreadPoolExecutor 的自定义拒绝策略、线程工厂以及参数动态调整的实现,可以更好地适应复杂业务系统中多个不同优先级任务、任务执行时间差异大且资源限制严格的情况,确保系统的高效稳定运行。