MST

星途 面试题库

面试题:Java ArrayBlockingQueue容量限制下的多线程协作

在多线程环境下,假设有一个ArrayBlockingQueue,容量为5。有3个线程负责往队列中添加元素,2个线程负责从队列中取出元素。请用Java代码实现这个场景,并处理好线程之间的协作与同步,确保队列的容量限制始终生效。同时,解释代码中使用的同步机制和原理。
48.6万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {

    private static final int QUEUE_CAPACITY = 5;
    private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    public static void main(String[] args) {
        Thread producer1 = new Thread(new Producer(queue, 1));
        Thread producer2 = new Thread(new Producer(queue, 2));
        Thread producer3 = new Thread(new Producer(queue, 3));

        Thread consumer1 = new Thread(new Consumer(queue, 1));
        Thread consumer2 = new Thread(new Consumer(queue, 2));

        producer1.start();
        producer2.start();
        producer3.start();

        consumer1.start();
        consumer2.start();
    }

    static class Producer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final int producerId;

        Producer(BlockingQueue<Integer> queue, int producerId) {
            this.queue = queue;
            this.producerId = producerId;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    int item = producerId * 10 + i;
                    queue.put(item);
                    System.out.println("Producer " + producerId + " added " + item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    static class Consumer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final int consumerId;

        Consumer(BlockingQueue<Integer> queue, int consumerId) {
            this.queue = queue;
            this.consumerId = consumerId;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    int item = queue.take();
                    System.out.println("Consumer " + consumerId + " removed " + item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

同步机制和原理解释:

  1. ArrayBlockingQueue
    • ArrayBlockingQueuejava.util.concurrent 包下的一个类,它实现了 BlockingQueue 接口。它是一个有界队列,容量在构造时指定(这里为 QUEUE_CAPACITY = 5)。
    • 当队列满时,往队列中添加元素的操作(如 put 方法)会被阻塞,直到队列中有空闲空间。
    • 当队列空时,从队列中取出元素的操作(如 take 方法)会被阻塞,直到队列中有元素。
  2. put 方法
    • producer 线程调用 queue.put(item) 方法往队列中添加元素。如果队列已满,put 方法会使当前线程进入等待状态(阻塞),直到队列中有空闲空间。这是通过内部的锁机制和条件变量实现的。
    • 当队列有空间时,线程被唤醒,元素被添加到队列中。
  3. take 方法
    • consumer 线程调用 queue.take() 方法从队列中取出元素。如果队列已空,take 方法会使当前线程进入等待状态(阻塞),直到队列中有元素。
    • 当队列中有元素时,线程被唤醒,元素被从队列中取出。
  4. 线程安全
    • ArrayBlockingQueue 内部使用一个锁(ReentrantLock)来保证线程安全。所有对队列的操作(添加、取出等)都需要获取这个锁。
    • 同时,通过 Condition 对象(通过锁创建)来实现线程的等待和唤醒机制,从而实现生产者 - 消费者模型中的同步。例如,当队列满时,生产者线程等待在 notFull 条件上;当队列空时,消费者线程等待在 notEmpty 条件上。当队列状态改变(有空间或有元素)时,相应的条件会被唤醒等待的线程。