MST
星途 面试题库

面试题:Java CompletableFuture thenCompose在高并发与性能优化场景的应用

假设你正在开发一个高并发的数据分析系统,有一组异步任务,每个任务都会返回一个包含部分数据处理结果的CompletableFuture。这些结果需要按照特定顺序(例如根据任务ID排序)使用thenCompose方法进行进一步处理和汇总,形成最终的完整数据报告。请设计一个高效的解决方案,既要保证顺序处理,又要尽可能利用多核CPU的优势提升性能,同时要考虑任务执行过程中可能出现的超时情况以及如何优雅地处理超时任务,详细描述你的设计思路并给出核心代码片段。
43.1万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 任务排序:使用CompletableFuturethenApply方法,在每个任务完成后,将任务结果和任务ID封装成一个对象,这样可以方便后续按照任务ID排序。
  2. 顺序处理:通过thenCompose方法,按照排序后的顺序依次处理每个任务的结果,实现顺序处理。
  3. 多核CPU利用:使用ForkJoinPool来并行执行异步任务,充分利用多核CPU的优势。
  4. 超时处理:使用CompletableFutureorTimeout方法来设置任务的超时时间,在超时发生时可以优雅地处理,例如记录日志、返回默认结果等。

核心代码片段

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DataAnalysisSystem {
    private static final ExecutorService executor = new ForkJoinPool();

    public static CompletableFuture<String> processTasks(List<CompletableFuture<String>> tasks) {
        // 将任务结果和任务ID封装成对象
        List<CompletableFuture<TaskResult>> futureList = IntStream.range(0, tasks.size())
               .mapToObj(i -> tasks.get(i).thenApply(result -> new TaskResult(i, result)))
               .collect(Collectors.toList());

        // 等待所有任务完成,并按照任务ID排序
        CompletableFuture<List<TaskResult>> allTasksFuture = CompletableFuture.allOf(
                futureList.toArray(new CompletableFuture[0])
        ).thenApply(v -> futureList.stream()
               .map(CompletableFuture::join)
               .sorted((r1, r2) -> Integer.compare(r1.taskId, r2.taskId))
               .collect(Collectors.toList()));

        // 顺序处理每个任务的结果
        return allTasksFuture.thenCompose(results -> {
            CompletableFuture<String> finalResultFuture = CompletableFuture.completedFuture("");
            for (TaskResult result : results) {
                finalResultFuture = finalResultFuture.thenApply(partial -> partial + result.data);
            }
            return finalResultFuture;
        }).orTimeout(10, TimeUnit.SECONDS); // 设置10秒超时
    }

    private static class TaskResult {
        int taskId;
        String data;

        TaskResult(int taskId, String data) {
            this.taskId = taskId;
            this.data = data;
        }
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> tasks = new ArrayList<>();
        tasks.add(CompletableFuture.supplyAsync(() -> "result1", executor));
        tasks.add(CompletableFuture.supplyAsync(() -> "result2", executor));
        tasks.add(CompletableFuture.supplyAsync(() -> "result3", executor));

        processTasks(tasks).whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Final result: " + result);
            } else {
                System.out.println("Task timed out or failed: " + ex.getMessage());
            }
        });

        executor.shutdown();
    }
}

在上述代码中:

  • processTasks方法接受一组CompletableFuture<String>类型的任务。
  • 首先将每个任务的结果和任务ID封装成TaskResult对象,并使用CompletableFuture.allOf等待所有任务完成,然后按照任务ID排序。
  • 接着通过thenCompose方法顺序处理每个任务的结果,形成最终的完整数据报告。
  • 最后使用orTimeout方法设置任务的超时时间为10秒,并在whenComplete回调中处理任务完成或超时的情况。
  • main方法演示了如何创建任务并调用processTasks方法。