Hot 和 Cold observables:是否有“hot”和“cold”运算符?

IT技术 javascript angular rxjs reactive-programming rxjs5
2021-01-16 17:21:56

我回顾了以下 SO 问题: Hot 和 Cold observables 是什么?

总结一下:

  • 当一个冷的 observable 有一个观察者来消费它们时,它会发出它的值,即观察者接收到的值的序列与订阅时间无关。所有观察者都将使用相同的值序列。
  • 热 observable 发出与其订阅无关的值,即观察者接收到的值是订阅时间的函数。

然而,我觉得热与冷仍然是混淆的根源。所以这里是我的问题:

  • 默认情况下所有 rx observables 都是冷的(主体除外)?

    我经常读到事件是热可观察的典型隐喻,但我也读到它Rx.fromEvent(input, 'click')是冷可观察(?)。

  • 是否有/什么 Rx 操作符可以将冷的 observable 变成热的 observable(除了publish, 和share)?

    例如,它如何与 Rx 运算符一起工作withLatestFrom让我们cold$成为一个已在某处订阅的冷 observable。sth$.withLatestFrom(cold$,...)是一个热门的 observable 吗?

    或者,如果我sth1$.withLatestFrom(cold$,...), sth2$.withLatestFrom(cold$,...)订阅并订阅了sth1and sth2,我是否总是会看到两者的值相同sth

  • Rx.fromEvent正如其中一个答案中提到的那样,我认为会产生冷的可观察量,但事实并非如此。但是,我仍然对这种行为感到困惑:https : //codepen.io/anon/pen/NqQMJR? editors =101不同的订阅从同一个 observable 获得不同的值。click事件不是共享的吗?

4个回答

几个月后我又回到了我原来的问题,同时想分享所获得的知识。我将使用以下代码作为解释支持(jsfiddle):

var ta_count = document.getElementById('ta_count');
var ta_result = document.getElementById('ta_result');
var threshold = 3;

function emits ( who, who_ ) {return function ( x ) {
  who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}

var messages$ = Rx.Observable.create(function (observer){
  var count= 0;
  setInterval(function(){
    observer.onNext(++count);
  }, 1000)
})
.do(emits(ta_count, 'count'))
.map(function(count){return count < threshold})
.do(emits(ta_result, 'result'))

messages$.subscribe(function(){});

正如其中一个答案中提到的,定义一个 observable 会导致一系列回调和参数注册。必须启动数据流,这是通过subscribe函数完成的之后可以找到(为了说明而简化)详细流程。

简化流程图

默认情况下,Observable 是冷的。订阅 observable 将导致发生订阅的上游链。最后一个订阅导致执行一个函数,该函数将处理一个源并将其数据发送给它的观察者。

该观察者依次向下一个观察者发送数据,从而产生下游数据流,直至下沉观察者。下面的简化图显示了当两个订阅者订阅同一个 observable 时的订阅和数据流。

Cold observable 简化流程图

可以通过使用主题或通过multicast运算符(及其派生词,请参见下面的注释 3)来创建热可观察对象

multicast引擎盖下运营商利用一个主题,并返回一个可连接的可观察的。对运营商的所有订阅都将是对内部主题的订阅。connect被调用时,内部主体订阅上游可观察对象,数据流向下游。主体在内部操作订阅观察者的列表,并将传入数据多播给所有订阅的观察者。

下图总结了这种情况。

Hot observable 简化流程图

最后,更重要的是理解观察者模式导致的数据流和运算符的实现。

例如,如果obs是热的,是hotOrCold = obs.op1冷的还是热的?不管答案是什么:

  • 如果没有订阅者obs.op1,则没有数据流过op1如果有订阅者到 hot obs,那意味着obs.op1可能会丢失一些数据。
  • 假设它op1不是类似多播的运算符,订阅两次 tohotOrCold将订阅两次op1,并且每个值 fromobs将流过两次op1

注意事项:

  1. 此信息应该对 Rxjs v4 有效。虽然第 5 版经历了相当大的变化,但其中大部分仍然适用。
  2. 取消订阅、错误和完成流程未表示,因为它们不在问题的范围内。调度程序也不考虑在内。除其他外,它们影响数据流的时间,但先验地不影响其方向和内容。
  3. 根据用于组播的主题类型,有不同的派生组播算子:

Subject type | `Publish` Operator | `Share` operator ------------------ | --------------------------- | ----------------- Rx.Subject | Rx.Observable.publish | share Rx.BehaviorSubject | Rx.Observable.publishValue | shareValue Rx.AsyncSubject | Rx.Observable.publishLast | N/A Rx.ReplaySubject | Rx.Observable.replay | shareReplay

更新:另请参阅本·莱什 (Ben Lesh)撰写的有关该主题的以下文章 (here和 there )。

有关主题的更多详细信息可以在另一个 SO 问题中找到:不同 RxJS 主题的语义是什么?

很棒的学习。感谢您分享这个!你能解释一下“发布运营商”和“共享运营商”是什么意思吗?
2021-03-18 17:21:56
publish运营商是multicast有不同类型的作为参数传递科目运营商。对于publish例如,在这里看到:github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/...share运算符是通过附加.refCount()publish运算符获得的对于share例如,参照github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/...
2021-03-27 17:21:56
正确记录主题仍然很好(当前的文档可能很难弄清楚)。我认为最好的方法是,如果您针对该主题记录一个特定的问题(没有双关语)。在那里回答这个问题超出了范围,它将允许添加更多细节。现在的答案已经很长了,我想把它集中在这个问题上。问题可以是what are the semantics of the miscellaneous kinds of Rxjs subjects或您找到的任何方式来简明扼要地表述它。
2021-03-28 17:21:56
你说的对。我已经发布了一个问题:stackoverflow.com/questions/34849873/...
2021-04-05 17:21:56
还有一个关于“onComplete”的问题在这里stackoverflow.com/questions/34850043/...也许你已经为自己详细阐述了这一点。接线员的头衔现在很清楚了,谢谢。
2021-04-05 17:21:56

您的总结和链接的问题都是正确的,我认为这些术语可能会让您感到困惑。我建议您将热和冷 observables 视为主动和被动 observables(分别)。

也就是说,无论是否有人订阅,活动(热)可观察对象都会发出项目。再举一个规范的例子,无论有人是否在听,按钮点击事件都会发生。这种区别很重要,因为,例如,如果我单击一个按钮然后订阅按钮点击(按该顺序),我将看不到已经发生的按钮点击。

被动(冷) observable 将等到订阅者存在后再发出项目。想象一个按钮,在有人监听事件之前你不能点击它——这将确保你总是看到每一个点击事件。

默认情况下,所有 Rx 可观察对象都是“冷的”(或被动的)吗?不,Rx.fromEvent(input, 'click')例如是一个热(或活跃)的可观察对象。

我还读到这Rx.fromEvent(input, 'click')是一个冷的 observable(?)

事实并非如此。

是否有 Rx 操作符可以将冷的 observable 变成热的 observable?

将热(主动)可观察对象转变为冷(被动)可观察对象的概念是:您需要记录在没有订阅任何内容时发生的事件,并将这些项目(以各种方式)提供给将来出现的订阅者。一种方法是使用Subject例如,您可以使用 aReplaySubject来缓冲发出的项目并将它们重播给未来的订阅者。

您命名的两个运算符 (publishshare) 都在内部使用主题来提供该功能。

它如何与 Rx 运算符一起使用withLatestFrom让我们cold$成为一个已订阅的冷 observable。something$.withLatestFrom(cold$,...)是一个热门的 observable 吗?

如果something是一个热可观察对象,那么是的。如果something是冷可观察对象,则不是。回到事件示例,如果something是按钮单击事件流:

var clickWith3 = Rx.fromEvent(input, 'click')
    .withLatest(Rx.Observable.from([1, 2, 3]);

或者,如果我foo$.withLatestFrom(cold$,...), bar$.withLatestFrom(cold$,...)订阅并订阅fooand bar,我是否总是会看到两者的相同值?

不总是。同样,例如,如果foobar是单击不同的按钮,那么您将看到不同的值。同样,即使它们是同一个按钮,如果您的组合函数( 的第二个参数withLatest)没有为相同的输入返回相同的结果,那么您将不会看到相同的值(因为它会被调用两次,如解释以下)。

Rx.fromEvent正如其中一个答案中提到的那样,我认为会产生冷的可观察量,但事实并非如此。但是,我仍然对这种行为感到困惑:codepen.io/anon/pen/NqQMJR?editors=101不同的订阅从同一个 observable 获得不同的值。click事件不是共享的吗?

我会向您指出Enigmativity对我对相同行为提出的问题的这个很好的答案这个答案会比我能更好地解释它,但它的要点是源(点击事件)是“共享的”,是的,但你对它的操作不是。如果您不仅要共享单击事件,还要共享对它的操作,则需要明确地这样做。

Or if I do foo$.withLatestFrom(cold$,...), bar$.withLatestFrom(cold$,...) and subscribe to foo and bar, will I always see the same values for both?我的问题更多的是,如果取自的值cold$在两种情况下都相同。我的问题是withLatestFrom. 我知道我们对冷 $ 有两个不同的订阅,所以我希望根据订阅时间提取两个不同的值,这会破坏运算符的语义,所以我期望运算符cold$变成热流。但最好是测试,我会尽快发布答案。
2021-04-09 17:21:56

values在您的 codepen 中是惰性的 - 直到某些东西订阅之后才会发生任何事情,此时它会运行并连接它。所以在你的例子中,虽然你订阅了同一个变量,但它创建了两个不同的流;每个订阅调用一个。

你可以认为values作为流的生成clickmap附着。

.share() 在该地图的末尾将创建我们期望的行为,因为它是隐式订阅。

这不是您所有问题的答案(我想知道所有问题!)但可以肯定的是,所有fromEventObservable 都很热门。Click 似乎不是因为它不是像 mousemove 这样的“连续”事件,但无论如何订阅源(addEventListeneron调用)只在创建 Observable 时完成一次。所以很热。你可以在这里那里在操作符的源代码中看到它-share无论事件名称或来源是什么,创建的 observable 都是d。

然而,如果你将点击映射到一个函数,并订阅两次,它会为同一次点击两次通过可观察链。Codepen 在这里:codepen.io/anon/pen/NqQMJR?editors=101
2021-03-15 17:21:56
Erik,添加了所有这些问题的答案。如果您有时间,请告诉我您的想法。
2021-03-27 17:21:56