可能导致问题的原因
- 线程资源耗尽:CompletableFuture默认使用ForkJoinPool.commonPool(),该线程池的线程数量有限(通常为CPU核心数 - 1)。在高并发场景下,如果任务量远超线程池容量,新任务将被阻塞等待线程资源,可能导致任务堆积,最终耗尽线程资源。
- 任务调度不合理:大量CompletableFuture任务可能会出现任务优先级不合理的情况,导致重要任务得不到及时执行,造成性能瓶颈。
- 资源回收不及时:CompletableFuture任务执行过程中可能会占用各种资源(如数据库连接、网络连接等),如果任务执行完毕后这些资源没有及时释放,随着任务数量增加,资源会被耗尽。
避免问题的策略和方法
- 合理配置线程池:
- 创建自定义线程池,根据系统资源和任务特性设置合适的线程数量。例如,对于CPU密集型任务,线程数可设置为CPU核心数;对于I/O密集型任务,线程数可适当增加。
- 示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CompletableFutureExample {
private static final ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
Future<String> future = CompletableFuture.supplyAsync(() -> "Hello, CompletableFuture!", executor);
try {
System.out.println(future.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
- 任务调度优化:
- 使用优先级队列或自定义调度算法来安排任务执行顺序。例如,对于一些对响应时间敏感的任务,可以设置较高的优先级。
- 示例代码(简单模拟优先级调度):
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
class PriorityTask implements Comparable<PriorityTask> {
private final int priority;
private final Runnable task;
private static final AtomicInteger counter = new AtomicInteger();
public PriorityTask(int priority, Runnable task) {
this.priority = priority;
this.task = task;
}
@Override
public int compareTo(PriorityTask other) {
return Integer.compare(this.priority, other.priority);
}
public void run() {
task.run();
}
}
public class PriorityTaskScheduler {
private final PriorityBlockingQueue<PriorityTask> taskQueue = new PriorityBlockingQueue<>();
private final Thread workerThread;
public PriorityTaskScheduler() {
workerThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
PriorityTask task = taskQueue.take();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
workerThread.start();
}
public void submitTask(int priority, Runnable task) {
taskQueue.add(new PriorityTask(priority, task));
}
public void shutdown() {
workerThread.interrupt();
try {
workerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- 资源回收机制:
- 使用
try - finally
块确保任务执行完毕后及时释放资源。例如,在使用数据库连接时:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
public class DatabaseResourceExample {
public static CompletableFuture<String> queryDatabase() {
return CompletableFuture.supplyAsync(() -> {
Connection connection = null;
PreparedStatement statement = null;
ResultSet resultSet = null;
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "user", "password");
statement = connection.prepareStatement("SELECT * FROM users WHERE id = 1");
resultSet = statement.executeQuery();
if (resultSet.next()) {
return resultSet.getString("name");
}
} catch (SQLException e) {
throw new RuntimeException(e);
} finally {
try {
if (resultSet != null) resultSet.close();
if (statement != null) statement.close();
if (connection != null) connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
return null;
});
}
public static void main(String[] args) {
queryDatabase().thenAccept(System.out::println);
}
}
- 限流:
- 使用令牌桶算法或漏桶算法限制并发任务数量,防止任务过多导致资源耗尽。例如,使用Guava的RateLimiter:
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.CompletableFuture;
public class RateLimitingExample {
private static final RateLimiter rateLimiter = RateLimiter.create(10); // 每秒允许10个任务
public static CompletableFuture<Void> executeTask(Runnable task) {
rateLimiter.acquire();
return CompletableFuture.runAsync(task);
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
executeTask(() -> System.out.println("Task " + i + " is running"));
}
}
}