设计思路
- 任务表示:将每个异步任务定义为返回 Promise 的函数,任务依赖通过在函数参数中传入依赖任务的结果来体现。
- 依赖解析:构建一个任务图,记录每个任务及其依赖关系。通过拓扑排序确定任务执行顺序,确保依赖的任务先执行。
- 并发控制:使用队列来管理任务执行,控制同时执行的任务数量,避免因资源限制导致系统崩溃。
- 失败重试:在任务执行失败时,根据设定的重试次数和重试间隔进行重试。
- 性能优化:利用 Promise.allSettled 来并行执行无依赖关系的任务,提高整体执行效率。
关键代码实现
class TaskExecutor {
constructor(maxConcurrency) {
this.maxConcurrency = maxConcurrency;
this.taskQueue = [];
this.runningTasks = new Set();
}
addTask(task, dependencies = []) {
this.taskQueue.push({ task, dependencies });
}
async execute() {
const sortedTasks = this.topologicalSort();
for (const { task, dependencies } of sortedTasks) {
while (this.runningTasks.size >= this.maxConcurrency) {
await new Promise(resolve => setTimeout(resolve, 100));
}
const depResults = await Promise.allSettled(dependencies.map(dep => dep()));
const results = depResults.map(result => {
if (result.status === 'fulfilled') {
return result.value;
} else {
throw result.reason;
}
});
const executeTask = async () => {
const maxRetries = 3;
const retryInterval = 1000;
for (let i = 0; i < maxRetries; i++) {
try {
return await task(...results);
} catch (error) {
if (i === maxRetries - 1) {
throw error;
}
await new Promise(resolve => setTimeout(resolve, retryInterval));
}
}
};
const taskPromise = executeTask();
this.runningTasks.add(taskPromise);
taskPromise.then(() => this.runningTasks.delete(taskPromise)).catch(() => this.runningTasks.delete(taskPromise));
}
await Promise.all(this.runningTasks);
}
topologicalSort() {
const inDegree = {};
const graph = {};
for (const { task, dependencies } of this.taskQueue) {
inDegree[task] = 0;
graph[task] = [];
}
for (const { task, dependencies } of this.taskQueue) {
for (const dep of dependencies) {
if (!graph[dep]) {
graph[dep] = [];
}
graph[dep].push(task);
inDegree[task] = (inDegree[task] || 0) + 1;
}
}
const queue = [];
for (const task in inDegree) {
if (inDegree[task] === 0) {
queue.push({ task, dependencies: [] });
}
}
const sortedTasks = [];
while (queue.length > 0) {
const { task, dependencies } = queue.shift();
sortedTasks.push({ task, dependencies });
if (graph[task]) {
for (const neighbor of graph[task]) {
inDegree[neighbor]--;
if (inDegree[neighbor] === 0) {
const neighborDeps = this.taskQueue.find(t => t.task === neighbor).dependencies;
queue.push({ task: neighbor, dependencies: neighborDeps });
}
}
}
}
return sortedTasks;
}
}
使用示例
// 定义任务
const taskA = () => Promise.resolve('A result');
const taskB = (aResult) => Promise.resolve(`B result with ${aResult}`);
const taskC = () => Promise.reject('C failed');
const executor = new TaskExecutor(2);
executor.addTask(taskA);
executor.addTask(taskB, [taskA]);
executor.addTask(taskC);
executor.execute().then(() => {
console.log('All tasks completed');
}).catch(error => {
console.error('Error in task execution:', error);
});