面试题答案
一键面试1. 整体设计思路
采用Node.js的流(Stream)来处理数据,通过可读流(Readable Stream)从各个数据源获取数据,经过处理流(Transform Stream)进行合并、过滤等操作,最后使用可写流(Writable Stream)将数据存储到本地数据库。这样可以避免一次性将大量数据加载到内存中,有效管理内存。
2. 模块设计
数据源读取模块
- 设计思路:为每个数据源创建一个可读流。利用
http.request
(针对HTTP API)等方式获取数据,将响应数据作为可读流的输入。可以使用http.request
的response
事件来处理接收到的数据,并通过stream.pipeline
将其连接到后续处理流。 - 代码示例:
const http = require('http');
const { pipeline } = require('stream');
function createDataSourceReadStream(url) {
return new Promise((resolve, reject) => {
const req = http.request(url, (res) => {
if (res.statusCode < 200 || res.statusCode >= 300) {
return reject(new Error(`HTTP error! status: ${res.statusCode}`));
}
resolve(res);
});
req.on('error', reject);
req.end();
});
}
数据合并模块
- 设计思路:使用
Transform
流来合并来自多个数据源的数据。在_transform
方法中,将接收到的数据块进行合并处理,例如简单地拼接字符串数据(如果数据是文本格式),或合并数组数据(如果数据是数组格式)。 - 代码示例:
const { Transform } = require('stream');
class DataMerger extends Transform {
constructor() {
super({ objectMode: true });
this.data = [];
}
_transform(chunk, encoding, callback) {
this.data.push(chunk);
callback();
}
_flush(callback) {
this.push(this.data.flat());
callback();
}
}
数据过滤模块
- 设计思路:同样基于
Transform
流,在_transform
方法中根据设定的过滤条件对数据进行过滤。例如,如果数据是对象数组,可以根据对象的某个属性值进行过滤。 - 代码示例:
class DataFilter extends Transform {
constructor(filterCondition) {
super({ objectMode: true });
this.filterCondition = filterCondition;
}
_transform(chunk, encoding, callback) {
if (this.filterCondition(chunk)) {
this.push(chunk);
}
callback();
}
}
数据存储模块
- 设计思路:使用
Writable
流将处理后的数据存储到本地数据库。例如,如果使用SQLite数据库,可以利用sqlite3
库的run
方法将数据插入到数据库表中。在_write
方法中执行数据库写入操作。 - 代码示例:
const sqlite3 = require('sqlite3').verbose();
class DatabaseWriter extends Writable {
constructor(dbPath) {
super({ objectMode: true });
this.db = new sqlite3.Database(dbPath);
this.createTable();
}
createTable() {
this.db.run(`CREATE TABLE IF NOT EXISTS my_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data TEXT
)`);
}
_write(chunk, encoding, callback) {
this.db.run('INSERT INTO my_data (data) VALUES (?)', [chunk], callback);
}
_final(callback) {
this.db.close(callback);
}
}
3. 数据流走向
- 数据源读取:每个数据源的可读流通过
createDataSourceReadStream
创建,开始从远程API获取数据。 - 数据合并:多个数据源的可读流通过
stream.pipeline
连接到DataMerger
的实例,合并后的数据流继续向下流动。 - 数据过滤:合并后的数据流入
DataFilter
实例,经过过滤后的数据继续向下传输。 - 数据存储:过滤后的数据最终流入
DatabaseWriter
实例,写入本地数据库完成整个处理流程。
示例使用stream.pipeline
连接各个流:
const { pipeline } = require('stream');
const source1 = createDataSourceReadStream('url1');
const source2 = createDataSourceReadStream('url2');
const merger = new DataMerger();
const filter = new DataFilter((data) => data.someCondition);
const writer = new DatabaseWriter('my.db');
pipeline(
source1,
merger,
filter,
writer,
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
pipeline(
source2,
merger,
filter,
writer,
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
这样通过流式处理,可以在高并发情况下有效处理数据,避免内存溢出问题。