几个月后我又回到了我原来的问题,同时想分享所获得的知识。我将使用以下代码作为解释支持(jsfiddle):
var ta_count = document.getElementById('ta_count');
var ta_result = document.getElementById('ta_result');
var threshold = 3;
function emits ( who, who_ ) {return function ( x ) {
who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}
var messages$ = Rx.Observable.create(function (observer){
var count= 0;
setInterval(function(){
observer.onNext(++count);
}, 1000)
})
.do(emits(ta_count, 'count'))
.map(function(count){return count < threshold})
.do(emits(ta_result, 'result'))
messages$.subscribe(function(){});
正如其中一个答案中提到的,定义一个 observable 会导致一系列回调和参数注册。必须启动数据流,这是通过subscribe
函数完成的。之后可以找到(为了说明而简化)详细流程。
默认情况下,Observable 是冷的。订阅 observable 将导致发生订阅的上游链。最后一个订阅导致执行一个函数,该函数将处理一个源并将其数据发送给它的观察者。
该观察者依次向下一个观察者发送数据,从而产生下游数据流,直至下沉观察者。下面的简化图显示了当两个订阅者订阅同一个 observable 时的订阅和数据流。
可以通过使用主题或通过multicast
运算符(及其派生词,请参见下面的注释 3)来创建热可观察对象。
在multicast
引擎盖下运营商利用一个主题,并返回一个可连接的可观察的。对运营商的所有订阅都将是对内部主题的订阅。当connect
被调用时,内部主体订阅上游可观察对象,数据流向下游。主体在内部操作订阅观察者的列表,并将传入数据多播给所有订阅的观察者。
下图总结了这种情况。
最后,更重要的是理解观察者模式导致的数据流和运算符的实现。
例如,如果obs
是热的,是hotOrCold = obs.op1
冷的还是热的?不管答案是什么:
- 如果没有订阅者
obs.op1
,则没有数据流过op1
。如果有订阅者到 hot obs
,那意味着obs.op1
可能会丢失一些数据。
- 假设它
op1
不是类似多播的运算符,订阅两次 tohotOrCold
将订阅两次op1
,并且每个值 fromobs
将流过两次op1
。
注意事项:
- 此信息应该对 Rxjs v4 有效。虽然第 5 版经历了相当大的变化,但其中大部分仍然适用。
- 取消订阅、错误和完成流程未表示,因为它们不在问题的范围内。调度程序也不考虑在内。除其他外,它们影响数据流的时间,但先验地不影响其方向和内容。
- 根据用于组播的主题类型,有不同的派生组播算子:
Subject type | `Publish` Operator | `Share` operator
------------------ | --------------------------- | -----------------
Rx.Subject | Rx.Observable.publish | share
Rx.BehaviorSubject | Rx.Observable.publishValue | shareValue
Rx.AsyncSubject | Rx.Observable.publishLast | N/A
Rx.ReplaySubject | Rx.Observable.replay | shareReplay
更新:另请参阅本·莱什 (Ben Lesh)撰写的有关该主题的以下文章 (here和 there )。
有关主题的更多详细信息可以在另一个 SO 问题中找到:不同 RxJS 主题的语义是什么?