1. 实现自定义的 Node.js 双工流
const { Duplex } = require('stream');
const fs = require('fs');
class CustomDuplexStream extends Duplex {
constructor() {
super();
}
_write(chunk, encoding, callback) {
// 将接收到的 chunk 中的字母转换为大写
const upperChunk = chunk.toString().toUpperCase();
// 将转换后的内容写入可读端
this.push(upperChunk);
callback();
}
_read(size) {
// 这里 _read 方法在这种简单转换场景下,
// 不需要特别处理,因为 _write 中已经直接 push 转换后的数据
}
}
// 使用示例,处理大文件
const readableStream = fs.createReadStream('largeFile.txt');
const writableStream = fs.createWriteStream('largeFile_converted.txt');
const customDuplex = new CustomDuplexStream();
readableStream.pipe(customDuplex).pipe(writableStream);
2. 性能优化
- 分块处理:在
_write
方法中,Node.js 流会自动以分块(chunk)的形式处理数据,避免一次性将整个大文件读入内存。这确保了内存使用的可控性。
- 背压处理:虽然上述代码没有显式处理背压,但在实际场景中,如果写入速度比读取速度快,可能会导致数据堆积在内存中。可以通过监听
drain
事件来处理背压。例如,在 _write
方法中,如果 this.push
返回 false
,说明缓冲区已满,需要暂停写入,直到 drain
事件触发再继续写入。
_write(chunk, encoding, callback) {
const upperChunk = chunk.toString().toUpperCase();
const writeResult = this.push(upperChunk);
if (!writeResult) {
// 缓冲区已满,暂停写入
this.once('drain', () => {
this.push(upperChunk);
callback();
});
} else {
callback();
}
}
- 及时释放资源:在流结束时,确保所有相关资源(如文件描述符)被正确关闭和释放。
fs.createReadStream
和 fs.createWriteStream
会在流结束时自动关闭文件,但在复杂场景下,要检查是否有其他资源需要手动释放。
- 优化转换逻辑:对于复杂的格式转换逻辑,尽量使用高效的算法和数据结构,避免在转换过程中产生过多的中间数据占用内存。例如在上述简单的字母大写转换中,直接在
chunk
上进行操作,没有引入过多中间变量。