import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
private static final int QUEUE_CAPACITY = 5;
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
public static void main(String[] args) {
Thread producer1 = new Thread(new Producer(queue, 1));
Thread producer2 = new Thread(new Producer(queue, 2));
Thread producer3 = new Thread(new Producer(queue, 3));
Thread consumer1 = new Thread(new Consumer(queue, 1));
Thread consumer2 = new Thread(new Consumer(queue, 2));
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
}
static class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int producerId;
Producer(BlockingQueue<Integer> queue, int producerId) {
this.queue = queue;
this.producerId = producerId;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
int item = producerId * 10 + i;
queue.put(item);
System.out.println("Producer " + producerId + " added " + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
static class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
private final int consumerId;
Consumer(BlockingQueue<Integer> queue, int consumerId) {
this.queue = queue;
this.consumerId = consumerId;
}
@Override
public void run() {
while (true) {
try {
int item = queue.take();
System.out.println("Consumer " + consumerId + " removed " + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
同步机制和原理解释:
- ArrayBlockingQueue:
ArrayBlockingQueue
是 java.util.concurrent
包下的一个类,它实现了 BlockingQueue
接口。它是一个有界队列,容量在构造时指定(这里为 QUEUE_CAPACITY = 5
)。
- 当队列满时,往队列中添加元素的操作(如
put
方法)会被阻塞,直到队列中有空闲空间。
- 当队列空时,从队列中取出元素的操作(如
take
方法)会被阻塞,直到队列中有元素。
put
方法:
producer
线程调用 queue.put(item)
方法往队列中添加元素。如果队列已满,put
方法会使当前线程进入等待状态(阻塞),直到队列中有空闲空间。这是通过内部的锁机制和条件变量实现的。
- 当队列有空间时,线程被唤醒,元素被添加到队列中。
take
方法:
consumer
线程调用 queue.take()
方法从队列中取出元素。如果队列已空,take
方法会使当前线程进入等待状态(阻塞),直到队列中有元素。
- 当队列中有元素时,线程被唤醒,元素被从队列中取出。
- 线程安全:
ArrayBlockingQueue
内部使用一个锁(ReentrantLock
)来保证线程安全。所有对队列的操作(添加、取出等)都需要获取这个锁。
- 同时,通过
Condition
对象(通过锁创建)来实现线程的等待和唤醒机制,从而实现生产者 - 消费者模型中的同步。例如,当队列满时,生产者线程等待在 notFull
条件上;当队列空时,消费者线程等待在 notEmpty
条件上。当队列状态改变(有空间或有元素)时,相应的条件会被唤醒等待的线程。