在 RxJS 中链接 Observable

IT技术 javascript promise reactive-programming rxjs rxjs5
2021-02-09 00:16:48

我正在学习 RxJS 和 Angular 2。假设我有一个带有多个异步函数调用的 promise 链,它依赖于前一个的结果,如下所示:

var promiseChain = new Promise((resolve, reject) => {
  setTimeout(() => {
    resolve(1);
  }, 1000);
}).then((result) => {
  console.log(result);

  return new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(result + 2);
    }, 1000);
  });
}).then((result) => {
  console.log(result);

  return new Promise((resolve, reject) => {
      setTimeout(() => {
      resolve(result + 3);
        }, 1000);
  });
});

promiseChain.then((finalResult) => {
  console.log(finalResult);
});

我尝试只使用 RxJS 而不使用 promise 来做同样的事情,结果如下:

var observableChain = Observable.create((observer) => {
  setTimeout(() => {
    observer.next(1);
    observer.complete();
  }, 1000);
}).flatMap((result) => {
  console.log(result);

  return Observable.create((observer) => {
    setTimeout(() => {
      observer.next(result + 2);
      observer.complete()
    }, 1000);
  });
}).flatMap((result) => {
  console.log(result);

  return Observable.create((observer) => {
    setTimeout(() => {
      observer.next(result + 3);
      observer.complete()
    }, 1000);
  });
});

observableChain.subscribe((finalResult) => {
  console.log(finalResult);
});

它产生与Promise链相同的输出。我的问题是

  1. 我这样做对吗?我可以对上述代码进行任何与 RxJS 相关的改进吗

  2. 如何让这个可观察链重复执行?即在最后添加另一个订阅只会产生额外的 6,尽管我希望它打印 1、3 和 6。

    observableChain.subscribe((finalResult) => { console.log(finalResult); });

    observableChain.subscribe((finalResult) => { console.log(finalResult); });

    1 3 6 6

1个回答

关于promise composition vs. Rxjs,由于这是一个常见问题,你可以参考之前关于SO的一些问题,其中:

基本上flatMap相当于Promise.then.

对于您的第二个问题,您是要重放已经发出的值,还是要在新值到达时对其进行处理?在第一种情况下,检查publishReplay操作员。在第二种情况下,标准订阅就足够了。但是,您可能需要注意寒冷。与热二分法取决于您的来源(参见热和冷可观察值:是否有“热”和“冷”运算符?有关概念的说明性解释)

.pipe(mergeMap(() => forkJoin({first: first$, second: second$,}))); 如果你想开始不止一个。
2021-04-05 00:16:48
RXJs 6+ 中不推荐使用 flatMap,请改用 mergeMap
2021-04-13 00:16:48