实现思路
- 文件分块读取:将大文件按固定大小(如4096字节)分块,每个线程负责读取一个或多个分块,减少单个线程I/O压力。
- 线程安全的数据结构:使用
collections.Counter
统计每种操作类型的次数,在多线程环境下通过Lock
来保证对Counter
对象操作的线程安全性。
- 队列用于任务分配:利用
Queue
存储文件分块,线程从队列中获取分块进行处理,实现任务的分配和同步。
性能优化考量
- 减少I/O操作:通过分块读取和多线程并发处理,减少文件I/O等待时间。
- 降低锁竞争:尽量减少锁的使用范围,只在更新共享的
Counter
对象时加锁,其他操作在线程内独立完成。
- 合理线程数量:根据系统CPU核心数和文件大小合理设置线程数量,避免过多线程导致上下文切换开销过大。
Python 代码实现
import threading
import queue
from collections import Counter
import os
def count_operation_type(file_chunk, result_counter, lock):
for line in file_chunk.decode('utf - 8').splitlines():
try:
_, operation_type, _ = line.split(',')
with lock:
result_counter[operation_type] += 1
except ValueError:
pass
def read_file_in_threads(file_path, num_threads):
result_counter = Counter()
lock = threading.Lock()
q = queue.Queue()
chunk_size = 4096
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
q.put(chunk)
def worker():
while not q.empty():
chunk = q.get()
count_operation_type(chunk, result_counter, lock)
q.task_done()
threads = []
for _ in range(num_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for t in threads:
t.join()
return result_counter
if __name__ == "__main__":
file_path = "your_log_file.log"
num_threads = os.cpu_count() or 1
result = read_file_in_threads(file_path, num_threads)
for operation_type, count in result.items():
print(f"{operation_type}: {count}")