MST

星途 面试题库

面试题:Java中BlockingQueue线程安全机制相关 - 自定义扩展

假设现有一个特殊需求,需要在BlockingQueue的基础上,实现当队列元素达到一定数量且满足特定条件时,触发一个自定义的通知机制,同时要保证线程安全。请描述实现思路,并给出关键代码示例。
47.6万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

实现思路

  1. 继承BlockingQueue:通过继承BlockingQueue类,以便复用其线程安全的特性以及队列操作方法。
  2. 定义条件和通知机制:定义特定条件以及相应的通知机制,比如可以使用java.util.concurrent.locks.Condition来实现条件等待和唤醒。
  3. 重写入队方法:重写BlockingQueue的入队方法(如put方法),在元素入队后检查队列元素数量和特定条件,满足时触发通知。

关键代码示例

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CustomBlockingQueue<T> extends LinkedBlockingQueue<T> {
    private static final int THRESHOLD = 10;
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();

    public CustomBlockingQueue(int capacity) {
        super(capacity);
    }

    @Override
    public void put(T e) throws InterruptedException {
        super.put(e);
        lock.lock();
        try {
            if (size() >= THRESHOLD && meetsSpecificCondition()) {
                condition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    private boolean meetsSpecificCondition() {
        // 这里实现特定条件检查逻辑
        // 例如:return this.stream().allMatch(...)
        return true;
    }

    public void waitForCondition() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }
}

可以这样使用:

public class Main {
    public static void main(String[] args) {
        CustomBlockingQueue<Integer> queue = new CustomBlockingQueue<>(20);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 15; i++) {
                try {
                    queue.put(i);
                    System.out.println("Produced: " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                queue.waitForCondition();
                System.out.println("Condition met, starting consumption.");
                // 消费逻辑
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();

        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}