如何在 JS/TS 中实现伪阻塞异步队列?

IT技术 javascript typescript asynchronous promise async-await
2021-03-16 02:58:49

所以这是一个矛盾的说法:我想在 javascript/typescript 中创建一个异步阻塞队列(如果你可以在没有 typescript 的情况下实现它,那很好)。基本上我想实现一些类似于 Java 的BlockingQueue期望而不是它实际上被阻塞,它将是异步的,我可以等待出队。

这是我要实现的接口:

interface AsyncBlockingQueue<T> {
  enqueue(t: T): void;
  dequeue(): Promise<T>;
}

我会像这样使用它:

// enqueue stuff somewhere else

async function useBlockingQueue() {
  // as soon as something is enqueued, the promise will be resolved:
  const value = await asyncBlockingQueue.dequeue();
  // this will cause it to await for a second value
  const secondValue = await asyncBlockingQueue.dequeue();
}

有任何想法吗?

2个回答

这实际上很简单,dequeue将创建一个enqueue将解决的Promise我们只需要将解析器保持在队列中 - 并且还关心值在出队之前入队的情况,将已经履行的Promise保持在队列中。

class AsyncBlockingQueue {
  constructor() {
    // invariant: at least one of the arrays is empty
    this.resolvers = [];
    this.promises = [];
  }
  _add() {
    this.promises.push(new Promise(resolve => {
      this.resolvers.push(resolve);
    }));
  }
  enqueue(t) {
    // if (this.resolvers.length) this.resolvers.shift()(t);
    // else this.promises.push(Promise.resolve(t));
    if (!this.resolvers.length) this._add();
    this.resolvers.shift()(t);
  }
  dequeue() {
    if (!this.promises.length) this._add();
    return this.promises.shift();
  }
  // now some utilities:
  isEmpty() { // there are no values available
    return !this.promises.length; // this.length <= 0
  }
  isBlocked() { // it's waiting for values
    return !!this.resolvers.length; // this.length < 0
  }
  get length() {
    return this.promises.length - this.resolvers.length;
  }
  [Symbol.asyncIterator]() {
    // Todo: Use AsyncIterator.from()
    return {
      next: () => this.dequeue().then(value => ({done: false, value})),
      [Symbol.asyncIterator]() { return this; },
    };
  }
}

我不知道 TypeScript,但大概添加必要的类型注释很简单。

为了获得更好的性能,请使用带有循环缓冲区的 Queue 实现而不是普通数组,例如this one您也可能只使用一个队列并记住您当前是存储Promise还是解析器。

@Bergi 在这个解决方案中,为什么要签if (!this.resolvers.length)enqueue该逻辑似乎对 Promise(和解析器)的创建进行编码,然后立即删除解析器。我错过了什么?解析器数组的长度是否超过 1?
2021-04-25 02:58:49
我要感谢 stackoverflow 让我添加更多内容!以达到最小字符数
2021-05-07 02:58:49
@52d6c6af 不,我认为这与我所知道的蹦床无关
2021-05-08 02:58:49
@ 52d6c6af解析器的长度变得高于0时有更多的dequeue()比的呼叫enqueue()的呼叫,Promise的长度变得高于0时有更多的enqueue()比的呼叫dequeue()的呼叫。
2021-05-14 02:58:49

这只是@Bergi 的答案,但是使用 typescript + generics 进行了一些修改,以使其在我的 typescript 窥视的严格模式下工作。

class AsyncBlockingQueue<T> {
  private _promises: Promise<T>[];
  private _resolvers: ((t: T) => void)[];

  constructor() {
    this._resolvers = [];
    this._promises = [];
  }

  private _add() {
    this._promises.push(new Promise(resolve => {
      this._resolvers.push(resolve);
    }));
  }

  enqueue(t: T) {
    if (!this._resolvers.length) this._add();
    const resolve = this._resolvers.shift()!;
    resolve(t);
  }

  dequeue() {
    if (!this._promises.length) this._add();
    const promise = this._promises.shift()!;
    return promise;
  }

  isEmpty() {
    return !this._promises.length;
  }

  isBlocked() {
    return !!this._resolvers.length;
  }

  get length() {
    return this._promises.length - this._resolvers.length;
  }
}
不应该发生”真的是“永远不会发生” - 有一个检查会在数组为空时添加元素,然后再删除:-)
2021-04-23 02:58:49
@Bergi 更新以匹配。typescript类型 Array.prototype.shift 因为<T>() => T | undefined我不得不undefined从类型中删除不过我看到了你的支票:)
2021-05-05 02:58:49