可能导致性能瓶颈的原因
- 线程资源竞争:高并发环境下,大量的
CompletableFuture
任务创建和执行会导致线程资源竞争激烈,线程频繁切换上下文,消耗大量 CPU 时间。
- 任务队列积压:任务提交到线程池后,如果线程池的处理能力有限,任务队列可能会积压大量任务,导致任务处理延迟增加。
- I/O 操作阻塞:如果
DataA
和 DataB
的获取涉及 I/O 操作(如网络请求、磁盘读取等),I/O 操作的阻塞会导致线程长时间等待,降低系统整体性能。
优化方案
- 合理配置线程池:根据系统的硬件资源和任务类型,合理配置线程池的核心线程数、最大线程数、队列容量等参数,避免线程资源竞争和任务队列积压。
- 使用异步 I/O:如果任务涉及 I/O 操作,使用异步 I/O 方式(如
java.nio
包下的异步 I/O 类),避免线程阻塞。
- 减少不必要的任务创建:尽量复用已有的任务结果,避免重复创建和执行相同的任务。
代码示例
优化前
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
class DataA {}
class DataB {}
class DataC {}
public class CompletableFutureExample {
public static DataA fetchDataA() {
// 模拟获取 DataA 的操作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new DataA();
}
public static DataB fetchDataB() {
// 模拟获取 DataB 的操作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new DataB();
}
public static DataC combineData(DataA a, DataB b) {
// 模拟合并 DataA 和 DataB 为 DataC 的操作
return new DataC();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<DataA> futureA = CompletableFuture.supplyAsync(() -> fetchDataA());
CompletableFuture<DataB> futureB = CompletableFuture.supplyAsync(() -> fetchDataB());
CompletableFuture<DataC> futureC = futureA.thenCombine(futureB, (a, b) -> combineData(a, b));
DataC result = futureC.get();
}
}
优化方案一:合理配置线程池
import java.util.concurrent.*;
class DataA {}
class DataB {}
class DataC {}
public class CompletableFutureOptimized1 {
private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
public static DataA fetchDataA() {
// 模拟获取 DataA 的操作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new DataA();
}
public static DataB fetchDataB() {
// 模拟获取 DataB 的操作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new DataB();
}
public static DataC combineData(DataA a, DataB b) {
// 模拟合并 DataA 和 DataB 为 DataC 的操作
return new DataC();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<DataA> futureA = CompletableFuture.supplyAsync(() -> fetchDataA(), executor);
CompletableFuture<DataB> futureB = CompletableFuture.supplyAsync(() -> fetchDataB(), executor);
CompletableFuture<DataC> futureC = futureA.thenCombine(futureB, (a, b) -> combineData(a, b));
DataC result = futureC.get();
executor.shutdown();
}
}
优化方案二:使用异步 I/O(假设获取数据是网络请求,这里简单模拟)
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
class DataA {}
class DataB {}
class DataC {}
public class CompletableFutureOptimized2 {
private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
public static CompletableFuture<DataA> fetchDataAAsync() {
CompletableFuture<DataA> future = new CompletableFuture<>();
executor.submit(() -> {
try {
// 模拟异步网络请求获取 DataA
Thread.sleep(100);
future.complete(new DataA());
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
});
return future;
}
public static CompletableFuture<DataB> fetchDataBAsync() {
CompletableFuture<DataB> future = new CompletableFuture<>();
executor.submit(() -> {
try {
// 模拟异步网络请求获取 DataB
Thread.sleep(100);
future.complete(new DataB());
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
});
return future;
}
public static DataC combineData(DataA a, DataB b) {
// 模拟合并 DataA 和 DataB 为 DataC 的操作
return new DataC();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<DataA> futureA = fetchDataAAsync();
CompletableFuture<DataB> futureB = fetchDataBAsync();
CompletableFuture<DataC> futureC = futureA.thenCombine(futureB, (a, b) -> combineData(a, b));
DataC result = futureC.get();
executor.shutdown();
}
}