实现思路
- 共享队列:使用
LinkedList
来实现共享队列,它是线程不安全的,所以需要在操作时保证线程安全。
- 线程安全控制:使用
ReentrantLock
来保证对共享队列操作的线程安全,通过Condition
实现生产者和消费者线程的等待与唤醒。
- 生产者线程:当队列满时,调用
Condition
的await
方法等待,队列不满时向队列添加数据,并调用Condition
的signalAll
方法唤醒所有等待的消费者线程。
- 消费者线程:当队列空时,调用
Condition
的await
方法等待,队列不空时从队列取出数据,并调用Condition
的signalAll
方法唤醒所有等待的生产者线程。
代码实现
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerExample {
private final LinkedList<Integer> queue;
private final int capacity;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public ProducerConsumerExample(int capacity) {
this.queue = new LinkedList<>();
this.capacity = capacity;
}
public void produce(int item) throws InterruptedException {
lock.lock();
try {
while (queue.size() == capacity) {
System.out.println("Queue is full. Producer waiting...");
notFull.await();
}
queue.add(item);
System.out.println("Produced: " + item);
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public int consume() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println("Queue is empty. Consumer waiting...");
notEmpty.await();
}
int item = queue.removeFirst();
System.out.println("Consumed: " + item);
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
ProducerConsumerExample pc = new ProducerConsumerExample(5);
Thread producer1 = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
pc.produce(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread producer2 = new Thread(() -> {
try {
for (int i = 11; i <= 20; i++) {
pc.produce(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumer1 = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
pc.consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumer2 = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
pc.consume();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
try {
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
线程安全处理
ReentrantLock
:在produce
和consume
方法中,通过lock.lock()
获取锁,确保在同一时间只有一个线程可以访问共享队列,避免数据竞争。
Condition
:使用notFull
和notEmpty
两个Condition
对象。当队列满时,生产者线程调用notFull.await()
等待,当队列空时,消费者线程调用notEmpty.await()
等待。当生产者向队列添加数据后,调用notEmpty.signalAll()
唤醒所有等待的消费者线程;当消费者从队列取出数据后,调用notFull.signalAll()
唤醒所有等待的生产者线程。
try - finally
块:在获取锁后,使用try - finally
块确保无论是否发生异常,都能正确释放锁,通过lock.unlock()
来释放锁,保证线程安全。