自定义线程池实现
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPool extends ThreadPoolExecutor {
private static final int DEFAULT_CORE_POOL_SIZE = 5;
private static final int DEFAULT_MAX_POOL_SIZE = 10;
private static final long DEFAULT_KEEP_ALIVE_TIME = 10L;
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
public CustomThreadPool() {
this(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME, DEFAULT_TIME_UNIT,
new PriorityBlockingQueue<>());
}
public CustomThreadPool(int corePoolSize, int maximumPoolSize) {
this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE_TIME, DEFAULT_TIME_UNIT,
new PriorityBlockingQueue<>());
}
// 动态调整任务队列
public void adjustQueueSize(int newSize) {
BlockingQueue<Runnable> newQueue = new PriorityBlockingQueue<>(newSize);
BlockingQueue<Runnable> oldQueue = getQueue();
newQueue.addAll(oldQueue);
setQueue(newQueue);
}
// 线程复用由ThreadPoolExecutor自动实现,worker线程执行完任务后,会从任务队列中获取新任务继续执行
// 线程优先级控制,通过自定义的PriorityRunnable和PriorityBlockingQueue实现
public static class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
private final Runnable runnable;
private final int priority;
public PriorityRunnable(Runnable runnable, int priority) {
this.runnable = runnable;
this.priority = priority;
}
@Override
public void run() {
runnable.run();
}
@Override
public int compareTo(PriorityRunnable other) {
return Integer.compare(this.priority, other.priority);
}
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (r instanceof PriorityRunnable) {
int priority = ((PriorityRunnable) r).priority;
t.setPriority(priority);
}
}
}
性能优化措施
- 任务队列选择:使用
PriorityBlockingQueue
作为任务队列,以支持线程优先级控制。这种队列能够根据任务的优先级自动排序,保证高优先级任务优先执行。
- 动态调整任务队列:提供
adjustQueueSize
方法,根据系统负载动态调整任务队列的大小。在高负载时,可以适当增大队列大小以容纳更多任务,减少任务被拒绝的可能性。
- 线程复用:
ThreadPoolExecutor
本身就支持线程复用,worker线程执行完一个任务后,会尝试从任务队列中获取新的任务继续执行,避免了频繁创建和销毁线程带来的开销。
- 合理设置线程池参数:根据系统的CPU核心数、任务类型(CPU密集型或I/O密集型)等因素,合理设置核心线程数、最大线程数和线程存活时间。对于CPU密集型任务,核心线程数可设置为CPU核心数;对于I/O密集型任务,核心线程数可适当增大。
- 线程优先级控制:通过自定义
PriorityRunnable
类和在beforeExecute
方法中设置线程优先级,确保高优先级任务优先执行,提高系统整体响应速度。
- 拒绝策略优化:可以根据实际需求选择合适的拒绝策略,如
CallerRunsPolicy
(让调用者线程执行任务),避免任务丢失,同时也可以自定义拒绝策略,以更好地适应高负载情况。