线程池参数调优
- 核心线程数:根据每个节点的计算资源,如CPU核心数、内存大小来确定。例如,如果一个节点有4个CPU核心,核心线程数可以设置为4,这样可以充分利用CPU资源。在代码中,可以这样创建线程池:
int corePoolSize = Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(corePoolSize);
- 最大线程数:考虑到节点负载的波动,最大线程数可以设置为核心线程数的一定倍数,如2倍。这是为了应对突发的任务高峰。
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = corePoolSize * 2;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>());
- 队列容量:根据任务的平均大小和节点的负载情况设置。如果任务较小且负载相对稳定,可以设置较小的队列容量;若任务大小不确定且负载波动大,应设置较大的队列容量。
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = corePoolSize * 2;
int queueCapacity = 100;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(queueCapacity));
- 存活时间:设置非核心线程在闲置多久后被回收,例如1分钟。这有助于在负载降低时释放多余的线程资源。
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = corePoolSize * 2;
int queueCapacity = 100;
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(queueCapacity));
资源隔离
- 线程池隔离:为不同类型的任务创建不同的线程池。比如,将I/O密集型任务和CPU密集型任务分别放在不同的线程池中执行。
// CPU密集型任务线程池
int cpuCorePoolSize = Runtime.getRuntime().availableProcessors();
ExecutorService cpuExecutorService = Executors.newFixedThreadPool(cpuCorePoolSize);
// I/O密集型任务线程池
int ioCorePoolSize = Runtime.getRuntime().availableProcessors() * 2;
ExecutorService ioExecutorService = Executors.newFixedThreadPool(ioCorePoolSize);
- 任务优先级隔离:可以使用PriorityBlockingQueue结合自定义的任务优先级来实现。任务实现Comparable接口,在线程池中按照优先级顺序执行。
class PriorityTask implements Comparable<PriorityTask> {
private int priority;
// 其他任务相关属性和方法
@Override
public int compareTo(PriorityTask o) {
return Integer.compare(this.priority, o.priority);
}
}
PriorityBlockingQueue<PriorityTask> priorityQueue = new PriorityBlockingQueue<>();
ThreadPoolExecutor priorityExecutor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, 1, TimeUnit.MINUTES, priorityQueue);
容错机制
- 异常处理:在CompletableFuture中使用
exceptionally
方法来处理异步任务中的异常。
CompletableFuture.supplyAsync(() -> {
// 可能抛出异常的任务
if (Math.random() > 0.5) {
throw new RuntimeException("模拟异常");
}
return "任务成功结果";
}, executorService)
.exceptionally(ex -> {
System.out.println("捕获到异常: " + ex.getMessage());
return "默认值";
})
.thenAccept(System.out::println);
- 重试机制:结合
CompletableFuture
和循环实现重试逻辑。例如,当任务失败时,最多重试3次。
int maxRetries = 3;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 可能失败的任务
if (Math.random() > 0.5) {
throw new RuntimeException("模拟异常");
}
return "任务成功结果";
}, executorService);
for (int i = 0; i < maxRetries; i++) {
future = future.exceptionally(ex -> {
System.out.println("第" + (i + 1) + "次重试, 捕获到异常: " + ex.getMessage());
return CompletableFuture.supplyAsync(() -> {
// 重试的任务
if (Math.random() > 0.5) {
throw new RuntimeException("模拟异常");
}
return "任务成功结果";
}, executorService);
}).thenCompose(x -> x);
}
future.thenAccept(System.out::println);
与分布式系统其他组件的协同
- 与消息队列协同:将任务发送到消息队列,由不同节点的消费者从消息队列中获取任务并执行。可以使用Kafka、RabbitMQ等消息队列。例如,使用Kafka:
Properties props = new Properties();
props.put("bootstrap.servers", "your_kafka_servers");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String taskMessage = "任务描述";
producer.send(new ProducerRecord<>("task_topic", taskMessage));
producer.close();
- 消费者从队列获取任务并使用CompletableFuture执行:
Properties props = new Properties();
props.put("bootstrap.servers", "your_kafka_servers");
props.put("group.id", "task_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("task_topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
CompletableFuture.runAsync(() -> {
// 执行任务逻辑
System.out.println("执行任务: " + record.value());
}, executorService);
}
}
- 与分布式缓存协同:利用分布式缓存(如Redis)存储任务的中间结果或共享数据,避免重复计算。例如,在任务执行前先检查缓存中是否有结果:
Jedis jedis = new Jedis("your_redis_server");
CompletableFuture.supplyAsync(() -> {
String cacheKey = "task_result_key";
String cachedResult = jedis.get(cacheKey);
if (cachedResult != null) {
return cachedResult;
}
// 执行任务
String result = "任务执行结果";
jedis.set(cacheKey, result);
return result;
}, executorService)
.thenAccept(System.out::println);
jedis.close();