没有任何内置的东西,所以你必须建立自己的。AFAIK,也没有用于此的库。
首先,从“延迟”开始——一个允许外部代码解决它的Promise:
class Deferral<T> {
constructor() {
this.promise = new Promise<T>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}
promise: Promise<T>;
resolve: (thenableOrResult?: T | PromiseLike<T>) => void;
reject: (error: any) => void;
}
然后就可以定义一个“等待队列”,它代表所有等待进入临界区的代码块:
class WaitQueue<T> {
private deferrals: Deferral<T>[];
constructor() {
this.deferrals = [];
}
get isEmpty(): boolean {
return this.deferrals.length === 0;
}
enqueue(): Promise<T> {
const deferral = new Deferral<T>();
this.deferrals.push(deferral);
return deferral.promise;
}
dequeue(result?: T) {
const deferral = this.deferrals.shift();
deferral.resolve(result);
}
}
最后,您可以定义一个异步信号量,如下所示:
export class AsyncSemaphore {
private queue: WaitQueue<void>;
private _count: number;
constructor(count: number = 0) {
this.queue = new WaitQueue<void>();
this._count = count;
}
get count(): number { return this._count; }
waitAsync(): Promise<void> {
if (this._count !== 0) {
--this._count;
return Promise.resolve();
}
return this.queue.enqueue();
}
release(value: number = 1) {
while (value !== 0 && !this.queue.isEmpty) {
this.queue.dequeue();
--value;
}
this._count += value;
}
}
用法示例:
async function performActionsInParallel() {
const semaphore = new AsyncSemaphore(10);
const listOfActions = [...];
const promises = listOfActions.map(async (action) => {
await semaphore.waitAsync();
try {
await doSomething(action);
}
finally {
semaphore.release();
}
});
const results = await Promise.all(promises);
}
此方法首先创建一个节流阀,然后立即启动所有异步操作。每个异步操作都会首先(异步地)等待信号量空闲,然后执行操作,最后释放信号量(允许另一个信号量进入)。当所有异步操作完成后,检索所有结果。
警告:此代码 100% 完全未经测试。我什至没有尝试过一次。