设计思路
- 任务串行执行:使用
async/await
或 Promise
链来确保任务按顺序执行,前一个任务成功完成后才执行下一个。
- 错误处理:在每个任务执行时捕获错误,一旦某个任务失败,执行回滚操作并记录错误信息。
- 回滚机制:为每个任务设计相应的回滚逻辑,例如如果数据已部分存入数据库,需要设计删除或恢复操作。
- 高并发处理:使用队列控制并发任务数量,确保每个任务链的执行不会相互干扰。
核心代码框架
const { exec } = require('child_process');
const util = require('util');
const execPromise = util.promisify(exec);
// 模拟数据库操作
const databaseClient = {
insertData: async (data) => {
// 实际执行数据库插入操作
console.log(`Inserting data: ${data}`);
},
deleteData: async (data) => {
// 实际执行数据库删除操作
console.log(`Deleting data: ${data}`);
}
};
// 数据预处理任务
const preprocessData = async () => {
try {
const { stdout, stderr } = await execPromise('external_preprocess_script.sh');
if (stderr) {
throw new Error(stderr);
}
return stdout;
} catch (error) {
throw new Error(`Preprocess error: ${error.message}`);
}
};
// 数据转换任务
const transformData = async (preprocessedData) => {
try {
const { stdout, stderr } = await execPromise(`external_transform_tool.sh ${preprocessedData}`);
if (stderr) {
throw new Error(stderr);
}
return stdout;
} catch (error) {
throw new Error(`Transform error: ${error.message}`);
}
};
// 存储数据到数据库任务
const storeData = async (transformedData) => {
try {
await databaseClient.insertData(transformedData);
return transformedData;
} catch (error) {
throw new Error(`Store error: ${error.message}`);
}
};
// 回滚存储操作
const rollbackStore = async (dataToRollback) => {
try {
await databaseClient.deleteData(dataToRollback);
} catch (error) {
console.error(`Rollback store error: ${error.message}`);
}
};
// 任务链执行
const executeTaskChain = async () => {
let preprocessedData, transformedData, storedData;
try {
preprocessedData = await preprocessData();
transformedData = await transformData(preprocessedData);
storedData = await storeData(transformedData);
console.log('All tasks completed successfully');
} catch (error) {
console.error(`Task failed: ${error.message}`);
if (storedData) {
await rollbackStore(storedData);
}
}
};
// 高并发控制队列
class TaskQueue {
constructor(concurrency) {
this.concurrency = concurrency;
this.tasks = [];
this.running = 0;
}
addTask(task) {
this.tasks.push(task);
this.runNext();
}
runNext() {
while (this.running < this.concurrency && this.tasks.length > 0) {
const task = this.tasks.shift();
this.running++;
task()
.then(() => {
this.running--;
this.runNext();
})
.catch((error) => {
console.error(`Task in queue failed: ${error.message}`);
this.running--;
this.runNext();
});
}
}
}
// 使用队列控制并发执行任务链
const queue = new TaskQueue(5);
for (let i = 0; i < 10; i++) {
queue.addTask(() => executeTaskChain());
}
代码说明
- 任务函数:
preprocessData
、transformData
和 storeData
使用 execPromise
来异步执行外部脚本和工具,并在出现错误时抛出异常。
- 回滚函数:
rollbackStore
用于在存储任务失败时回滚数据库操作。
- 任务链执行:
executeTaskChain
使用 async/await
按顺序执行任务,并在任务失败时进行回滚和错误处理。
- 高并发控制:
TaskQueue
类用于控制并发任务数量,确保在高并发场景下任务链的正确执行。每个任务链通过 queue.addTask(() => executeTaskChain())
添加到队列中执行。