MST

星途 面试题库

面试题:Java中CompletableFuture性能优化之复杂任务链调度

假设有一系列复杂的异步任务,任务A完成后,根据其结果决定并行执行任务B1、B2或任务C1、C2,并且后续还有依赖这些子任务结果的任务D。请使用CompletableFuture设计一套高效的任务调度方案,同时考虑如何对该方案进行性能优化,例如减少线程上下文切换、合理复用线程资源等。
31.5万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncTaskScheduler {
    private static final ExecutorService executor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors(),
        new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "MyThread-" + threadNumber.getAndIncrement());
                t.setDaemon(true);
                return t;
            }
        }
    );

    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            // 模拟任务A
            System.out.println("Task A is running.");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return true;
        }, executor)
       .thenApplyAsync(result -> {
            // 根据任务A的结果决定执行哪组任务
            if (result) {
                return CompletableFuture.allOf(
                    CompletableFuture.runAsync(() -> {
                        System.out.println("Task B1 is running.");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }, executor),
                    CompletableFuture.runAsync(() -> {
                        System.out.println("Task B2 is running.");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }, executor)
                );
            } else {
                return CompletableFuture.allOf(
                    CompletableFuture.runAsync(() -> {
                        System.out.println("Task C1 is running.");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }, executor),
                    CompletableFuture.runAsync(() -> {
                        System.out.println("Task C2 is running.");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }, executor)
                );
            }
        }, executor)
       .thenApplyAsync(future -> {
            // 等待子任务完成
            try {
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            return "All sub - tasks completed.";
        }, executor)
       .thenAcceptAsync(result -> {
            // 执行任务D
            System.out.println("Task D is running with result: " + result);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, executor)
       .exceptionally(ex -> {
            ex.printStackTrace();
            return null;
        })
       .whenComplete((v, ex) -> {
            executor.shutdown();
        });
    }
}

性能优化

  1. 线程池复用
    • 使用Executors.newFixedThreadPool创建固定大小的线程池,根据系统可用处理器数量设置线程池大小,这样可以避免频繁创建和销毁线程带来的开销。例如:
    private static final ExecutorService executor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors(),
        new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "MyThread-" + threadNumber.getAndIncrement());
                t.setDaemon(true);
                return t;
            }
        }
    );
    
    • 使用自定义的ThreadFactory设置线程为守护线程,使得在主线程结束时,这些线程也能自动结束,避免资源泄漏。
  2. 减少上下文切换
    • CompletableFuture的链式调用中,使用thenApplyAsyncthenAcceptAsync等方法,并传入同一个线程池executor,这样可以减少不同线程池之间切换带来的上下文切换开销。例如:

.thenApplyAsync(result -> { // 根据任务A的结果决定执行哪组任务 if (result) { return CompletableFuture.allOf( CompletableFuture.runAsync(() -> { System.out.println("Task B1 is running."); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executor), CompletableFuture.runAsync(() -> { System.out.println("Task B2 is running."); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executor) ); } else { return CompletableFuture.allOf( CompletableFuture.runAsync(() -> { System.out.println("Task C1 is running."); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executor), CompletableFuture.runAsync(() -> { System.out.println("Task C2 is running."); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executor) ); } }, executor)

3. **合理的任务粒度**:
- 在设计任务时,尽量使任务有一定的粒度,避免将任务划分得过于细碎。例如任务A、B1、B2、C1、C2、D的执行时间相对合理,不会因为过于短小的任务导致频繁的线程调度开销。

4. **异常处理**:
- 使用`exceptionally`方法对整个任务链中的异常进行统一处理,避免因未处理的异常导致程序中断。例如:
```java
.exceptionally(ex -> {
    ex.printStackTrace();
    return null;
})
  1. 资源清理
    • 在任务完成后(通过whenComplete方法),关闭线程池,释放资源。例如:

.whenComplete((v, ex) -> { executor.shutdown(); });