资源管理以避免资源耗尽
- 内存管理
- 限制每个进程内存使用:在进程启动时,通过设置合适的资源限制(如
resource
模块在Unix系统上)来限制每个进程的内存使用量。例如:
import resource
# 设置进程最大内存使用量为100MB
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(resource.RLIMIT_AS, (100 * 1024 * 1024, hard))
- **共享内存**:使用`multiprocessing.shared_memory`模块,让多个进程共享同一块内存区域,减少内存重复占用。比如在进程间传递大量数据时,将数据放入共享内存中,而不是每个进程复制一份。
import multiprocessing.shared_memory
import numpy as np
# 创建共享内存块
shm = multiprocessing.shared_memory.SharedMemory(create=True, size=1024)
# 将数据写入共享内存
arr = np.ndarray((1024,), dtype=np.int8, buffer=shm.buf)
arr[:] = np.random.randint(0, 256, size=1024, dtype=np.int8)
- 文件描述符管理
- 及时关闭文件描述符:在进程使用完文件后,确保及时调用
close()
方法关闭文件描述符。可以使用with
语句来自动管理文件的打开和关闭,例如:
with open('test.txt', 'r') as f:
data = f.read()
- **限制打开文件数量**:在Unix系统上,通过`resource`模块设置进程可以打开的最大文件描述符数量。
import resource
# 设置进程最多可打开1024个文件描述符
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (1024, hard))
多核CPU环境下优化multiprocessing模块性能
- 任务划分
- 细粒度与粗粒度任务:对于计算密集型任务,尽量使用粗粒度任务,减少进程间通信开销。例如,如果要对一个大数组进行计算,可以将数组分成几个大的块,每个进程处理一块。而对于I/O密集型任务,细粒度任务可能更合适,因为I/O操作等待时间长,多个进程可以在等待I/O时切换执行。
- 进程池
- 合理设置进程池大小:根据CPU核心数来设置进程池的大小。一般来说,对于计算密集型任务,进程池大小可以设置为CPU核心数;对于I/O密集型任务,可以适当增大进程池大小,比如2倍或3倍CPU核心数。例如:
import multiprocessing
def task(x):
return x * x
if __name__ == '__main__':
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
results = pool.map(task, range(10))
- 减少进程间通信开销
- 使用队列和管道优化:如果进程间需要通信,尽量减少通信频率,批量处理数据。例如,使用
multiprocessing.Queue
时,不要每次产生少量数据就放入队列,而是积累一定量的数据后再放入。
import multiprocessing
def producer(queue):
data = []
for i in range(100):
data.append(i)
if len(data) == 10:
queue.put(data)
data = []
if data:
queue.put(data)
def consumer(queue):
while True:
data = queue.get()
if data is None:
break
for item in data:
print(item)
if __name__ == '__main__':
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
queue.put(None)
p2.join()
- 负载均衡
- 动态任务分配:对于不同任务耗时差异较大的情况,可以采用动态任务分配方式。例如,使用
multiprocessing.Pool
的imap_unordered
方法,它会在一个进程完成任务后立即分配新任务,而不是预先分配所有任务。
import multiprocessing
def task(x):
import time
time.sleep(x)
return x * x
if __name__ == '__main__':
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
results = list(pool.imap_unordered(task, [3, 1, 4, 1, 5]))
print(results)