高并发场景下Java Executors任务执行流程性能优化
- 选择合适的线程池类型:
- FixedThreadPool:适用于已知并发任务数量且相对稳定的场景,可控制线程数量,避免过多线程竞争资源。但如果任务执行时间较长,可能会导致队列积压。
- CachedThreadPool:适合任务执行时间短、并发数波动较大的场景。线程池会根据需求动态创建和回收线程,但如果并发量过大,可能创建过多线程耗尽系统资源。
- ScheduledThreadPool:用于执行定时任务和周期性任务。
- SingleThreadExecutor:适用于需要顺序执行任务,且同一时间只有一个任务执行的场景。
- 优化线程池参数:
- 核心线程数(corePoolSize):根据系统的CPU核心数、I/O特性和任务类型来设置。对于CPU密集型任务,核心线程数可设置为CPU核心数 + 1;对于I/O密集型任务,可适当增大核心线程数,如2 * CPU核心数。
- 最大线程数(maximumPoolSize):考虑系统资源限制,避免设置过大导致系统资源耗尽。一般可根据应用场景和硬件资源估算,比如结合任务队列的大小和任务执行时间。
- 队列容量(workQueue):选择合适的队列类型和容量。无界队列(如LinkedBlockingQueue)可能导致OOM,有界队列(如ArrayBlockingQueue)可避免此问题,但需合理设置容量,避免任务拒绝。
- 存活时间(keepAliveTime):对于CachedThreadPool等动态创建线程的池,合理设置存活时间,以平衡线程创建和销毁的开销。
- 任务调度优化:
- 优先队列:如果任务有优先级之分,可使用PriorityBlockingQueue作为任务队列,自定义任务的优先级比较逻辑,让高优先级任务优先执行。
- 预取策略:在线程池执行任务时,可采用预取策略,提前从任务队列中获取任务,减少线程等待时间。
定制化线程池
- 线程创建:
- 自定义ThreadFactory:实现ThreadFactory接口,可自定义线程的名称、优先级、是否为守护线程等属性。例如:
public class CustomThreadFactory implements ThreadFactory {
private final String namePrefix;
private final int priority;
private final boolean daemon;
public CustomThreadFactory(String namePrefix, int priority, boolean daemon) {
this.namePrefix = namePrefix;
this.priority = priority;
this.daemon = daemon;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, namePrefix + "-" + Thread.currentThread().getName());
t.setPriority(priority);
t.setDaemon(daemon);
return t;
}
}
- 任务调度:
- 自定义RejectedExecutionHandler:当任务队列已满且线程池达到最大线程数时,会触发拒绝策略。可实现RejectedExecutionHandler接口来自定义拒绝处理逻辑,如记录日志、将任务保存到外部存储等。
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义拒绝处理逻辑,例如记录日志
System.out.println("Task " + r + " rejected from " + executor);
}
}
- 资源管理:
- 监控线程池状态:通过ThreadPoolExecutor的方法如getActiveCount()、getQueue().size()、getCompletedTaskCount()等监控线程池的运行状态,以便及时调整参数。
- 动态调整线程池参数:在运行时根据系统负载、任务队列长度等动态调整核心线程数、最大线程数等参数。可使用反射或通过自定义的管理接口实现。
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
workQueue, new CustomThreadFactory("custom-pool", Thread.NORM_PRIORITY, false),
new CustomRejectedExecutionHandler());
// 动态调整核心线程数
executor.setCorePoolSize(newCorePoolSize);