MST

星途 面试题库

面试题:Node.js流式处理在高并发大数据场景下的优化

在一个Node.js应用中,需要从多个不同的数据源(如多个远程API接口,且每个接口返回大数据量)获取数据,并对这些数据进行合并、过滤等处理后存储到本地数据库。由于数据源返回数据速度不同,可能出现数据堆积导致内存溢出。请设计一个基于Node.js流式处理的方案,有效解决高并发下的数据处理和内存管理问题,详细说明各个模块的设计思路和数据流走向。
10.7万 热度难度
前端开发Node.js

知识考点

AI 面试

面试题答案

一键面试

1. 整体设计思路

采用Node.js的流(Stream)来处理数据,通过可读流(Readable Stream)从各个数据源获取数据,经过处理流(Transform Stream)进行合并、过滤等操作,最后使用可写流(Writable Stream)将数据存储到本地数据库。这样可以避免一次性将大量数据加载到内存中,有效管理内存。

2. 模块设计

数据源读取模块

  • 设计思路:为每个数据源创建一个可读流。利用http.request(针对HTTP API)等方式获取数据,将响应数据作为可读流的输入。可以使用http.requestresponse事件来处理接收到的数据,并通过stream.pipeline将其连接到后续处理流。
  • 代码示例
const http = require('http');
const { pipeline } = require('stream');

function createDataSourceReadStream(url) {
    return new Promise((resolve, reject) => {
        const req = http.request(url, (res) => {
            if (res.statusCode < 200 || res.statusCode >= 300) {
                return reject(new Error(`HTTP error! status: ${res.statusCode}`));
            }
            resolve(res);
        });
        req.on('error', reject);
        req.end();
    });
}

数据合并模块

  • 设计思路:使用Transform流来合并来自多个数据源的数据。在_transform方法中,将接收到的数据块进行合并处理,例如简单地拼接字符串数据(如果数据是文本格式),或合并数组数据(如果数据是数组格式)。
  • 代码示例
const { Transform } = require('stream');

class DataMerger extends Transform {
    constructor() {
        super({ objectMode: true });
        this.data = [];
    }
    _transform(chunk, encoding, callback) {
        this.data.push(chunk);
        callback();
    }
    _flush(callback) {
        this.push(this.data.flat());
        callback();
    }
}

数据过滤模块

  • 设计思路:同样基于Transform流,在_transform方法中根据设定的过滤条件对数据进行过滤。例如,如果数据是对象数组,可以根据对象的某个属性值进行过滤。
  • 代码示例
class DataFilter extends Transform {
    constructor(filterCondition) {
        super({ objectMode: true });
        this.filterCondition = filterCondition;
    }
    _transform(chunk, encoding, callback) {
        if (this.filterCondition(chunk)) {
            this.push(chunk);
        }
        callback();
    }
}

数据存储模块

  • 设计思路:使用Writable流将处理后的数据存储到本地数据库。例如,如果使用SQLite数据库,可以利用sqlite3库的run方法将数据插入到数据库表中。在_write方法中执行数据库写入操作。
  • 代码示例
const sqlite3 = require('sqlite3').verbose();

class DatabaseWriter extends Writable {
    constructor(dbPath) {
        super({ objectMode: true });
        this.db = new sqlite3.Database(dbPath);
        this.createTable();
    }
    createTable() {
        this.db.run(`CREATE TABLE IF NOT EXISTS my_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            data TEXT
        )`);
    }
    _write(chunk, encoding, callback) {
        this.db.run('INSERT INTO my_data (data) VALUES (?)', [chunk], callback);
    }
    _final(callback) {
        this.db.close(callback);
    }
}

3. 数据流走向

  1. 数据源读取:每个数据源的可读流通过createDataSourceReadStream创建,开始从远程API获取数据。
  2. 数据合并:多个数据源的可读流通过stream.pipeline连接到DataMerger的实例,合并后的数据流继续向下流动。
  3. 数据过滤:合并后的数据流入DataFilter实例,经过过滤后的数据继续向下传输。
  4. 数据存储:过滤后的数据最终流入DatabaseWriter实例,写入本地数据库完成整个处理流程。

示例使用stream.pipeline连接各个流:

const { pipeline } = require('stream');

const source1 = createDataSourceReadStream('url1');
const source2 = createDataSourceReadStream('url2');

const merger = new DataMerger();
const filter = new DataFilter((data) => data.someCondition);
const writer = new DatabaseWriter('my.db');

pipeline(
    source1,
    merger,
    filter,
    writer,
    (err) => {
        if (err) {
            console.error('Pipeline failed:', err);
        } else {
            console.log('Pipeline succeeded');
        }
    }
);

pipeline(
    source2,
    merger,
    filter,
    writer,
    (err) => {
        if (err) {
            console.error('Pipeline failed:', err);
        } else {
            console.log('Pipeline succeeded');
        }
    }
);

这样通过流式处理,可以在高并发情况下有效处理数据,避免内存溢出问题。