确保线程间有效通信及避免资源竞争的机制
- 锁机制
- 互斥锁:使用
synchronized
关键字或者ReentrantLock
类。例如,synchronized
关键字可以修饰方法或者代码块,当一个线程进入同步代码块时,其他线程就无法进入,从而避免资源竞争。ReentrantLock
相比synchronized
更加灵活,支持公平锁、非公平锁,以及可中断的锁获取等特性。
- 读写锁:
ReadWriteLock
及其实现类ReentrantReadWriteLock
,适用于读多写少的场景。允许多个线程同时进行读操作,但只允许一个线程进行写操作,写操作时其他读写线程都不能访问,这样既保证了读操作的并发性能,又避免了写操作时的数据不一致问题。
- 信号量:
Semaphore
可以控制同时访问资源的线程数量。例如,在网络连接池场景下,可以设置Semaphore
的许可数量为连接池中的连接数,每个线程在获取网络连接前先获取一个许可,使用完连接后释放许可,这样可以有效控制并发访问连接池的线程数量,避免资源耗尽。
- 线程安全的容器:使用
ConcurrentHashMap
、CopyOnWriteArrayList
等线程安全的容器类。这些容器内部通过锁机制或者其他并发控制手段,确保在多线程环境下的数据一致性。例如ConcurrentHashMap
在保证线程安全的同时,通过分段锁等技术提高了并发性能。
- 线程间通信机制
- 等待 - 通知机制:在Java中可以使用
Object
类的wait()
、notify()
和notifyAll()
方法。例如,一个线程在获取不到网络资源时,可以调用wait()
方法进入等待状态,当其他线程释放资源后调用notify()
或者notifyAll()
方法唤醒等待的线程。
- 管道流:
PipedInputStream
和PipedOutputStream
、PipedReader
和PipedWriter
用于线程之间通过管道进行字节流或者字符流的通信。例如,一个线程将数据写入PipedOutputStream
,另一个线程从对应的PipedInputStream
中读取数据,实现线程间的数据传递。
使用Java并发工具类实现线程协作需求示例
- CountDownLatch
- 作用:
CountDownLatch
允许一个或多个线程等待其他一组线程完成操作后再继续执行。
- 示例:假设有一个网络爬虫程序,需要等待多个线程完成网页下载后,再进行数据的汇总分析。
import java.util.concurrent.CountDownLatch;
public class WebCrawler {
public static void main(String[] args) {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(new DownloadTask(latch)).start();
}
try {
latch.await();
System.out.println("所有网页下载完成,开始数据分析。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class DownloadTask implements Runnable {
private final CountDownLatch latch;
public DownloadTask(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 开始下载网页。");
// 模拟网页下载过程
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 网页下载完成。");
latch.countDown();
}
}
}
- CyclicBarrier
- 作用:
CyclicBarrier
可以让一组线程相互等待,直到所有线程都到达某个屏障点后,再一起继续执行。与CountDownLatch
不同的是,CyclicBarrier
可以重复使用。
- 示例:在一个分布式计算场景中,多个计算节点需要先进行数据预处理,然后再同时进行计算。
import java.util.concurrent.CyclicBarrier;
public class DistributedComputing {
public static void main(String[] args) {
int nodeCount = 3;
CyclicBarrier barrier = new CyclicBarrier(nodeCount, () -> {
System.out.println("所有节点数据预处理完成,开始进行计算。");
});
for (int i = 0; i < nodeCount; i++) {
new Thread(new ComputeNode(barrier)).start();
}
}
static class ComputeNode implements Runnable {
private final CyclicBarrier barrier;
public ComputeNode(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 开始数据预处理。");
// 模拟数据预处理过程
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 数据预处理完成,等待其他节点。");
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 开始进行计算。");
}
}
}