设计思路
- 使用
volatile
标志位:定义一个volatile
类型的标志位,用于线程间通信,告知线程是否需要终止。
- 生产者 - 消费者模型处理:在生产者线程中,当标志位为终止状态时,不再生产新任务,并等待队列中的任务被消费完。消费者线程在处理完当前任务后,检查标志位,若为终止状态且任务队列为空,则终止。
- 线程池处理:使用
ExecutorService
的shutdown()
和awaitTermination()
方法。shutdown()
方法启动有序关闭,不再接受新任务,但会继续执行已提交的任务。awaitTermination()
方法等待所有任务执行完成或超时。
- 资源释放:在每个线程的终止逻辑中,确保释放线程持有的资源,如文件句柄、数据库连接等。
关键代码示例
- 生产者 - 消费者模型示例
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private volatile boolean stop = false;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
int i = 0;
while (!stop) {
try {
queue.put(i++);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 等待队列中的任务被消费完
while (!queue.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public void stopProducer() {
stop = true;
}
}
class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private volatile boolean stop = false;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!stop ||!queue.isEmpty()) {
try {
Integer item = queue.take();
System.out.println("Consumed: " + item);
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public void stopConsumer() {
stop = true;
}
}
- 线程池示例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolShutdownExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 提交任务
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
// 模拟任务执行
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 启动有序关闭
executorService.shutdown();
try {
// 等待所有任务执行完成或超时
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate");
}
}
} catch (InterruptedException ie) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}