面试题答案
一键面试import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class AsyncTaskScheduler {
private static final ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "MyThread-" + threadNumber.getAndIncrement());
t.setDaemon(true);
return t;
}
}
);
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
// 模拟任务A
System.out.println("Task A is running.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return true;
}, executor)
.thenApplyAsync(result -> {
// 根据任务A的结果决定执行哪组任务
if (result) {
return CompletableFuture.allOf(
CompletableFuture.runAsync(() -> {
System.out.println("Task B1 is running.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor),
CompletableFuture.runAsync(() -> {
System.out.println("Task B2 is running.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor)
);
} else {
return CompletableFuture.allOf(
CompletableFuture.runAsync(() -> {
System.out.println("Task C1 is running.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor),
CompletableFuture.runAsync(() -> {
System.out.println("Task C2 is running.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor)
);
}
}, executor)
.thenApplyAsync(future -> {
// 等待子任务完成
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return "All sub - tasks completed.";
}, executor)
.thenAcceptAsync(result -> {
// 执行任务D
System.out.println("Task D is running with result: " + result);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor)
.exceptionally(ex -> {
ex.printStackTrace();
return null;
})
.whenComplete((v, ex) -> {
executor.shutdown();
});
}
}
性能优化
- 线程池复用:
- 使用
Executors.newFixedThreadPool
创建固定大小的线程池,根据系统可用处理器数量设置线程池大小,这样可以避免频繁创建和销毁线程带来的开销。例如:
private static final ExecutorService executor = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), new ThreadFactory() { private final AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, "MyThread-" + threadNumber.getAndIncrement()); t.setDaemon(true); return t; } } );
- 使用自定义的
ThreadFactory
设置线程为守护线程,使得在主线程结束时,这些线程也能自动结束,避免资源泄漏。
- 使用
- 减少上下文切换:
- 在
CompletableFuture
的链式调用中,使用thenApplyAsync
、thenAcceptAsync
等方法,并传入同一个线程池executor
,这样可以减少不同线程池之间切换带来的上下文切换开销。例如:
- 在
.thenApplyAsync(result -> { // 根据任务A的结果决定执行哪组任务 if (result) { return CompletableFuture.allOf( CompletableFuture.runAsync(() -> { System.out.println("Task B1 is running."); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executor), CompletableFuture.runAsync(() -> { System.out.println("Task B2 is running."); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executor) ); } else { return CompletableFuture.allOf( CompletableFuture.runAsync(() -> { System.out.println("Task C1 is running."); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executor), CompletableFuture.runAsync(() -> { System.out.println("Task C2 is running."); try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }, executor) ); } }, executor)
3. **合理的任务粒度**:
- 在设计任务时,尽量使任务有一定的粒度,避免将任务划分得过于细碎。例如任务A、B1、B2、C1、C2、D的执行时间相对合理,不会因为过于短小的任务导致频繁的线程调度开销。
4. **异常处理**:
- 使用`exceptionally`方法对整个任务链中的异常进行统一处理,避免因未处理的异常导致程序中断。例如:
```java
.exceptionally(ex -> {
ex.printStackTrace();
return null;
})
- 资源清理:
- 在任务完成后(通过
whenComplete
方法),关闭线程池,释放资源。例如:
- 在任务完成后(通过
.whenComplete((v, ex) -> { executor.shutdown(); });