import csv
import numpy as np
import sys
def process_csv(input_file, output_file):
# 数据清洗函数,示例:去除空行,这里假设空行为所有列为空
def clean_row(row):
return all(field.strip() for field in row)
# 读取超大CSV文件,按块读取以优化内存
data_chunks = []
specific_column = []
with open(input_file, 'r', newline='', encoding='utf-8') as infile:
reader = csv.reader(infile)
header = next(reader) # 读取表头
specific_column_index = header.index('specific_column_name') # 假设要统计的列名
for row in reader:
if clean_row(row):
data_chunks.append(row)
specific_column.append(float(row[specific_column_index]))
# 特定列数据统计
if specific_column:
mean_value = np.mean(specific_column)
median_value = np.median(specific_column)
print(f"平均值: {mean_value}, 中位数: {median_value}")
else:
print("特定列无有效数据")
# 将处理后的数据写入新的CSV文件
with open(output_file, 'w', newline='', encoding='utf-8') as outfile:
writer = csv.writer(outfile)
writer.writerow(header)
writer.writerows(data_chunks)
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: python script.py input_file.csv output_file.csv")
sys.exit(1)
input_file = sys.argv[1]
output_file = sys.argv[2]
process_csv(input_file, output_file)
- 按块读取:使用
csv.reader
逐行读取文件,避免一次性加载整个文件到内存。
- 数据清洗:定义
clean_row
函数,示例中是去除所有列为空的行,可根据实际需求调整。
- 特定列统计:使用
numpy
库计算平均值和中位数,在遍历文件时提取特定列数据。
- 写入新文件:使用
csv.writer
将处理后的数据写入新的CSV文件。