MST

星途 面试题库

面试题:Python在大数据场景下NumPy和Pandas数据处理优化

假设你需要处理一个非常大的数据集(内存无法完全容纳),其中包含多个CSV文件,每个文件有上万行和几十列。数据需要进行复杂的清洗(例如去除异常值、填补缺失值)、转换(例如对某些列进行对数变换)和聚合操作(例如按某一列分组计算其他列的统计量)。请阐述如何利用Python的NumPy和Pandas,结合适当的优化技巧(如分块处理、多进程等)高效完成这些任务,并给出关键代码示例。
30.3万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

1. 分块处理

Pandas 可以逐块读取大的 CSV 文件,减少内存占用。

import pandas as pd

# 分块读取CSV文件
chunk_size = 10000
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    # 数据清洗:去除异常值
    for col in chunk.columns:
        if pd.api.types.is_numeric_dtype(chunk[col]):
            Q1 = chunk[col].quantile(0.25)
            Q3 = chunk[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            chunk = chunk[(chunk[col] >= lower_bound) & (chunk[col] <= upper_bound)]
    
    # 数据清洗:填补缺失值
    chunk.fillna(method='ffill', inplace=True)
    chunk.fillna(method='bfill', inplace=True)
    
    # 数据转换:对数变换
    for col in chunk.columns:
        if pd.api.types.is_numeric_dtype(chunk[col]):
            chunk[col] = np.log1p(chunk[col])
    
    # 聚合操作:按某一列分组计算其他列的统计量
    grouped = chunk.groupby('grouping_column').agg({'numeric_column':'mean'})
    print(grouped)

2. 多进程处理

可以使用 multiprocessing 库并行处理分块数据。

import pandas as pd
import numpy as np
from multiprocessing import Pool


def process_chunk(chunk):
    # 数据清洗:去除异常值
    for col in chunk.columns:
        if pd.api.types.is_numeric_dtype(chunk[col]):
            Q1 = chunk[col].quantile(0.25)
            Q3 = chunk[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            chunk = chunk[(chunk[col] >= lower_bound) & (chunk[col] <= upper_bound)]
    
    # 数据清洗:填补缺失值
    chunk.fillna(method='ffill', inplace=True)
    chunk.fillna(method='bfill', inplace=True)
    
    # 数据转换:对数变换
    for col in chunk.columns:
        if pd.api.types.is_numeric_dtype(chunk[col]):
            chunk[col] = np.log1p(chunk[col])
    
    # 聚合操作:按某一列分组计算其他列的统计量
    grouped = chunk.groupby('grouping_column').agg({'numeric_column':'mean'})
    return grouped


if __name__ == '__main__':
    chunk_size = 10000
    chunks = []
    for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
        chunks.append(chunk)
    
    with Pool() as pool:
        results = pool.map(process_chunk, chunks)
    
    final_result = pd.concat(results)
    print(final_result)

3. 使用 NumPy 优化数值计算

在数据转换步骤中,NumPy 可以加速对数变换等数值计算。 在上述代码的数据转换部分:

import numpy as np
# 数据转换:对数变换
for col in chunk.columns:
    if pd.api.types.is_numeric_dtype(chunk[col]):
        chunk[col] = np.log1p(chunk[col].values)

这里使用 chunk[col].values 将 Pandas 系列转换为 NumPy 数组,直接在 NumPy 数组上进行对数变换,提升计算效率。

通过分块处理、多进程并行以及合理使用 NumPy 优化数值计算,可以高效处理内存无法完全容纳的大 CSV 数据集。