优化思路
- 内存管理:
- 避免一次性将所有文件内容读入内存,采用流(stream)的方式逐块处理数据,减少内存占用。
- 对于复杂处理过程中产生的中间数据,合理分配内存,及时释放不再使用的变量。
- 文件系统资源利用:
- 限制同时打开的文件数量,避免文件描述符耗尽。可以使用队列控制并发读写操作的数量。
- 批量处理文件,减少文件系统的频繁切换操作。
- I/O 性能瓶颈:
- 使用异步 I/O 操作,如
fs.promises
提供的异步方法,避免阻塞事件循环。
- 利用
cluster
模块实现多进程并行处理,充分利用多核 CPU 的优势,但要注意进程间通信和资源同步。
- 采用缓冲技术,适当增加读写缓冲区的大小,减少 I/O 次数。
实现方案
const fs = require('fs');
const path = require('path');
const { promisify } = require('util');
const { Worker } = require('worker_threads');
const readFile = promisify(fs.readFile);
const writeFile = promisify(fs.writeFile);
// 配置参数
const inputDir = 'input';
const outputDir = 'output';
const maxConcurrent = 10; // 最大并发数
// 读取文件列表
async function getFileList(dir) {
const files = await promisify(fs.readdir)(dir);
return files.map(file => path.join(dir, file));
}
// 单个文件处理函数
async function processFile(inputPath, outputPath) {
const data = await readFile(inputPath);
// 复杂处理,这里假设是简单的字符串替换
const processedData = data.toString().replace(/old/g, 'new');
await writeFile(outputPath, processedData);
}
// 主处理函数
async function main() {
const inputFiles = await getFileList(inputDir);
const outputFiles = inputFiles.map(file => path.join(outputDir, path.basename(file)));
const taskQueue = [];
for (let i = 0; i < inputFiles.length; i++) {
taskQueue.push(processFile(inputFiles[i], outputFiles[i]));
if (taskQueue.length >= maxConcurrent || i === inputFiles.length - 1) {
await Promise.all(taskQueue);
taskQueue.length = 0;
}
}
}
// 使用多进程优化版本
async function mainWithCluster() {
const inputFiles = await getFileList(inputDir);
const outputFiles = inputFiles.map(file => path.join(outputDir, path.basename(file)));
const workerPromises = [];
for (let i = 0; i < inputFiles.length; i += maxConcurrent) {
const worker = new Worker('./worker.js', {
workerData: {
inputFiles: inputFiles.slice(i, i + maxConcurrent),
outputFiles: outputFiles.slice(i, i + maxConcurrent)
}
});
const promise = new Promise((resolve, reject) => {
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code!== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
workerPromises.push(promise);
}
await Promise.all(workerPromises);
}
// 启动处理
// main().catch(console.error);
mainWithCluster().catch(console.error);
worker.js 文件内容
const { workerData, parentPort } = require('worker_threads');
const fs = require('fs');
const path = require('path');
const { promisify } = require('util');
const readFile = promisify(fs.readFile);
const writeFile = promisify(fs.writeFile);
async function processFile(inputPath, outputPath) {
const data = await readFile(inputPath);
const processedData = data.toString().replace(/old/g, 'new');
await writeFile(outputPath, processedData);
}
async function main() {
const { inputFiles, outputFiles } = workerData;
for (let i = 0; i < inputFiles.length; i++) {
await processFile(inputFiles[i], outputFiles[i]);
}
parentPort.postMessage('Processed successfully');
}
main().catch(err => {
parentPort.postMessage(err);
});