ReactiveX のStream分岐は2種類ありますよね。
一つは publish、もう一つはshare
どちらもパッと見同じように見えるのだけれども、
実はちょっとした違いがあります。
一言でいうと、分配と複製になります。
ここでつらつらと説明をしてもイマイチ伝わらないかと思うので、
まずは実際のコードを見ていきましょう!
share
shareは先程言った2つの内、複製に当たるんですが…
イメージ的には、このように分岐をして複製するみたいな感じです。
最初のObservableが複製されて、
複製されたObservableからSubscribeがコールされます。
このとき、Observableが複製されるので
Observable処理は4回呼ばれます…
うーん…
これはどういう使い方するんだろう…
コードと処理結果はこちらになります!
コード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
package com.hilo_islay import io.reactivex.Observable import io.reactivex.disposables.CompositeDisposable import io.reactivex.schedulers.Schedulers import java.util.concurrent.TimeUnit fun main(args: Array<String>) { var shareCount = 0 val disposable = CompositeDisposable() val observable = Observable.create<Int> { val i = ++shareCount; println("Share :$i"); it.onNext(i) } .subscribeOn(Schedulers.computation()) observable.subscribe() observable.share().subscribeOn(Schedulers.computation()) .subscribe { println("Subscribe :$it") } .let { disposable.add(it) } observable.share().subscribeOn(Schedulers.computation()) .subscribe { println("Subscribe :$it") } .let { disposable.add(it) } observable.share().subscribeOn(Schedulers.computation()) .subscribe { println("Subscribe :$it") } .let { disposable.add(it) } TimeUnit.SECONDS.sleep(30) } |
処理結果
Share :1
Share :2
Subscribe :2
Share :3
Subscribe :3
Share :4
Subscribe :4
Share(Observable)が4回呼ばれて、Subscribeが3回呼ばれていますね。
分岐だけで使うのは楽なんだけども、親から複製されてしまうので使い所が難しいですね;;
publish
publishは完全に分岐です!
同じように、画像から見ていきましょう。
shareとは違い、Observableが一つでSubscribeが作られた分だけ生成されます。
動きとしてはこちらの方が使い勝手がいいのですが、
publishに関しては「connect()」をコールするまでSubscribeはコールされません。
つまり、うまいこと複製をするタイミングを見極める必要がありますね。
コードと処理結果はこちらです。
コード
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
package com.hilo_islay import io.reactivex.Observable import io.reactivex.disposables.CompositeDisposable import io.reactivex.schedulers.Schedulers import java.util.concurrent.TimeUnit fun main(args : Array<String>) { var publishCount= 0 val disposable = CompositeDisposable() val pub = Observable.create<Int>{ val i = ++publishCount; println("Publish $i"); it.onNext(i) } .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.newThread()) .publish() pub.subscribe { println("Subscribe :$it") }.let { disposable.add(it) } pub.subscribe { println("Subscribe :$it") }.let { disposable.add(it) } pub.subscribe { println("Subscribe :$it") }.let { disposable.add(it) } pub.connect() TimeUnit.SECONDS.sleep(30) } |
処理結果
Publish 1
Subscribe :1
Subscribe :1
Subscribe :1
結果を見てもらうとわかるように、shareと違って、
全てPublishのnextで渡された値が表示されています。
分岐をさせるには、こちらを使っていくのが良さそうですね!
分岐自体はshare、publishの2種類がありますが
基本的にpublishの方を使っていけば大丈夫だと思います!!