RXJS 等待数组中的所有 observables 完成(或错误)

IT技术 javascript rxjs
2021-03-01 07:44:34

我正在将 observables 推入这样的数组中......

var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));

我想要一个在所有任务 $ 完成时发出的 Observable。请记住,在实践中,tasks$ 没有已知数量的 Observable。

我已经尝试过,Observable.zip(tasks$).subscribe()但如果只有 1 个任务,这似乎失败了,这让我相信 ZIP 需要偶数个元素才能按我期望的方式工作。

我已经尝试过,Observable.concat(tasks$).subscribe()但是 concat 运算符的结果似乎只是一个可观察的数组……例如,与输入基本相同。你甚至不能打电话订阅它。

在 C# 中,这类似于Task.WhenAll(). 在 ES6 中,它类似于Promise.all().

我遇到了许多 SO 问题,但它们似乎都在处理等待已知数量的流(例如将它们映射在一起)。

4个回答

如果您想编写一个在所有源 observable 完成时发出的 observable,您可以使用forkJoin

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/operator/first';

var tasks$ = [];
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });
我不想一直唠叨你,但我认为结果不会显示出来,因为observable的流程不完整。可能只是在计时器调用或 .take(n) 之后添加 .first() 以获得更有趣的结果。forkJoin 是我正在寻找的运营商,谢谢@cartant!
2021-04-26 07:44:34
对于 rxjs 6:import {forkJoin} from 'rxjs';forkJoin(...tasks$).subscribe(results => { console.log(results); });.
2021-05-03 07:44:34
小心如果tasks是动态构建并且为空,forkJoin 将停止可观察序列。请在此处查看我的答案以获取更多信息stackoverflow.com/a/42622968/1224564
2021-05-12 07:44:34
第三个 observable 的 O 不是声明的大写,因此导致 observable 未定义。此外,任务在订阅之前缺少结束美元符号。我无法编辑答案,因为: 编辑必须至少为 6 个字符;这篇文章还有什么需要改进的吗?
2021-05-12 07:44:34
@Simon 这就是我不喜欢芬兰符号的原因之一——尽管我确实在某些地方使用了它。task$s可以说更合适,但看起来超级奇怪。
2021-05-16 07:44:34

您可以使用zip.

组合多个 Observable 以创建一个 Observable,其值是根据每个输入 Observable 的值按顺序计算的。

const obsvA = this._service.getObjA();
const obsvB = this._service.getObjB();
// or with array
// const obsvArray = [obsvA, obsvB];

const zip = Observable.zip(obsvA, obsvB);
// or with array
// const zip = Observable.zip(...obsvArray);
zip.subscribe(
  result => console.log(result), // result is an array with the responses [respA, respB]
);

需要考虑的事项:

  • 不需要是偶数个可观察对象。
  • zip 视觉上
  • 在此处输入图片说明 正如这里所说

    zip 操作符将订阅所有内部 observable,等待每个都发出一个值。一旦发生这种情况,将发出具有相应索引的所有值。这将一直持续到至少一个内部 observable 完成。

  • 当一个 observables 抛出一个错误(或什至两者)时,订阅将关闭(onComplete调用完成时),并且使用一个onError方法,你只会得到第一个错误。
  • zip.subscribe(
      result => console.log(result), // result is an array with the responses [respA, respB]
      error => console.log(error), // will return the error message of the first observable that throws error and then finish it
      () => console.log ('completed after first error or if first observable finishes)
    );
    
    注意:对于 rxjs6,您可以zip在管道中使用,但必须使用zipAll. 如果您确实使用 zip,您将收到弃用警告,并且不会通过正确的输入。但是,您可以仅zip(...)(不再Observable.zip(...)静态地使用 zip
    2021-04-22 07:44:34
    // waits for all Observables no matter of success/fails each of them
    // returns array of items
    // each item represent even first value of Observable or it's error
    export function waitAll(args: Observable<any>[]): Observable<any[]> {
      const final = new Subject<any[]>();
      const flags = new Array(args.length);
      const result = new Array(args.length);
      let total = args.length;
      for (let i = 0; i < args.length; i++) {
        flags[i] = false;
        args[i].subscribe(
          res => {
            console.info('waitAll ' + i + ' ok ', res);
            if (flags[i] === false) {
              flags[i] = true;
              result[i] = res;
              total--;
              if (total < 1) {
                final.next(result);
              }
            }
          },
          error => {
            console.error('waitAll ' + i + ' failed ', error);
            if (flags[i] === false) {
              flags[i] = true;
              result[i] = error;
              total--;
              if (total < 1) {
                final.next(result);
              }
            }
          }
        );
      }
      return final.asObservable();
    }
    

    单元测试:

    describe('waitAll', () => {
      it('should wait for all observables', async () => {
        const o1 = new Subject();
        const o2 = new Subject();
        const o3 = new Subject();
    
        const o = waitAll([o1, o2, o3]);
        const res = {arr: []};
        o.subscribe(result => res.arr = result, err => res.arr = []);
    
        expect(res.arr).toEqual([]);
        o1.next('success1');
        expect(res.arr).toEqual([]);
        o2.error('failed2')
        expect(res.arr).toEqual([]);
        o3.next('success3')
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
    
        o1.next('success1*');
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
        o2.error('failed2*')
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
        o3.next('success3*')
        expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
      });
    });
    
    我觉得这是唯一的答案,而且效果很好。它是唯一一个在一次调用失败后不会自行终止的版本。
    2021-04-21 07:44:34
    这绝对是最接近的,但是如果内部 observable 被取消(takeUntil例如取消订阅),那么外部/最终 observable 永远不会完成。
    2021-05-03 07:44:34

    对我来说,这个样本是最好的解决方案。

    const source = Observable.interval(500);
    const example = source.sample(Observable.interval(2000));
    const subscribe = example.subscribe(val => console.log('sample', val));
    

    所以..只有当第二个(示例)发出时 - 你会看到第一个(源)的最后发出的值。

    在我的任务中,我等待表单验证和其他 DOM 事件。

    这些将依次执行,一个接一个。您不会获得同时执行,这是使用zipforkJoin
    2021-05-16 07:44:34