实现思路
- 分块处理数据:将大数据集分割成多个小的数据块,每次只处理一个数据块,避免一次性加载全部数据到内存。
- 使用
ThreadPoolExecutor
或ProcessPoolExecutor
:ThreadPoolExecutor
适用于I/O密集型任务,ProcessPoolExecutor
适用于CPU密集型任务。这里假设任务是CPU密集型,选择ProcessPoolExecutor
。
- 提交任务并获取结果:将每个数据块的处理任务提交到线程池或进程池,然后收集处理结果。
关键代码片段
import concurrent.futures
import pandas as pd
def process_chunk(chunk):
# 这里是具体的数据处理逻辑,例如对chunk进行计算等操作
result = chunk.sum()
return result
def parallel_processing(file_path, chunk_size):
results = []
with concurrent.futures.ProcessPoolExecutor() as executor:
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
future = executor.submit(process_chunk, chunk)
results.append(future)
return [future.result() for future in concurrent.futures.as_completed(results)]
监控和优化内存使用情况
- 监控内存使用:
- 使用
memory_profiler
库:安装memory_profiler
库(pip install memory_profiler
),然后在代码中使用@profile
装饰器标记要监控内存使用的函数,运行代码时使用mprof run your_script.py
,最后使用mprof plot
生成内存使用情况图表。
psutil
库:在代码中使用psutil
库获取当前进程的内存使用情况。例如:
import psutil
def monitor_memory():
process = psutil.Process()
memory_info = process.memory_info()
print(f"当前进程内存使用: {memory_info.rss / 1024 / 1024} MB")
- 优化内存使用:
- 及时释放不再使用的变量:在处理完一个数据块后,确保相关变量被释放,可以使用
del
关键字手动删除不再需要的变量。
- 调整分块大小:通过监控内存使用情况,调整
chunk_size
的大小,找到一个既能充分利用并行计算优势,又不会导致内存溢出的合适值。