整体架构思路
- 业务模块划分:明确系统中的各个业务模块,每个模块有其独特的业务逻辑和资源需求。
- 线程池定制:为每个业务模块创建独立的线程池。在创建线程池时,使用
ThreadPoolExecutor
类,因为它允许我们自定义线程池的核心线程数、最大线程数、存活时间等参数,并且可以设置定制化的BlockingQueue
。
- 定制化拒绝策略:实现
RejectedExecutionHandler
接口来创建定制化的拒绝策略。根据业务需求,例如记录日志、尝试重新提交任务、直接抛弃任务等不同策略。
- 动态调整队列容量:通过在
ArrayBlockingQueue
外部包装一个类,提供方法来动态修改ArrayBlockingQueue
的容量。可以通过反射机制或者重新创建一个ArrayBlockingQueue
并替换原有队列的方式来实现。
关键代码示例
- 定制化拒绝策略
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志
System.out.println("Task " + r.toString() + " rejected from " + executor.toString());
// 这里可以添加更多业务逻辑,如尝试重新提交任务等
}
}
- 线程池创建及配置
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BusinessModuleThreadPool {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final long KEEP_ALIVE_TIME = 10;
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
private static final int INITIAL_CAPACITY = 20;
private ThreadPoolExecutor executor;
private BlockingQueue<Runnable> queue;
public BusinessModuleThreadPool() {
queue = new ArrayBlockingQueue<>(INITIAL_CAPACITY);
executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TIME_UNIT,
queue,
new CustomRejectedExecutionHandler());
}
public void submitTask(Runnable task) {
executor.submit(task);
}
// 动态调整队列容量方法
public void adjustQueueCapacity(int newCapacity) {
BlockingQueue<Runnable> newQueue = new ArrayBlockingQueue<>(newCapacity);
executor.setQueue(newQueue);
this.queue = newQueue;
}
}
- 使用示例
public class Main {
public static void main(String[] args) {
BusinessModuleThreadPool threadPool = new BusinessModuleThreadPool();
for (int i = 0; i < 30; i++) {
int taskNumber = i;
threadPool.submitTask(() -> {
System.out.println("Task " + taskNumber + " is running.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 动态调整队列容量
threadPool.adjustQueueCapacity(30);
}
}