生产者代码逻辑
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private final BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
queue.put(i); // 将元素放入队列,若队列满则阻塞
System.out.println("Producer produced: " + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
消费者代码逻辑
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private final BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Integer item = queue.take(); // 从队列取出元素,若队列为空则阻塞
System.out.println("Consumer consumed: " + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
测试代码
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Main {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
Thread producerThread = new Thread(new Producer(queue));
Thread consumerThread = new Thread(new Consumer(queue));
producerThread.start();
consumerThread.start();
try {
producerThread.join();
consumerThread.interrupt();
consumerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
BlockingQueue保证线程安全的特性
- 阻塞操作:
put(E e)
:当队列满时,调用此方法的线程会被阻塞,直到队列有空间可用。这确保了在多线程环境下,不会出现因为队列满而导致的元素丢失或数据竞争问题。
take()
:当队列空时,调用此方法的线程会被阻塞,直到队列中有元素可取。这避免了消费者在队列为空时获取到无效数据的情况。
- 内部同步机制:BlockingQueue的实现类(如
LinkedBlockingQueue
、ArrayBlockingQueue
等)内部使用了锁(如ReentrantLock
)和条件变量(Condition
)来实现线程同步。这些机制保证了在多线程访问队列时,对队列的插入、删除等操作是线程安全的。例如,在LinkedBlockingQueue
中,使用ReentrantLock
来保护队列的状态,不同的Condition
对象用于控制队列满和队列空的情况。
- 原子性操作:BlockingQueue的各种方法(如
put
、take
、offer
、poll
等)都是原子性的。也就是说,这些操作要么完全执行,要么完全不执行,不会出现部分执行的情况,从而避免了数据不一致问题。