MST
星途 面试题库

面试题:Node.js 中 Child Process 异步执行复杂任务链的可靠性与故障处理

在一个 Node.js 应用中,需要通过 Child Process 异步执行一系列复杂的任务链,例如先进行数据预处理(调用外部脚本),然后执行数据转换(调用另一个外部工具),最后将结果存储到数据库(通过调用数据库客户端命令行工具)。每个任务都依赖前一个任务的成功执行。描述如何设计一个可靠的任务执行机制,确保在任何一个任务失败时,能够正确处理故障,回滚已执行的任务,并记录详细的错误信息。同时说明如何在高并发场景下保证整个任务链的正确执行,给出完整的设计思路和核心代码框架。
29.9万 热度难度
前端开发Node.js

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 任务串行执行:使用 async/awaitPromise 链来确保任务按顺序执行,前一个任务成功完成后才执行下一个。
  2. 错误处理:在每个任务执行时捕获错误,一旦某个任务失败,执行回滚操作并记录错误信息。
  3. 回滚机制:为每个任务设计相应的回滚逻辑,例如如果数据已部分存入数据库,需要设计删除或恢复操作。
  4. 高并发处理:使用队列控制并发任务数量,确保每个任务链的执行不会相互干扰。

核心代码框架

const { exec } = require('child_process');
const util = require('util');
const execPromise = util.promisify(exec);

// 模拟数据库操作
const databaseClient = {
    insertData: async (data) => {
        // 实际执行数据库插入操作
        console.log(`Inserting data: ${data}`);
    },
    deleteData: async (data) => {
        // 实际执行数据库删除操作
        console.log(`Deleting data: ${data}`);
    }
};

// 数据预处理任务
const preprocessData = async () => {
    try {
        const { stdout, stderr } = await execPromise('external_preprocess_script.sh');
        if (stderr) {
            throw new Error(stderr);
        }
        return stdout;
    } catch (error) {
        throw new Error(`Preprocess error: ${error.message}`);
    }
};

// 数据转换任务
const transformData = async (preprocessedData) => {
    try {
        const { stdout, stderr } = await execPromise(`external_transform_tool.sh ${preprocessedData}`);
        if (stderr) {
            throw new Error(stderr);
        }
        return stdout;
    } catch (error) {
        throw new Error(`Transform error: ${error.message}`);
    }
};

// 存储数据到数据库任务
const storeData = async (transformedData) => {
    try {
        await databaseClient.insertData(transformedData);
        return transformedData;
    } catch (error) {
        throw new Error(`Store error: ${error.message}`);
    }
};

// 回滚存储操作
const rollbackStore = async (dataToRollback) => {
    try {
        await databaseClient.deleteData(dataToRollback);
    } catch (error) {
        console.error(`Rollback store error: ${error.message}`);
    }
};

// 任务链执行
const executeTaskChain = async () => {
    let preprocessedData, transformedData, storedData;
    try {
        preprocessedData = await preprocessData();
        transformedData = await transformData(preprocessedData);
        storedData = await storeData(transformedData);
        console.log('All tasks completed successfully');
    } catch (error) {
        console.error(`Task failed: ${error.message}`);
        if (storedData) {
            await rollbackStore(storedData);
        }
    }
};

// 高并发控制队列
class TaskQueue {
    constructor(concurrency) {
        this.concurrency = concurrency;
        this.tasks = [];
        this.running = 0;
    }

    addTask(task) {
        this.tasks.push(task);
        this.runNext();
    }

    runNext() {
        while (this.running < this.concurrency && this.tasks.length > 0) {
            const task = this.tasks.shift();
            this.running++;
            task()
               .then(() => {
                    this.running--;
                    this.runNext();
                })
               .catch((error) => {
                    console.error(`Task in queue failed: ${error.message}`);
                    this.running--;
                    this.runNext();
                });
        }
    }
}

// 使用队列控制并发执行任务链
const queue = new TaskQueue(5);
for (let i = 0; i < 10; i++) {
    queue.addTask(() => executeTaskChain());
}

代码说明

  1. 任务函数preprocessDatatransformDatastoreData 使用 execPromise 来异步执行外部脚本和工具,并在出现错误时抛出异常。
  2. 回滚函数rollbackStore 用于在存储任务失败时回滚数据库操作。
  3. 任务链执行executeTaskChain 使用 async/await 按顺序执行任务,并在任务失败时进行回滚和错误处理。
  4. 高并发控制TaskQueue 类用于控制并发任务数量,确保在高并发场景下任务链的正确执行。每个任务链通过 queue.addTask(() => executeTaskChain()) 添加到队列中执行。