import java.util.concurrent.*;
public class CompletableFutureExample {
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
public static String getDataFromDB() {
// 模拟从数据库读取数据
return "原始数据";
}
public static String processData(String data) {
// 模拟数据处理
return data + " 已处理";
}
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> getDataFromDB(), executorService)
.thenApplyAsync(CompletableFutureExample::processData, executorService)
.thenAccept(System.out::println)
.exceptionally(e -> {
e.printStackTrace();
return null;
});
executorService.shutdown();
}
}
- 首先创建一个线程池
executorService
,这里使用 Executors.newFixedThreadPool(10)
创建一个固定大小为10的线程池。
CompletableFuture.supplyAsync(() -> getDataFromDB(), executorService)
:使用 supplyAsync
方法异步执行 getDataFromDB
方法,并指定使用创建的线程池 executorService
。该方法返回一个 CompletableFuture
,其泛型类型与 getDataFromDB
方法的返回类型相同(这里是 String
)。
.thenApplyAsync(CompletableFutureExample::processData, executorService)
:对上一步返回的 CompletableFuture
使用 thenApplyAsync
方法,异步执行 processData
方法处理从数据库获取的数据,同样使用线程池 executorService
。thenApplyAsync
方法返回一个新的 CompletableFuture
,其泛型类型与 processData
方法的返回类型相同(这里也是 String
)。
.thenAccept(System.out::println)
:对处理后的数据进行消费,这里简单地将结果打印到控制台。
.exceptionally(e -> {... })
:捕获在异步任务执行过程中可能抛出的异常,并进行处理(这里只是打印异常堆栈信息)。
- 最后调用
executorService.shutdown()
关闭线程池。