MST

星途 面试题库

面试题:Java 中 CompletableFuture 在复杂任务链场景下的效率优化

现在有一个复杂的业务场景,需要依次执行多个异步任务,这些任务存在依赖关系,比如任务 B 需要任务 A 的结果作为输入,任务 C 需要任务 B 的结果等。而且每个任务执行时间较长且可能会抛出异常。请用 CompletableFuture 实现该任务链,并说明如何处理异常以及通过哪些策略提升整体执行效率,如 Fork/Join 框架的结合使用等。
36.9万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

1. 使用 CompletableFuture 实现任务链

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncTaskChain {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> {
            // 模拟任务 A,执行一些耗时操作
            System.out.println("Task A is running...");
            if (Math.random() < 0.2) {
                throw new RuntimeException("Task A failed");
            }
            return "Result of Task A";
        })
       .thenApply(resultA -> {
            // 任务 B,依赖任务 A 的结果
            System.out.println("Task B is running with input: " + resultA);
            if (Math.random() < 0.2) {
                throw new RuntimeException("Task B failed");
            }
            return "Result of Task B based on " + resultA;
        })
       .thenApply(resultB -> {
            // 任务 C,依赖任务 B 的结果
            System.out.println("Task C is running with input: " + resultB);
            if (Math.random() < 0.2) {
                throw new RuntimeException("Task C failed");
            }
            return "Final result of Task C based on " + resultB;
        })
       .exceptionally(ex -> {
            // 统一处理异常
            System.out.println("An exception occurred: " + ex.getMessage());
            return "Default result due to exception";
        })
       .thenAccept(System.out::println);


        try {
            // 防止主线程退出
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2. 异常处理

  • exceptionally 方法:如上述代码中,通过 exceptionally 方法捕获任务链中任意一个任务抛出的异常。在 exceptionally 的回调函数中,可以对异常进行处理,比如记录日志、返回默认值等。
  • handle 方法handle 方法既可以处理正常结果,也可以处理异常情况。它接收一个 BiFunction,第一个参数是正常结果(如果没有异常),第二个参数是异常(如果有异常)。例如:
CompletableFuture.supplyAsync(() -> {
    // 任务逻辑
    if (Math.random() < 0.2) {
        throw new RuntimeException("Task failed");
    }
    return "Normal result";
})
.handle((result, ex) -> {
    if (ex != null) {
        System.out.println("Exception: " + ex.getMessage());
        return "Default value";
    }
    return result;
})
.thenAccept(System.out::println);

3. 提升整体执行效率策略

  • Fork/Join 框架结合使用
    • 原理:Fork/Join 框架是一种分治算法的实现,它将一个大任务分割成多个小任务并行执行,然后将小任务的结果合并起来得到最终结果。对于我们的任务链,如果其中某些任务可以进一步细分,就可以使用 Fork/Join 框架。
    • 实现方式:例如,假设任务 A 非常耗时且可以分割成多个子任务。我们可以创建一个 RecursiveTask(Fork/Join 框架中的任务类型)来表示任务 A 的子任务。
import java.util.concurrent.*;

class TaskAForkJoin extends RecursiveTask<String> {
    private static final int THRESHOLD = 10;
    private int start;
    private int end;

    public TaskAForkJoin(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected String compute() {
        if (end - start <= THRESHOLD) {
            // 执行子任务逻辑
            System.out.println("Sub - task A from " + start + " to " + end + " is running...");
            return "Sub - result of A from " + start + " to " + end;
        } else {
            int mid = (start + end) / 2;
            TaskAForkJoin leftTask = new TaskAForkJoin(start, mid);
            TaskAForkJoin rightTask = new TaskAForkJoin(mid + 1, end);

            leftTask.fork();
            String rightResult = rightTask.compute();
            String leftResult = leftTask.join();

            return leftResult + " and " + rightResult;
        }
    }
}

然后在任务链中使用:

CompletableFuture.supplyAsync(() -> {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    return forkJoinPool.invoke(new TaskAForkJoin(0, 100));
})
// 后续任务链不变
  • 线程池优化CompletableFuture 默认使用 ForkJoinPool.commonPool() 作为线程池。如果任务的类型和数量有特殊需求,可以创建自定义的线程池来执行任务。例如:
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
    // 任务 A 逻辑
    return "Result of Task A";
}, executor)
// 后续任务链操作
  • 异步任务并行化:如果任务链中的某些任务之间没有依赖关系,可以将它们并行执行。例如,假设任务 B 和任务 D 没有依赖关系,可以使用 CompletableFuture.allOf 来并行执行它们,然后等待所有任务完成后再继续后续任务。
CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {
    // 任务 B 逻辑
    return "Result of Task B";
});
CompletableFuture<String> taskD = CompletableFuture.supplyAsync(() -> {
    // 任务 D 逻辑
    return "Result of Task D";
});

CompletableFuture<Void> allTasks = CompletableFuture.allOf(taskB, taskD);
allTasks.thenRun(() -> {
    try {
        String resultB = taskB.get();
        String resultD = taskD.get();
        // 基于 resultB 和 resultD 继续后续任务逻辑
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
});