class AsyncIterableLimit {
constructor(data, limit) {
this.data = data;
this.limit = limit;
this.index = 0;
this.pendingCount = 0;
this.queue = [];
}
async *[Symbol.asyncIterator]() {
while (this.index < this.data.length || this.pendingCount > 0) {
while (this.pendingCount < this.limit && this.index < this.data.length) {
const task = this.data[this.index++];
this.pendingCount++;
const result = await new Promise((resolve, reject) => {
this.queue.push({ task, resolve, reject });
this.runNext();
});
yield result;
}
await new Promise(resolve => setTimeout(resolve, 100));
}
}
runNext() {
if (this.queue.length === 0) return;
const { task, resolve, reject } = this.queue.shift();
setTimeout(() => {
task()
.then(resolve)
.catch(reject)
.finally(() => {
this.pendingCount--;
this.runNext();
});
}, 1000);
}
}
// 使用示例
const data = [
() => Promise.resolve(1),
() => Promise.resolve(2),
() => Promise.resolve(3),
() => Promise.resolve(4),
() => Promise.resolve(5),
() => Promise.resolve(6)
];
const asyncIterable = new AsyncIterableLimit(data, 3);
(async () => {
for await (const result of asyncIterable) {
console.log(result);
}
})();
设计思路
- 类的初始化:
AsyncIterableLimit
类接受一个包含异步任务的数组data
和最大并发数limit
作为参数。初始化一些计数器和队列。
- 异步迭代器实现:
- 通过实现
Symbol.asyncIterator
方法来使对象成为异步可迭代对象。
- 使用
while
循环,只要还有未处理的数据或者有正在执行的任务,就持续迭代。
- 在内部
while
循环中,只要当前正在执行的任务数小于限制并且还有未处理的数据,就取出新的任务并通过Promise
包装,等待任务完成后yield
结果。
- 任务执行控制:
runNext
方法负责从队列中取出任务并执行,执行完成后更新正在执行的任务数,并递归调用自身以执行下一个任务。
- 每个任务通过
setTimeout
模拟异步操作。
- 使用示例:
- 创建
AsyncIterableLimit
实例,传入异步任务数组和限制数。
- 使用
for await...of
循环迭代异步可迭代对象,依次处理每个任务的结果。