如何让一个 Observable 序列在发射之前等待另一个完成?

IT技术 javascript observable rxjs
2021-02-24 20:13:47

说我有一个Observable,像这样:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

然后,我有第二个Observable

var two = someOtherObservable.take(1);

现在,我想subscribe()two的,但我想肯定使该one已完成之前,two用户被激发。

我可以使用什么样的缓冲方法two让第二个等待第一个完成?

我想我希望暂停two直到one完成。

6个回答

我能想到的几种方法

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
@mspasiuk 根据 OP 要求,他们只希望在第一个完成订阅第二个forkJoin同时订阅。
2021-04-22 20:13:47
@Spray'n'Pray 不,因为这会在收到第一个值后完成订阅,one因此它甚至不会最终订阅two
2021-04-28 20:13:47
为什么不使用Observable.forkJoin()请参阅此链接learnrxjs.io/operators/combination/forkjoin.html
2021-05-01 20:13:47
我最终使用pauseandresume而不是publishand connect,但示例二本质上是我采用的路线。
2021-05-05 20:13:47
这个方法是否总是使第一个 observable ( one)two在 subscribe() 函数中的第二个 ( )之前解析
2021-05-16 20:13:47

跳过直到()与最后()

skipUntil :忽略发出的项目,直到另一个 observable 发出

last:从序列中发出最后一个值(即等待它完成然后发出)

请注意,从传递给的 observable 发出的任何内容都skipUntil将取消跳过,这就是我们需要添加last()- 以等待流完成的原因。

main$.skipUntil(sequence2$.pipe(last()))

官方:https : //rxjs-dev.firebaseapp.com/api/operators/skipUntil


可能的问题:请注意,如果没有发出任何内容last(),它本身就会出错last()运营商确实有default配合使用的谓词参数,但只有当。我认为如果这种情况对您来说是个问题(如果sequence2$可以完成而不发出),那么其中一个应该可以工作(目前未经测试):

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

请注意,这undefined是要发出的有效项目,但实际上可以是任何值。另请注意,这是连接到sequence2$main$管道而不是管道。

你的语法是错误的。skipUntil 不能直接附加到 observable 否则你会得到以下错误:'属性'skipUntil'不存在于'Observable<any>'类型。您需要先通过 .pipe() 运行它
2021-05-05 20:13:47
是的,这是需要管道之前的旧答案。谢谢你提到它。我现在更新它,但我在我的手机上。随意编辑答案。
2021-05-05 20:13:47
很笨拙的演示:angular-vgznak.stackblitz.io需要点击打开控制台托盘
2021-05-07 20:13:47

如果要确保保留执行顺序,可以使用 flatMap 作为以下示例

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

结果将是:

"1"
"11"
"111"
"finished"

这是利用 switchMap 的结果选择器的另一种可能性

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });

由于switchMap的结果选择器已经贬值,这里是一个更新的版本

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
  /* do something */ 
});

这是一种可重复使用的方法(它是typescript,但您可以将其调整为 js):

function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) => signal.pipe(
        first(),
        switchMap(_ => source),
    );
}

您可以像使用任何运算符一样使用它:

var two = someOtherObservable.pipe(waitFor(one), take(1));

它基本上是一个操作符,它推迟对源 observable 的订阅,直到信号 observable 发出第一个事件。

是否有此可重用功能的 rxswift 版本
2021-04-27 20:13:47