1. Queue
- 原理:
Queue
是 multiprocessing
模块中用于进程间通信的队列。它基于管道和锁机制实现,能够安全地在多个进程间传递数据。队列是线程和进程安全的。
- 实际场景:常用于生产者 - 消费者模型。例如,一个进程负责生成数据(生产者),另一个或多个进程负责处理数据(消费者)。比如在图片处理系统中,一个进程从文件夹中读取图片路径(生产者),另一个进程根据路径读取图片并进行处理(消费者)。
- 避免数据冲突和死锁:由于
Queue
本身是线程和进程安全的,所以在基本使用场景下不会出现数据冲突。对于死锁问题,只要确保生产者和消费者都有合理的退出机制,例如生产者在生产完所有数据后关闭队列,消费者在队列为空且没有新数据即将到来时停止消费,就可以避免死锁。
- 示例代码:
import multiprocessing
def producer(queue):
for i in range(5):
queue.put(i)
print(f"Produced {i}")
def consumer(queue):
while True:
item = queue.get()
if item is None:
break
print(f"Consumed {item}")
if __name__ == '__main__':
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
queue.put(None) # 发送结束信号
p2.join()
2. Pipe
- 原理:
Pipe
用于创建一个管道,返回一对连接对象,分别表示管道的两端,通过这两个对象可以在两个进程间进行数据传递。管道默认是全双工的,即两端都可以同时进行读写操作。
- 实际场景:适用于两个进程之间需要直接通信的场景。例如,主进程需要向子进程发送一些配置信息,子进程处理完任务后返回结果给主进程。在分布式计算系统中,协调进程与计算节点进程之间的通信就可以使用
Pipe
。
- 避免数据冲突和死锁:为避免数据冲突,要明确两端的读写操作顺序,尽量不要同时进行双向的写操作。对于死锁问题,合理规划进程逻辑,确保在适当的时候进行读写操作,避免双方都在等待对方先发送数据的情况。例如,可以约定由一端先发起通信。
- 示例代码:
import multiprocessing
def sender(pipe):
conn1, conn2 = pipe
conn1.close()
for i in range(5):
conn2.send(i)
print(f"Sent {i}")
conn2.close()
def receiver(pipe):
conn1, conn2 = pipe
conn2.close()
while True:
try:
item = conn1.recv()
print(f"Received {item}")
except EOFError:
break
conn1.close()
if __name__ == '__main__':
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=sender, args=((pipe,)))
p2 = multiprocessing.Process(target=receiver, args=((pipe,)))
p1.start()
p2.start()
p1.join()
p2.join()