使用CompletableFuture实现顺序执行和错误处理
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class AsyncTaskSequence {
// 模拟任务A,返回用户ID
public static CompletableFuture<String> taskA() {
return CompletableFuture.supplyAsync(() -> {
// 模拟实际获取用户ID的操作
if (Math.random() < 0.2) {
throw new RuntimeException("Task A failed");
}
return "user123";
});
}
// 模拟任务B,根据用户ID从数据库获取用户基本信息
public static CompletableFuture<String> taskB(String userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟实际从数据库获取用户基本信息的操作
if (Math.random() < 0.2) {
throw new RuntimeException("Task B failed");
}
return "User basic info for " + userId;
});
}
// 模拟任务C,根据用户基本信息调用远程服务获取用户扩展信息
public static CompletableFuture<String> taskC(String userBasicInfo) {
return CompletableFuture.supplyAsync(() -> {
// 模拟实际调用远程服务获取用户扩展信息的操作
if (Math.random() < 0.2) {
throw new RuntimeException("Task C failed");
}
return "User extended info for " + userBasicInfo;
});
}
public static void main(String[] args) {
CompletableFuture<String> resultFuture = taskA()
.thenComposeAsync(AsyncTaskSequence::taskB)
.thenComposeAsync(AsyncTaskSequence::taskC)
.exceptionally(ex -> {
// 捕获异常,返回友好错误信息
return "An error occurred: " + ex.getMessage();
});
try {
String result = resultFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
性能优化
- 线程池优化:上述代码中使用了
supplyAsync
的无参形式,它默认使用ForkJoinPool.commonPool()
。在复杂项目中,可以创建自定义的ExecutorService
线程池,根据系统资源和任务特性合理设置线程池大小,以避免线程竞争和过度消耗资源。例如:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AsyncTaskSequenceWithExecutor {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
// 模拟任务A,返回用户ID
public static CompletableFuture<String> taskA() {
return CompletableFuture.supplyAsync(() -> {
// 模拟实际获取用户ID的操作
if (Math.random() < 0.2) {
throw new RuntimeException("Task A failed");
}
return "user123";
}, executor);
}
// 模拟任务B,根据用户ID从数据库获取用户基本信息
public static CompletableFuture<String> taskB(String userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟实际从数据库获取用户基本信息的操作
if (Math.random() < 0.2) {
throw new RuntimeException("Task B failed");
}
return "User basic info for " + userId;
}, executor);
}
// 模拟任务C,根据用户基本信息调用远程服务获取用户扩展信息
public static CompletableFuture<String> taskC(String userBasicInfo) {
return CompletableFuture.supplyAsync(() -> {
// 模拟实际调用远程服务获取用户扩展信息的操作
if (Math.random() < 0.2) {
throw new RuntimeException("Task C failed");
}
return "User extended info for " + userBasicInfo;
}, executor);
}
public static void main(String[] args) {
CompletableFuture<String> resultFuture = taskA()
.thenComposeAsync(AsyncTaskSequenceWithExecutor::taskB, executor)
.thenComposeAsync(AsyncTaskSequenceWithExecutor::taskC, executor)
.exceptionally(ex -> {
// 捕获异常,返回友好错误信息
return "An error occurred: " + ex.getMessage();
});
try {
String result = resultFuture.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
- 减少不必要的等待:在任务执行过程中,尽量减少不必要的阻塞和等待。例如,在任务A、B、C内部的实际操作中,如果有可以异步化的子操作,可以进一步使用
CompletableFuture
进行优化。
资源管理
- 关闭线程池:在应用程序结束时,务必关闭自定义的
ExecutorService
线程池,避免资源泄漏。如上述代码中的executor.shutdown()
。
- 数据库连接管理:在任务B中涉及数据库操作,要确保数据库连接的正确获取和释放。可以使用连接池技术(如HikariCP)来管理数据库连接,提高连接的复用性和性能。
- 远程服务资源管理:任务C中调用远程服务,要合理设置连接超时、读取超时等参数,避免长时间占用资源。同时,对于远程服务返回的资源(如流、连接等)要及时关闭。