设计思路
- 使用CompletableFuture管理异步任务:利用CompletableFuture来表示异步任务,每个任务负责从数据库读取一组用户数据并返回特定格式的数据片段。
- thenCombine方法实现数据组合:thenCombine方法可以将两个CompletableFuture的结果进行合并。通过链式调用thenCombine,可以将多个异步任务的结果逐步组合成完整的数据结构。
- 性能优化:使用线程池来管理异步任务,避免创建过多线程导致资源耗尽。同时,尽量减少不必要的同步操作,充分利用并行计算的优势。
- 资源管理:合理设置线程池的大小,根据系统资源(如CPU核心数、内存等)来调整。避免线程池过大导致内存溢出,或过小导致任务处理缓慢。
- 异常处理:在CompletableFuture的各个阶段,通过exceptionally方法捕获并处理异常,确保即使某个任务失败,整体流程也能有恰当的处理,不会导致整个组合过程崩溃。
- 应对死锁:避免在任务中出现循环依赖,确保任务之间的依赖关系是无环的。同时,对于共享资源的访问,采用合理的锁策略(如读写锁),尽量减少锁的持有时间,降低死锁风险。
Java代码示例
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
class DataFragment {
// 数据片段类,根据业务需求定义
private String data;
public DataFragment(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
class ComplexTreeNode {
// 复杂树形结构节点类,根据业务需求定义
private String value;
private ComplexTreeNode left;
private ComplexTreeNode right;
public ComplexTreeNode(String value) {
this.value = value;
}
public void setLeft(ComplexTreeNode left) {
this.left = left;
}
public void setRight(ComplexTreeNode right) {
this.right = right;
}
// 其他树形结构操作方法...
}
public class AsyncTaskCombination {
private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// 模拟从数据库读取数据的异步任务
private static CompletableFuture<DataFragment> readDataFromDatabase(int userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟数据库读取操作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new DataFragment("Data for user " + userId);
}, executor);
}
// 组合数据片段成树形结构
private static CompletableFuture<ComplexTreeNode> combineDataFragments(CompletableFuture<DataFragment>... futures) {
AtomicInteger count = new AtomicInteger(futures.length);
CompletableFuture<ComplexTreeNode> rootFuture = new CompletableFuture<>();
if (futures.length == 0) {
rootFuture.complete(null);
return rootFuture;
} else if (futures.length == 1) {
futures[0].thenApply(dataFragment -> {
ComplexTreeNode root = new ComplexTreeNode(dataFragment.getData());
rootFuture.complete(root);
return root;
}).exceptionally(ex -> {
rootFuture.completeExceptionally(ex);
return null;
});
return rootFuture;
}
CompletableFuture<ComplexTreeNode>[] subTrees = new CompletableFuture[futures.length];
for (int i = 0; i < futures.length; i++) {
int index = i;
futures[i].thenApply(dataFragment -> {
ComplexTreeNode node = new ComplexTreeNode(dataFragment.getData());
subTrees[index] = CompletableFuture.completedFuture(node);
if (count.decrementAndGet() == 0) {
// 所有任务完成,开始组合树形结构
CompletableFuture<ComplexTreeNode> combined = subTrees[0];
for (int j = 1; j < subTrees.length; j++) {
combined = combined.thenCombine(subTrees[j], (left, right) -> {
ComplexTreeNode root = new ComplexTreeNode("Combined");
root.setLeft(left);
root.setRight(right);
return root;
});
}
combined.thenAccept(rootFuture::complete).exceptionally(ex -> {
rootFuture.completeExceptionally(ex);
return null;
});
}
return node;
}).exceptionally(ex -> {
rootFuture.completeExceptionally(ex);
return null;
});
}
return rootFuture;
}
public static void main(String[] args) {
// 假设有一组用户ID
int[] userIds = {1, 2, 3, 4};
CompletableFuture<DataFragment>[] dataFutures = new CompletableFuture[userIds.length];
for (int i = 0; i < userIds.length; i++) {
dataFutures[i] = readDataFromDatabase(userIds[i]);
}
CompletableFuture<ComplexTreeNode> combinedTreeFuture = combineDataFragments(dataFutures);
combinedTreeFuture.thenAccept(tree -> {
// 处理组合好的树形结构
System.out.println("Combined tree: " + tree);
}).exceptionally(ex -> {
System.err.println("An error occurred: " + ex.getMessage());
return null;
});
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
说明
- 线程池管理:通过
ExecutorService
创建固定大小的线程池,线程池大小根据CPU核心数动态调整,以充分利用系统资源。
- 异步任务创建:
readDataFromDatabase
方法模拟从数据库读取数据的异步任务,使用CompletableFuture.supplyAsync
方法提交任务到线程池执行。
- 数据组合:
combineDataFragments
方法使用thenCombine
方法将多个DataFragment
组合成ComplexTreeNode
树形结构。
- 异常处理:在
thenApply
和thenCombine
的链式调用中,通过exceptionally
方法捕获并处理异常,确保任务失败时能有合适的处理。
- 资源清理:在
main
方法结束时,关闭线程池,确保资源正确释放。