设计思路
- 性能优化
- 线程池复用:使用线程池来执行任务,避免频繁创建和销毁线程带来的开销。例如,使用
ThreadPoolExecutor
类,根据任务类型和预期并发量设置合适的核心线程数、最大线程数、队列容量等参数。
- 异步执行:让任务在后台线程异步执行,主线程可以继续处理其他事务,提高系统整体的并发处理能力。
- 资源管理
- 任务队列管理:合理设置任务队列的大小,避免任务堆积过多导致内存溢出。当队列满时,可以根据业务需求选择拒绝策略,如
AbortPolicy
(默认,直接抛出异常)、CallerRunsPolicy
(由调用者线程处理任务)等。
- 线程资源回收:在线程执行完任务后,确保线程能够正确返回线程池被复用,而不是一直占用资源。对于长时间运行的任务,可以考虑设置合理的超时机制,超时后中断任务并释放资源。
- 错误处理
- 异常捕获与传递:在任务执行过程中,捕获可能出现的异常,并将异常传递给等待结果的调用方。可以在自定义的
Future
实现类中提供方法来获取任务执行过程中的异常信息。
- 重试机制:对于一些由于临时性故障(如网络抖动等)导致的失败任务,可以设计重试机制,自动重试一定次数,提高任务执行的成功率。
关键实现要点
- 自定义Future接口
- 定义获取任务执行结果的方法,如
get()
,该方法应阻塞等待任务完成并返回结果。
- 定义获取任务执行状态的方法,如
isDone()
,用于判断任务是否已经完成。
- 定义获取任务执行异常的方法,如
getException()
,用于获取任务执行过程中抛出的异常。
public interface MyFuture<V> {
V get() throws InterruptedException, ExecutionException;
boolean isDone();
Exception getException();
}
- 自定义Callable接口
- 定义任务执行逻辑的接口,类似于Java原生的
Callable
接口,任务实现该接口并返回结果。
public interface MyCallable<V> {
V call() throws Exception;
}
- 自定义FutureTask类
- 实现
MyFuture
接口,同时持有MyCallable
任务对象。
- 内部维护任务的执行状态,如未开始、执行中、已完成、已取消等。
- 提供构造方法接收
MyCallable
任务对象。
- 实现
get()
方法,通过wait()
和notify()
机制实现阻塞等待任务完成并返回结果。
- 实现
isDone()
方法,根据任务执行状态返回结果。
- 实现
getException()
方法,返回任务执行过程中捕获的异常。
public class MyFutureTask<V> implements MyFuture<V> {
private volatile State state = State.NEW;
private MyCallable<V> callable;
private V result;
private Exception exception;
private static enum State {
NEW,
RUNNING,
DONE,
CANCELLED
}
public MyFutureTask(MyCallable<V> callable) {
this.callable = callable;
}
@Override
public V get() throws InterruptedException, ExecutionException {
synchronized (this) {
while (state != State.DONE && state != State.CANCELLED) {
wait();
}
if (exception != null) {
throw new ExecutionException(exception);
}
return result;
}
}
@Override
public boolean isDone() {
return state == State.DONE || state == State.CANCELLED;
}
@Override
public Exception getException() {
return exception;
}
public void run() {
if (state != State.NEW) {
return;
}
try {
state = State.RUNNING;
result = callable.call();
state = State.DONE;
} catch (Exception e) {
exception = e;
state = State.DONE;
} finally {
synchronized (this) {
notifyAll();
}
}
}
}
- 自定义线程池与任务提交
- 实现自定义的线程池,管理线程资源并提交任务。
- 线程池中的线程从任务队列中获取
MyFutureTask
任务并执行。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MyThreadPool {
private BlockingQueue<MyFutureTask<?>> taskQueue;
private Thread[] threads;
public MyThreadPool(int corePoolSize, int maxPoolSize, int queueCapacity) {
taskQueue = new LinkedBlockingQueue<>(queueCapacity);
threads = new Thread[maxPoolSize];
for (int i = 0; i < corePoolSize; i++) {
threads[i] = new Worker();
threads[i].start();
}
}
public MyFuture<?> submit(MyCallable<?> callable) {
MyFutureTask<?> task = new MyFutureTask<>(callable);
try {
taskQueue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return task;
}
private class Worker extends Thread {
@Override
public void run() {
while (true) {
MyFutureTask<?> task;
try {
task = taskQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
task.run();
}
}
}
}