MST

星途 面试题库

面试题:Node.js Stream 数据传输中的背压处理

当使用 Stream 进行数据传输时,可能会遇到背压问题。请描述在 Node.js 中背压产生的原因,以及如何在代码层面处理背压,以确保数据传输的稳定性和可靠性。
17.5万 热度难度
前端开发Node.js

知识考点

AI 面试

面试题答案

一键面试

背压产生的原因

  1. 生产者与消费者速度不匹配:在 Node.js 中,当数据生产者(如可读流 Readable Stream)生成数据的速度快于数据消费者(如可写流 Writable Stream)处理数据的速度时,就会出现背压问题。例如,从文件系统读取数据(生产者)并写入网络套接字(消费者),如果读取速度远超过网络写入速度,数据就会在中间堆积。
  2. 缓冲区限制:可写流有一个内部缓冲区,用于暂存即将写入的数据。当缓冲区已满,而生产者仍在快速生成数据时,就无法再往缓冲区添加新数据,从而引发背压。

代码层面处理背压的方法

  1. 使用 pipe 方法并监听背压事件
    const fs = require('fs');
    const http = require('http');
    
    const server = http.createServer((req, res) => {
        const readableStream = fs.createReadStream('largeFile.txt');
        const writableStream = res;
    
        // pipe 方法会自动处理背压
        readableStream.pipe(writableStream);
    
        // 监听背压事件
        readableStream.on('drain', () => {
            console.log('背压缓解,可继续写入数据');
        });
    });
    
    server.listen(3000, () => {
        console.log('Server running on port 3000');
    });
    
  2. 手动处理背压
    const fs = require('fs');
    const http = require('http');
    
    const server = http.createServer((req, res) => {
        const readableStream = fs.createReadStream('largeFile.txt');
        const writableStream = res;
    
        let paused = false;
        readableStream.on('data', (chunk) => {
            if (paused) {
                // 如果处于暂停状态,将数据暂存或处理其他逻辑
                return;
            }
            const writeResult = writableStream.write(chunk);
            if (!writeResult) {
                // 如果写入返回 false,说明缓冲区已满,暂停可读流
                paused = true;
                readableStream.pause();
            }
        });
    
        writableStream.on('drain', () => {
            // 当可写流缓冲区有空间时,恢复可读流
            paused = false;
            readableStream.resume();
        });
    
        readableStream.on('end', () => {
            writableStream.end();
        });
    });
    
    server.listen(3000, () => {
        console.log('Server running on port 3000');
    });
    
  3. 使用 Transform 流时处理背压
    const { Transform } = require('stream');
    
    const transformStream = new Transform({
        transform(chunk, encoding, callback) {
            // 处理数据转换
            const transformedChunk = chunk.toString().toUpperCase();
            const writeResult = this.push(transformedChunk);
            if (!writeResult) {
                // 如果 push 返回 false,说明下游缓冲区已满,暂停读取
                this.pause();
            }
            callback();
        },
        flush(callback) {
            // 处理结束时的操作
            callback();
        }
    });
    
    transformStream.on('drain', () => {
        // 当缓冲区有空间时,恢复读取
        this.resume();
    });