方案设计思路
- 线程池配置:创建一个大小合适的线程池来执行异步任务。线程池大小需根据系统硬件资源(如 CPU 核心数、内存等)进行合理配置。例如,如果主要是 CPU 密集型任务,线程池大小可以设置为 CPU 核心数;如果是 I/O 密集型任务,线程池大小可以适当增大。
- CompletableFuture 处理异步任务:使用 CompletableFuture 来管理每个用户数据处理的异步子任务。将每个子任务封装成 CompletableFuture,利用其提供的方法(如 thenCombine、thenApply 等)来处理任务之间的依赖关系,并实现并行执行。
- 流处理整合:使用流处理(如 Java 8 的 Stream API)来遍历百万级别的用户数据,将每个用户数据的处理过程并行化。流处理提供了简洁的方式来处理集合数据,并且可以方便地与 CompletableFuture 结合使用。
关键代码示例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class HighConcurrencyDataProcessor {
private static final int THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
// 模拟子任务
private CompletableFuture<String> subTask1(String input) {
return CompletableFuture.supplyAsync(() -> {
// 模拟子任务处理逻辑
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return input + " processed by subTask1";
}, executorService);
}
private CompletableFuture<String> subTask2(String input) {
return CompletableFuture.supplyAsync(() -> {
// 模拟子任务处理逻辑
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return input + " processed by subTask2";
}, executorService);
}
// 处理单个用户数据
private CompletableFuture<String> processUserData(String userData) {
CompletableFuture<String> task1 = subTask1(userData);
CompletableFuture<String> task2 = subTask2(userData);
return task1.thenCombine(task2, (result1, result2) -> result1 + " and " + result2);
}
// 处理百万级用户数据
public List<String> processUserList(List<String> userList) {
try {
return userList.parallelStream()
.map(this::processUserData)
.map(CompletableFuture::join)
.collect(Collectors.toList());
} finally {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
HighConcurrencyDataProcessor processor = new HighConcurrencyDataProcessor();
List<String> userList = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
userList.add("User" + i);
}
List<String> processedList = processor.processUserList(userList);
System.out.println("Processed user data size: " + processedList.size());
}
}
性能优化理论依据
- 线程池合理配置:避免线程过多导致的上下文切换开销。对于 CPU 密集型任务,线程数过多会导致 CPU 在不同线程间频繁切换,降低整体性能;对于 I/O 密集型任务,适当增加线程数可以充分利用等待 I/O 的时间,提高系统的吞吐量。
- CompletableFuture 并行执行:利用 CompletableFuture 的特性,使得多个子任务可以并行执行,减少整体任务的执行时间。通过 thenCombine 等方法,可以在不阻塞主线程的情况下,处理任务之间的依赖关系,提高程序的并发性能。
- 流处理并行化:Java 8 的 Stream API 的并行流可以自动将数据分块并利用多个线程进行处理,提高遍历百万级数据的效率。并行流会根据系统的 CPU 核心数等因素自动调整任务的并行度,合理利用系统资源。
避免死锁的措施
- 资源获取顺序一致:在处理子任务时,确保所有任务获取资源的顺序是一致的。如果存在多个任务需要获取多个资源,按照固定顺序获取可以避免死锁。
- 设置超时:在使用锁或其他资源时,设置合理的超时时间。如果在规定时间内无法获取到资源,任务可以放弃并尝试其他方式,避免无限期等待导致死锁。在上述代码中,线程池的关闭操作设置了超时,防止线程池在关闭过程中出现死锁。