mergeMap 运算符的用例是什么?

IT技术 javascript rxjs observable
2021-03-21 18:40:19

我完全不明白这样做的目的mergeMap我听说有两种解释:

  1. 就像SelectAll()在 .NET LINQ 中一样 - 不。
  2. 它是 RxJSmergemap- 不(或者我无法复制)的组合。

考虑以下代码

    var obs1 = new Rx.Observable.interval(1000);
    var obs2 = new Rx.Observable.interval(1000);
    
    //Just a merge and a map, works fine
    obs1.merge(obs2).map(x=> x+'a').subscribe(
      next => console.log(next)
    )
    
    //Who know what - seems to do the same thing as a plain map on 1 observable
    obs1.mergeMap(val => Rx.Observable.of(val + `B`))
        .subscribe(
          next => console.log(next)
        )

最后一篇标有“谁知道什么”的文章只不过是一张地图obs1——有什么意义?

什么是mergeMap真正做到?什么是有效用例的示例?(最好有一些代码)

根本没有帮助我的文章(上面的合并地图代码来自其中之一):1 , 2

2个回答

tl;博士; mergeMapmap. 理解mergeMap是获得 Rx 全部功能的必要条件。


相似之处

  • 二者mergeMapmap作用于单流(相对于zipcombineLatest

  • 二者mergeMapmap可以改变一个流的元件(相对于filterdelay

差异

地图

  • 不能改变源流的大小(假设:map本身没有throw);对于源中的每个元素,mapped只发出一个元素;map不能忽略元素(例如filter);

  • 在默认调度程序的情况下,转换是同步发生的;要 100% 清楚:源流可能会异步传递其元素,但每个下一个元素都会立即mapped重新发送;map无法及时移动元素,例如delay

  • 对返回值没有限制

  • idx => x

合并映射

  • 可以改变源流的大小;对于每个元素,可能会创建/发出任意数量(0、1 或许多)的新元素

  • 它提供了对异步性的完全控制——无论是何时创建/发出新元素,以及应同时处理多少来自源流的元素;例如,假设源流发出 10 个元素,但maxConcurrency设置为 2,则将立即处理两个第一个元素,其余 8 个元素将被缓冲;一旦其中一个被处理complete,源流中的下一个元素将被处理等等 - 这有点棘手,但请看下面的例子

  • 所有其他操作符都可以只用mergeMapObservable构造函数来实现

  • 可用于递归异步操作

  • 返回值必须是 Observable 类型(或者 Rx 必须知道如何从中创建可观察的 - 例如Promise,数组)

  • idx => Rx.Observable.of(x)

数组类比

let array = [1,2,3]
fn             map                    mergeMap
x => x*x       [1,4,9]                error /*expects array as return value*/
x => [x,x*x]   [[1,1],[2,4],[3,9]]    [1,1,2,4,3,9]

类比不显示完整图像,它基本上对应于.mergeMapmaxConcurrency设置为1。在这种情况下的元素将被排序为以上,但在一般情况下,它不必须是如此。我们唯一的保证是新元素的发射将按照它们在底层流中的位置进行排序。例如:[3,1,2,4,9,1]and[2,3,1,1,9,4]是有效的,但[1,1,4,2,3,9]不是(因为42在底层流中发出的)。

几个使用的例子mergeMap

// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
  return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}

Rx.Observable.range(1, 3)
  .mapWithMergeMap(x => x * x)
  .subscribe(x => console.log('mapWithMergeMap', x))

// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
  return this.mergeMap(x =>
    filterFn(x) ?
    Rx.Observable.of(x) :
    Rx.Observable.empty()); // return no element
}

Rx.Observable.range(1, 3)
  .filterWithMergeMap(x => x === 3)
  .subscribe(x => console.log('filterWithMergeMap', x))

// implement .delay with .mergeMap 
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
  return this.mergeMap(x =>
    Rx.Observable.create(obs => {
      // setTimeout is naive - one should use scheduler instead
      const token = setTimeout(() => {
        obs.next(x);
        obs.complete();
      }, delayMs)
      return () => clearTimeout(token);
    }))
}

Rx.Observable.range(1, 3)
  .delayWithMergeMap(500)
  .take(2)
  .subscribe(x => console.log('delayWithMergeMap', x))

// recursive count
const count = (from, to, interval) => {
  if (from > to) return Rx.Observable.empty();
  return Rx.Observable.timer(interval)
    .mergeMap(() =>
      count(from + 1, to, interval)
      .startWith(from))
}

count(1, 3, 1000).subscribe(x => console.log('count', x))

// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
  Rx.Observable.if(
    () => from > to,
    Rx.Observable.empty(),
    Rx.Observable.timer(interval)
    .mergeMap(() => countMoreRxWay(from + 1, to, interval)
      .startWith(from)))

const maxConcurrencyExample = () =>
  Rx.Observable.range(1,7)
    .do(x => console.log('emitted', x))
    .mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
    .do(x => console.log('processed', x))
    .subscribe()

setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>

@arturgrzesiak 如果我用于mergeMap每个基本的凝乳操作怎么办?它会影响性能还是其他一些?我没有试过,只是有一个疑问。
2021-04-26 18:40:19
@k11k2 抱歉回复晚了 - 就像 3 年前我有类似的疑问,从那时起我一直将整个应用程序构建为单一的可观察对象,并且 Rx 性能从来都不是问题。
2021-05-07 18:40:19
所以这就是它的工作原理。RxJs 文档上完全非人类可读的解释应该替换为这个。 reactivex.io/rxjs/class/es6/...跆拳道M8?
2021-05-17 18:40:19

.mergeMap()允许您将高阶 Observable 展平为单个流。例如:

Rx.Observable.from([1,2,3,4])
  .map(i => getFreshApiData())
  .subscribe(val => console.log('regular map result: ' + val));

//vs

Rx.Observable.from([1,2,3,4])
  .mergeMap(i => getFreshApiData())
  .subscribe(val => console.log('mergeMap result: ' + val));

function getFreshApiData() {
  return Rx.Observable.of('retrieved new data')
    .delay(1000);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

有关.xxxMap()运算符的深入解释,请参阅我在另一个问题上的回答Rxjs - 如何提取数组中的多个值并将它们同步反馈给可观察流