MST

星途 面试题库

面试题:Java CompletableFuture thenCombine在高并发场景下的优化

在高并发环境下,有大量成对的CompletableFuture任务,每对任务分别返回不同类型的数据(假设为DataA和DataB),需要使用thenCombine方法将它们合并为DataC类型的数据。随着并发量的增加,系统出现性能瓶颈。请分析可能导致性能瓶颈的原因,并提出至少两种优化方案,且通过代码示例说明如何利用CompletableFuture的特性及相关并发工具进行优化。
37.8万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

可能导致性能瓶颈的原因

  1. 线程资源竞争:高并发环境下,大量的 CompletableFuture 任务创建和执行会导致线程资源竞争激烈,线程频繁切换上下文,消耗大量 CPU 时间。
  2. 任务队列积压:任务提交到线程池后,如果线程池的处理能力有限,任务队列可能会积压大量任务,导致任务处理延迟增加。
  3. I/O 操作阻塞:如果 DataADataB 的获取涉及 I/O 操作(如网络请求、磁盘读取等),I/O 操作的阻塞会导致线程长时间等待,降低系统整体性能。

优化方案

  1. 合理配置线程池:根据系统的硬件资源和任务类型,合理配置线程池的核心线程数、最大线程数、队列容量等参数,避免线程资源竞争和任务队列积压。
  2. 使用异步 I/O:如果任务涉及 I/O 操作,使用异步 I/O 方式(如 java.nio 包下的异步 I/O 类),避免线程阻塞。
  3. 减少不必要的任务创建:尽量复用已有的任务结果,避免重复创建和执行相同的任务。

代码示例

优化前

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class DataA {}
class DataB {}
class DataC {}

public class CompletableFutureExample {
    public static DataA fetchDataA() {
        // 模拟获取 DataA 的操作
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new DataA();
    }

    public static DataB fetchDataB() {
        // 模拟获取 DataB 的操作
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new DataB();
    }

    public static DataC combineData(DataA a, DataB b) {
        // 模拟合并 DataA 和 DataB 为 DataC 的操作
        return new DataC();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<DataA> futureA = CompletableFuture.supplyAsync(() -> fetchDataA());
        CompletableFuture<DataB> futureB = CompletableFuture.supplyAsync(() -> fetchDataB());

        CompletableFuture<DataC> futureC = futureA.thenCombine(futureB, (a, b) -> combineData(a, b));

        DataC result = futureC.get();
    }
}

优化方案一:合理配置线程池

import java.util.concurrent.*;

class DataA {}
class DataB {}
class DataC {}

public class CompletableFutureOptimized1 {
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    public static DataA fetchDataA() {
        // 模拟获取 DataA 的操作
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new DataA();
    }

    public static DataB fetchDataB() {
        // 模拟获取 DataB 的操作
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new DataB();
    }

    public static DataC combineData(DataA a, DataB b) {
        // 模拟合并 DataA 和 DataB 为 DataC 的操作
        return new DataC();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<DataA> futureA = CompletableFuture.supplyAsync(() -> fetchDataA(), executor);
        CompletableFuture<DataB> futureB = CompletableFuture.supplyAsync(() -> fetchDataB(), executor);

        CompletableFuture<DataC> futureC = futureA.thenCombine(futureB, (a, b) -> combineData(a, b));

        DataC result = futureC.get();
        executor.shutdown();
    }
}

优化方案二:使用异步 I/O(假设获取数据是网络请求,这里简单模拟)

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

class DataA {}
class DataB {}
class DataC {}

public class CompletableFutureOptimized2 {
    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

    public static CompletableFuture<DataA> fetchDataAAsync() {
        CompletableFuture<DataA> future = new CompletableFuture<>();
        executor.submit(() -> {
            try {
                // 模拟异步网络请求获取 DataA
                Thread.sleep(100);
                future.complete(new DataA());
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static CompletableFuture<DataB> fetchDataBAsync() {
        CompletableFuture<DataB> future = new CompletableFuture<>();
        executor.submit(() -> {
            try {
                // 模拟异步网络请求获取 DataB
                Thread.sleep(100);
                future.complete(new DataB());
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static DataC combineData(DataA a, DataB b) {
        // 模拟合并 DataA 和 DataB 为 DataC 的操作
        return new DataC();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<DataA> futureA = fetchDataAAsync();
        CompletableFuture<DataB> futureB = fetchDataBAsync();

        CompletableFuture<DataC> futureC = futureA.thenCombine(futureB, (a, b) -> combineData(a, b));

        DataC result = futureC.get();
        executor.shutdown();
    }
}