MST
星途 面试题库

面试题:Java CompletableFuture thenRun 方法与系统资源优化

在一个高并发的 Java 应用程序中,大量使用 CompletableFuture 的 thenRun 方法来处理异步任务。随着任务数量的增加,系统资源(如内存、线程)的消耗也在上升。请阐述你会如何通过优化 thenRun 方法的使用,结合线程池配置、任务队列管理等手段,来有效控制资源消耗,同时保证系统的响应性能和吞吐量。要求详细说明优化思路,并给出关键代码片段示例。
39.9万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

优化思路

  1. 自定义线程池:创建一个合适大小的线程池,避免使用默认的 ForkJoinPool.commonPool(),因为它可能不适合高并发应用的特定需求。根据系统的 CPU 核心数、内存大小以及任务特性来调整线程池的参数,如核心线程数、最大线程数、队列容量等。
  2. 合理配置任务队列:选择合适的任务队列类型,如 ArrayBlockingQueue、LinkedBlockingQueue 等。根据任务的优先级、预计任务数量等因素来设置队列容量,防止队列无限增长导致内存耗尽。
  3. 复用 CompletableFuture:避免在每次 thenRun 调用时创建新的 CompletableFuture,尽可能复用已有的实例,减少对象创建开销。
  4. 任务拆分与合并:对于复杂任务,考虑拆分成多个小任务并行处理,处理完成后再合并结果,提高整体吞吐量。

关键代码片段示例

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CompletableFutureOptimization {

    // 自定义线程池
    private static final ExecutorService executorService = new ThreadPoolExecutor(
            10, // 核心线程数
            50, // 最大线程数
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100), // 任务队列
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
    );

    public static void main(String[] args) {
        // 模拟大量异步任务
        IntStream.range(0, 1000)
               .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                    // 模拟异步计算
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return i;
                }, executorService)
                       .thenRun(() -> {
                            // 任务完成后的处理
                            System.out.println("Task " + i + " completed.");
                        })
                )
               .collect(Collectors.toList())
               .forEach(CompletableFuture::join);

        // 优雅关闭线程池
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

在上述代码中:

  1. 创建了一个自定义的 ThreadPoolExecutor,设置了核心线程数、最大线程数、存活时间、任务队列以及拒绝策略。
  2. 使用 CompletableFuture.supplyAsync 方法将异步任务提交到自定义线程池执行,然后通过 thenRun 方法处理任务完成后的逻辑。
  3. 最后通过 executorService.shutdown()executorService.awaitTermination() 方法优雅关闭线程池,确保所有任务执行完毕。