我如何在 Rx Observable 上“等待”?

IT技术 javascript rxjs ecmascript-2016
2021-02-12 07:29:03

我希望能够等待一个可观察的对象,例如

const source = Rx.Observable.create(/* ... */)
//...
await source;

天真的尝试导致等待立即解决而不是阻止执行

编辑:我的完整预期用例的伪代码是:

if (condition) {
  await observable;
}
// a bunch of other code

我知道我可以将其他代码移到另一个单独的函数中并将其传递给订阅回调,但我希望能够避免这种情况。

6个回答

您必须将Promise传递给await. 将 observable 的下一个事件转换为 promise 并等待它。

if (condition) {
  await observable.first().toPromise();
}

编辑说明:此答案最初使用 .take(1) 但已更改为使用 .first() 这避免了如果流在值通过之前结束则永远不会解决 Promise 的问题。

从 RxJS v8 开始,toPromise将被删除。相反,上面可以替换为await firstValueFrom(observable)

现在 toPromise() 被弃用了,我们应该怎么做呢?
2021-03-18 07:29:03
@apricity @AgentME 实际上,在这种情况下,take(1)也不应该使用first()由于您期望恰好发生 ONE 事件,因此您应该使用single()which 将在超过 1 个时抛出异常,而在没有时不抛出异常。如果有多个,则您的代码/数据模型等可能有问题。如果您不使用 single,您最终将任意选择返回的第一个项目,而没有警告还有更多。您必须注意上游数据源的谓词以始终保持相同的顺序。
2021-03-25 07:29:03
不要忘记导入: import 'rxjs/add/operator/first';
2021-03-30 07:29:03
@Jus10 我找不到任何表明toPromise()已弃用的内容。你从哪里读到的?
2021-04-05 07:29:03
@apricity 如果完成时没有值,first()将导致拒绝,并 take(1)导致未决的Promise。
2021-04-11 07:29:03

使用新的firstValueFrom()lastValueFrom()代替的toPromise(),正如这里所指出的,从 RxJS 7 开始不推荐使用,并将在 RxJS 8 中删除。

import { firstValueFrom} from 'rxjs';
import { lastValueFrom } from 'rxjs';

this.myProp = await firstValueFrom(myObservable$);
this.myProp = await lastValueFrom(myObservable$);

这在 RxJS 7+ 中可用

请参阅:https : //indepth.dev/rxjs-heads-up-topromise-is-being-deprecated/

它可能必须是

await observable.first().toPromise();

正如之前评论中所指出的,当有空的已完成可观察对象时take(1)first()运算符之间存在重大差异

Observable.empty().first().toPromise()会导致拒收,EmptyError表示可以相应处理,因为真的没有value。

而且Observable.empty().take(1).toPromise()会导致分辨率undefined值。

其实take(1)不会产生一个悬而未决的Promise。它将产生一个用 解决的Promiseundefined
2021-03-24 07:29:03
谢谢关注,说得对。我不确定为什么帖子会有所不同,可能行为在某个时候发生了变化。
2021-04-13 07:29:03

你需要await一个Promise,所以你会想要使用toPromise(). 有关更多详细信息,请参阅内容toPromise()

编辑:

.toPromise()现在在 RxJS 7 中被弃用(来源:https ://rxjs.dev/deprecations/to-promise

新答案:

作为对已弃用的 toPromise() 方法的替代,您应该使用两个内置静态转换函数 firstValueFrom 或 lastValueFrom 之一。

例子:

import { interval, lastValueFrom } from 'rxjs';
import { take } from 'rxjs/operators';
 
async function execute() {
  const source$ = interval(2000).pipe(take(10));
  const finalNumber = await lastValueFrom(source$);
  console.log(`The final number is ${finalNumber}`);
}
 
execute();
 
// Expected output:
// "The final number is 9"

旧答案:

如果toPromise不推荐使用,您可以使用,.pipe(take(1)).toPromise但正如您在此处看到的那样它没有被弃用。

所以请使用toPromise(RxJs 6) 说:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

异步/等待示例:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = await sample('First Example').toPromise()
// output: 'First Example'
console.log('From Promise:', result);

在这里阅读更多

Rx 是什么意思?
2021-03-18 07:29:03
toPromise 已被弃用,它已在 RxJS 8 中删除。您链接中的文档已过时。
2021-03-27 07:29:03