面试题答案
一键面试基于Node.js流式处理的系统架构设计
1. 数据采集
数据源
- 交易记录:可能来自电商平台的交易日志文件,或者数据库的交易表实时变更。
- 用户信息:存储在用户信息数据库,如MySQL、MongoDB等。
- 商品信息:同样存储在商品数据库中。
采集方式
- 日志文件:使用
fs.createReadStream
以流的方式读取日志文件。Node.js的流特性允许逐块读取文件,避免一次性加载整个大文件到内存,例如:
const fs = require('fs');
const readStream = fs.createReadStream('transaction_log.log', {
encoding: 'utf8',
highWaterMark: 16384 // 每次读取16KB
});
readStream.on('data', (chunk) => {
// 处理读取到的数据块
});
readStream.on('end', () => {
// 读取完成
});
- 数据库变更:对于关系型数据库(如MySQL),可以使用
mysql2
库结合其CHANGE DATA CATCHING
特性,以流的方式获取数据库变更。对于MongoDB,可使用mongodb
库的watch
方法监听集合的变更,以流的形式获取数据。
2. 数据传输
消息队列
使用RabbitMQ、Kafka等消息队列系统。Node.js客户端库(如amqplib
for RabbitMQ,kafka-node
for Kafka)可以创建生产者将采集到的数据发送到消息队列。
- 生产者:
const amqp = require('amqplib');
async function sendDataToQueue(data) {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'ecommerce_data';
await channel.assertQueue(queue, { durable: false });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(data)));
console.log('Data sent to the queue');
await channel.close();
await connection.close();
}
- 消费者:消费者从消息队列以流的方式接收数据。在Node.js中,可以使用
amqplib
的consume
方法(RabbitMQ)或kafka-node
的Consumer
类(Kafka)。这种方式可以确保数据的有序传输,并在高并发场景下有效缓冲数据。
3. 数据处理
数据处理模块
- 使用Node.js Stream API:创建Transform流对数据进行处理。例如,对于交易数据,可能需要解析JSON格式的数据、过滤无效数据、根据地区和时间进行分组等操作。
const { Transform } = require('stream');
class DataTransformer extends Transform {
constructor() {
super({ objectMode: true });
}
_transform(chunk, encoding, callback) {
// 解析JSON数据
const data = JSON.parse(chunk);
// 过滤无效数据
if (data.isValid) {
// 进行分组操作
const groupedData = groupByRegionAndTime(data);
this.push(groupedData);
}
callback();
}
}
const transformer = new DataTransformer();
- 分布式处理:为了提高可扩展性,可以使用
cluster
模块在多核CPU上创建多个工作进程并行处理数据。每个工作进程可以独立处理流数据,然后将结果汇总。
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
// 工作进程处理数据
const dataStream = getStreamFromQueue();
dataStream.pipe(new DataTransformer()).pipe(storeResult());
}
4. 数据存储
存储选择
- 时间序列数据库(如InfluxDB):适合存储按时间序列的分析结果,如不同时间段的商品销售趋势。Node.js有
influx
库用于与InfluxDB交互。
const Influx = require('influx');
const influx = new Influx({
host: 'localhost',
database: 'ecommerce_analytics',
schema: [
{
measurement: 'product_sales',
fields: {
quantity: Influx.FieldType.FLOAT
},
tags: ['region', 'product']
}
]
});
async function storeSalesData(data) {
await influx.writePoints([
{
measurement: 'product_sales',
tags: { region: data.region, product: data.product },
fields: { quantity: data.quantity }
}
]);
}
- 文档数据库(如MongoDB):用于存储复杂的分析结果,如用户购买行为模式。Node.js通过
mongodb
库进行操作。
容错性
在数据存储过程中,使用重试机制来处理可能的存储失败。例如,在向InfluxDB写入数据失败时,等待一段时间后重试:
async function storeSalesDataWithRetry(data, retries = 3) {
try {
await influx.writePoints([
{
measurement: 'product_sales',
tags: { region: data.region, product: data.product },
fields: { quantity: data.quantity }
}
]);
} catch (error) {
if (retries > 0) {
await new Promise(resolve => setTimeout(resolve, 1000));
await storeSalesDataWithRetry(data, retries - 1);
} else {
throw error;
}
}
}
5. 前端展示
实时数据获取
- WebSocket:在Node.js后端使用
ws
库创建WebSocket服务器,将实时分析结果推送给前端。
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
// 监听分析结果更新
const resultStream = getAnalysisResultStream();
resultStream.on('data', (data) => {
ws.send(JSON.stringify(data));
});
});
- 前端使用JavaScript的
WebSocket
API接收数据并更新页面:
const socket = new WebSocket('ws://localhost:8080');
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
// 更新前端图表或表格
};
- 使用实时图表库(如Chart.js结合其实时更新功能):在前端根据接收到的数据实时绘制图表,展示商品销售趋势、用户购买行为模式等分析结果。
可扩展性
为了应对大量前端连接,可以使用负载均衡器(如Nginx)来分发WebSocket连接到多个Node.js实例。同时,前端可以采用缓存策略,减少不必要的数据请求,提高系统整体性能。