面试题答案
一键面试自定义拒绝策略
- 思路:
- 当线程池无法处理新任务时(队列已满且线程数达到最大线程数),需要一个合理的拒绝策略。默认的拒绝策略如
AbortPolicy
(抛出异常)、CallerRunsPolicy
(在调用者线程中执行任务)、DiscardPolicy
(丢弃任务)、DiscardOldestPolicy
(丢弃队列中最老的任务)可能不满足复杂业务系统的需求。我们可以根据业务逻辑自定义拒绝策略,例如记录日志、将任务放入其他备用队列等。
- 当线程池无法处理新任务时(队列已满且线程数达到最大线程数),需要一个合理的拒绝策略。默认的拒绝策略如
- 具体实现方法:
- 实现
RejectedExecutionHandler
接口。假设我们要将被拒绝的任务记录到日志中,并尝试将其放入一个备用队列。
- 实现
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
private static final Logger logger = LoggerFactory.getLogger(CustomRejectedExecutionHandler.class);
private final MyBackupQueue backupQueue;
public CustomRejectedExecutionHandler(MyBackupQueue backupQueue) {
this.backupQueue = backupQueue;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.warn("Task {} rejected from thread pool {}", r, executor);
backupQueue.addTask(r);
}
}
- 使用自定义拒绝策略创建
ThreadPoolExecutor
:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
MyBackupQueue backupQueue = new MyBackupQueue();
RejectedExecutionHandler handler = new CustomRejectedExecutionHandler(backupQueue);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
10,
TimeUnit.SECONDS,
workQueue,
handler
);
}
}
自定义线程工厂
- 思路:
- 自定义线程工厂可以对线程进行统一的命名、设置优先级、绑定上下文等操作。这有助于在复杂系统中更好地识别和管理线程,特别是在涉及多个不同优先级任务的场景下,可以根据任务优先级设置线程优先级。
- 具体实现方法:
- 实现
ThreadFactory
接口。假设我们要根据任务优先级设置线程优先级。
- 实现
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
// 假设任务实现了PriorityTask接口,根据任务优先级设置线程优先级
if (r instanceof PriorityTask) {
PriorityTask task = (PriorityTask) r;
t.setPriority(task.getPriority());
}
return t;
}
}
- 使用自定义线程工厂创建
ThreadPoolExecutor
:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
ThreadFactory threadFactory = new CustomThreadFactory("CustomThread");
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
10,
10,
TimeUnit.SECONDS,
workQueue,
threadFactory
);
}
}
参数的动态调整
- 思路:
- 在系统运行过程中,任务的负载情况可能会发生变化。通过动态调整
ThreadPoolExecutor
的核心线程数、最大线程数等参数,可以更好地适应不同的业务负载,避免资源浪费或任务堆积。可以通过监控系统指标(如队列长度、任务执行时间等)来触发参数调整。
- 在系统运行过程中,任务的负载情况可能会发生变化。通过动态调整
- 具体实现方法:
- 例如,使用定时任务或事件驱动机制来监控队列长度。当队列长度超过一定阈值时,增加最大线程数;当队列长度低于一定阈值且线程数大于核心线程数时,减少线程数。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class Main {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 10;
private static final long KEEP_ALIVE_TIME = 10;
private static final AtomicInteger queueLength = new AtomicInteger(0);
private static final AtomicLong lastAdjustTime = new AtomicLong(System.currentTimeMillis());
public static void main(String[] args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
workQueue
);
// 模拟任务提交
for (int i = 0; i < 20; i++) {
executor.submit(() -> {
try {
queueLength.decrementAndGet();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
queueLength.incrementAndGet();
}
// 定时监控并调整参数
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(5);
int currentQueueLength = queueLength.get();
if (currentQueueLength > QUEUE_CAPACITY * 0.8 && executor.getPoolSize() < MAX_POOL_SIZE) {
executor.setMaximumPoolSize(executor.getPoolSize() + 1);
lastAdjustTime.set(System.currentTimeMillis());
} else if (currentQueueLength < QUEUE_CAPACITY * 0.2 && executor.getPoolSize() > CORE_POOL_SIZE && System.currentTimeMillis() - lastAdjustTime.get() > 60 * 1000) {
executor.setMaximumPoolSize(executor.getPoolSize() - 1);
lastAdjustTime.set(System.currentTimeMillis());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
通过以上对 ThreadPoolExecutor
的自定义拒绝策略、线程工厂以及参数动态调整的实现,可以更好地适应复杂业务系统中多个不同优先级任务、任务执行时间差异大且资源限制严格的情况,确保系统的高效稳定运行。