MST
星途 面试题库

面试题:Java CompletableFuture在高并发且任务依赖复杂场景下的性能优化与设计

在一个高并发系统中,存在大量异步任务,且任务之间存在复杂的依赖关系(例如任务A的结果作为任务B和任务C的输入,任务B和C完成后又作为任务D的输入等)。请阐述如何基于CompletableFuture进行性能优化设计,包括线程池的选择与配置、任务调度策略、如何避免死锁与资源耗尽等问题,并提供一个简化的代码框架来展示设计思路。
49.2万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

1. 线程池的选择与配置

  • 选择合适的线程池:对于高并发系统中的异步任务,可使用 ThreadPoolExecutor 自定义线程池。例如,FixedThreadPool 适用于已知并发任务数量且相对稳定的场景;CachedThreadPool 适用于任务数量波动较大的场景,但可能会创建过多线程导致资源耗尽。在这种复杂依赖关系的高并发场景下,建议使用 ThreadPoolExecutor 来精确控制线程数量。
  • 配置线程池参数
    • 核心线程数:根据系统的 CPU 核心数、I/O 操作类型和任务复杂度来设置。如果任务以 CPU 计算为主,核心线程数可设置为 CPU 核心数;如果任务包含大量 I/O 操作,核心线程数可适当大于 CPU 核心数。例如,可通过 Runtime.getRuntime().availableProcessors() 获取 CPU 核心数,然后根据实际情况调整。
    • 最大线程数:设置合理的上限,避免过多线程耗尽系统资源。这需要根据系统的硬件资源(如内存)和任务特性来确定。
    • 队列容量:根据任务的平均执行时间和到达速率来设置。如果任务执行时间短且到达速率高,可设置较大的队列容量;反之,则设置较小的队列容量。

2. 任务调度策略

  • 按依赖关系调度:利用 CompletableFuturethenApplythenCompose 等方法,按照任务的依赖关系进行调度。例如,任务 A 完成后触发任务 B 和任务 C,可使用 CompletableFuture A = CompletableFuture.supplyAsync(() -> { /* A 的任务逻辑 */ }, executor); CompletableFuture B = A.thenApply(resultOfA -> { /* B 的任务逻辑 */ }); CompletableFuture C = A.thenApply(resultOfA -> { /* C 的任务逻辑 */ });
  • 并行执行无依赖任务:对于没有依赖关系的任务,可并行执行以提高性能。例如,任务 E 和任务 F 无依赖关系,可分别创建 CompletableFuture 并使用线程池并行执行。

3. 避免死锁与资源耗尽

  • 避免死锁
    • 打破循环依赖:检查任务依赖关系,确保不存在循环依赖。例如,任务 A 依赖任务 B,任务 B 依赖任务 C,任务 C 又依赖任务 A,这种情况需避免。
    • 合理的锁顺序:如果任务中涉及锁操作,确保所有任务获取锁的顺序一致,避免出现交叉获取锁导致死锁。
  • 避免资源耗尽
    • 合理配置线程池:如上述线程池配置部分所述,通过合理设置核心线程数、最大线程数和队列容量,防止过多线程创建和任务堆积导致内存溢出等资源耗尽问题。
    • 监控与预警:定期监控系统资源使用情况(如 CPU、内存、线程数等),设置合理的阈值,当资源使用接近阈值时发出预警,以便及时调整系统参数或优化任务。

简化的代码框架示例

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletableFutureOptimization {

    private static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();
    private static final int MAX_POOL_SIZE = CORE_POOL_SIZE * 2;
    private static final int QUEUE_CAPACITY = 1000;
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            10L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadFactory() {
                private final AtomicInteger threadNumber = new AtomicInteger(1);
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "MyThread-" + threadNumber.getAndIncrement());
                    thread.setDaemon(false);
                    return thread;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy());

    public static void main(String[] args) {
        // 任务 A
        CompletableFuture<Integer> taskA = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task A is running");
            return 10;
        }, executor);

        // 任务 B 依赖任务 A 的结果
        CompletableFuture<Integer> taskB = taskA.thenApply(resultOfA -> {
            System.out.println("Task B is running with result of A: " + resultOfA);
            return resultOfA * 2;
        });

        // 任务 C 依赖任务 A 的结果
        CompletableFuture<Integer> taskC = taskA.thenApply(resultOfA -> {
            System.out.println("Task C is running with result of A: " + resultOfA);
            return resultOfA * 3;
        });

        // 任务 D 依赖任务 B 和任务 C 的结果
        CompletableFuture<Void> taskD = CompletableFuture.allOf(taskB, taskC).thenRun(() -> {
            int resultOfB = taskB.join();
            int resultOfC = taskC.join();
            System.out.println("Task D is running with result of B: " + resultOfB + " and result of C: " + resultOfC);
        });

        taskD.join();
        executor.shutdown();
    }
}

上述代码框架展示了如何使用 CompletableFuture 处理有依赖关系的异步任务,并通过自定义线程池来管理任务执行。其中,ThreadPoolExecutor 用于创建线程池,通过 supplyAsync 方法执行任务 A,然后利用 thenApplyallOf 等方法按照依赖关系调度后续任务。最后,通过 join 方法等待任务执行完成,并关闭线程池。