面试题答案
一键面试设计思路
- 任务排序:使用
CompletableFuture
的thenApply
方法,在每个任务完成后,将任务结果和任务ID封装成一个对象,这样可以方便后续按照任务ID排序。 - 顺序处理:通过
thenCompose
方法,按照排序后的顺序依次处理每个任务的结果,实现顺序处理。 - 多核CPU利用:使用
ForkJoinPool
来并行执行异步任务,充分利用多核CPU的优势。 - 超时处理:使用
CompletableFuture
的orTimeout
方法来设置任务的超时时间,在超时发生时可以优雅地处理,例如记录日志、返回默认结果等。
核心代码片段
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DataAnalysisSystem {
private static final ExecutorService executor = new ForkJoinPool();
public static CompletableFuture<String> processTasks(List<CompletableFuture<String>> tasks) {
// 将任务结果和任务ID封装成对象
List<CompletableFuture<TaskResult>> futureList = IntStream.range(0, tasks.size())
.mapToObj(i -> tasks.get(i).thenApply(result -> new TaskResult(i, result)))
.collect(Collectors.toList());
// 等待所有任务完成,并按照任务ID排序
CompletableFuture<List<TaskResult>> allTasksFuture = CompletableFuture.allOf(
futureList.toArray(new CompletableFuture[0])
).thenApply(v -> futureList.stream()
.map(CompletableFuture::join)
.sorted((r1, r2) -> Integer.compare(r1.taskId, r2.taskId))
.collect(Collectors.toList()));
// 顺序处理每个任务的结果
return allTasksFuture.thenCompose(results -> {
CompletableFuture<String> finalResultFuture = CompletableFuture.completedFuture("");
for (TaskResult result : results) {
finalResultFuture = finalResultFuture.thenApply(partial -> partial + result.data);
}
return finalResultFuture;
}).orTimeout(10, TimeUnit.SECONDS); // 设置10秒超时
}
private static class TaskResult {
int taskId;
String data;
TaskResult(int taskId, String data) {
this.taskId = taskId;
this.data = data;
}
}
public static void main(String[] args) {
List<CompletableFuture<String>> tasks = new ArrayList<>();
tasks.add(CompletableFuture.supplyAsync(() -> "result1", executor));
tasks.add(CompletableFuture.supplyAsync(() -> "result2", executor));
tasks.add(CompletableFuture.supplyAsync(() -> "result3", executor));
processTasks(tasks).whenComplete((result, ex) -> {
if (ex == null) {
System.out.println("Final result: " + result);
} else {
System.out.println("Task timed out or failed: " + ex.getMessage());
}
});
executor.shutdown();
}
}
在上述代码中:
processTasks
方法接受一组CompletableFuture<String>
类型的任务。- 首先将每个任务的结果和任务ID封装成
TaskResult
对象,并使用CompletableFuture.allOf
等待所有任务完成,然后按照任务ID排序。 - 接着通过
thenCompose
方法顺序处理每个任务的结果,形成最终的完整数据报告。 - 最后使用
orTimeout
方法设置任务的超时时间为10秒,并在whenComplete
回调中处理任务完成或超时的情况。 main
方法演示了如何创建任务并调用processTasks
方法。