实现思路
- Readable Stream:基于
EventEmitter
来触发数据读取和结束等事件。利用Buffer
模块来存储和处理读取的数据。维护一个内部缓冲区,当有数据可读时,触发data
事件。当数据读取完毕,触发end
事件。
- Writable Stream:同样基于
EventEmitter
,接收数据写入操作。将写入的数据存入内部缓冲区,当缓冲区有空间时,触发drain
事件。当所有数据写入完成,触发finish
事件。
- 背压处理:在Writable Stream中,当写入速度过快,缓冲区满时,需要暂停写入操作,直到缓冲区有空间(即触发
drain
事件)再恢复写入。在Readable Stream中,当读取速度过慢,缓冲区满时,需要控制数据的读取速度,避免数据丢失。
关键代码片段
Readable Stream
const EventEmitter = require('events');
const Buffer = require('buffer').Buffer;
class MyReadableStream extends EventEmitter {
constructor() {
super();
this.buffer = Buffer.alloc(0);
this._read();
}
_read() {
// 模拟从数据源读取数据,这里简单生成一些数据
const newData = Buffer.from('new data');
this.buffer = Buffer.concat([this.buffer, newData]);
this.emit('data', this.buffer);
this.emit('end');
}
}
Writable Stream
class MyWritableStream extends EventEmitter {
constructor() {
super();
this.buffer = Buffer.alloc(0);
this.highWaterMark = 16; // 假设高水位线为16字节
}
write(chunk) {
if (this.buffer.length + chunk.length > this.highWaterMark) {
// 缓冲区满,处理背压
return false;
}
this.buffer = Buffer.concat([this.buffer, chunk]);
this.emit('drain');
return true;
}
end() {
this.emit('finish');
}
}
与Node.js原生Stream实现的异同点
- 相同点
- 基本原理相同,都基于
EventEmitter
来处理事件驱动,利用Buffer
模块处理数据。
- 都需要处理背压问题以保证数据的可靠传输。
- 不同点
- 功能完整性:原生
Stream
具有更丰富的功能和更完善的实现,如支持多种模式(流动模式和暂停模式)、复杂的管道操作等。自定义实现相对简单,仅包含基础功能。
- 性能优化:原生
Stream
经过了大量的性能优化,在处理大规模数据和高并发场景下表现更好。自定义实现可能存在性能瓶颈。
- 错误处理:原生
Stream
有更完备的错误处理机制,自定义实现需要开发者手动添加更多的错误处理逻辑。