RxJava 教程第三部分:驯服数据流之 hot & cold Observable
c4r79936
8年前
<p> </p> <p>Observable 数据流有两种类型:hot 和 cold。这两种类型有很大的不同。本节介绍他们的区别,以及作为 Rx 开发者应该如何正确的使用他们。</p> <p>Cold observables</p> <p>只有当有订阅者订阅的时候, Cold Observable 才开始执行发射数据流的代码。并且每个订阅者订阅的时候都独立的执行一遍数据流代码。 Observable.interval 就是一个 Cold Observable。每一个订阅者都会独立的收到他们的数据流。</p> <pre> <code class="language-java">Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS); cold.subscribe(i -> System.out.println("First: " + i)); Thread.sleep(500); cold.subscribe(i -> System.out.println("Second: " + i)); </code></pre> <p>结果:</p> <pre> <code class="language-java">First: 0 First: 1 First: 2 Second: 0 First: 3 Second: 1 First: 4 Second: 2 ... </code></pre> <p>虽然这两个 Subscriber 订阅到同一个Observable 上,只是订阅的时间不同,他们都收到同样的数据流,但是同一时刻收到的数据是不同的。</p> <p>在本教程中之前所见到的 Observable 都是 Cold Observable。 Observable.create 创建的也是 Cold Observable,而 just, range, timer 和 from 这些创建的同样是 Cold Observable。</p> <p>Hot observables</p> <p>Hot observable 不管有没有订阅者订阅,他们创建后就开发发射数据流。 一个比较好的示例就是 鼠标事件。 不管系统有没有订阅者监听鼠标事件,鼠标事件一直在发生,当有订阅者订阅后,从订阅后的事件开始发送给这个订阅者,之前的事件这个订阅者是接受不到的;如果订阅者取消订阅了,鼠标事件依然继续发射。</p> <p>Publish</p> <p>Cold Observable 和 Hot Observable 之间可以相互转化。使用 publish 操作函数可以把 Cold Observable 转化为 Hot Observable。</p> <pre> <code class="language-java">public final ConnectableObservable<T> publish() </code></pre> <p><img src="https://simg.open-open.com/show/fee3d60715406aca9c1772bde19e2726.png"></p> <p>publish 返回一个 ConnectableObservable 对象,这个对象是 Observable 的之类,多了三个函数:</p> <pre> <code class="language-java">public final Subscriptionconnect() public abstract void connect(Action1<? super Subscription> connection) public Observable<T> refCount() </code></pre> <p>另外还有一个重载函数,可以在发射数据之前对数据做些处理:</p> <pre> <code class="language-java">public final <R> Observable<R> publish(Func1<? super Observable<T>,? extends Observable<R>> selector) </code></pre> <p>之前介绍的所有对 Observable 的操作都可以在 selector 中使用。你可以通过 selector 参数创建一个 Subscription ,后来的订阅者都订阅到这一个 Subscription 上,这样可以确保所有的订阅者都在同一时刻收到同样的数据。</p> <p>这个重载函数返回的是 Observable 而不是 ConnectableObservable , 所以下面讨论的操作函数无法在这个重载函数返回值上使用。</p> <p>connect</p> <p>ConnectableObservable 如果不调用 connect 函数则不会触发数据流的执行。当调用 connect 函数以后,会创建一个新的 subscription 并订阅到源 Observable (调用 publish 的那个 Observable)。这个 subscription 开始接收数据并把它接收到的数据转发给所有的订阅者。这样,所有的订阅者在同一时刻都可以收到同样的数据。</p> <pre> <code class="language-java">ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); cold.connect(); cold.subscribe(i -> System.out.println("First: " + i)); Thread.sleep(500); cold.subscribe(i -> System.out.println("Second: " + i)); </code></pre> <p>结果:</p> <pre> <code class="language-java">First: 0 First: 1 First: 2 Second: 2 First: 3 Second: 3 First: 4 Second: 4 First: 5 Second: 5 </code></pre> <p>Disconnecting</p> <p>connect 函数返回的是一个 Subscription,和 Observable.subscribe返回的结果一样。 可以使用这个 Subscription 来取消订阅到 ConnectableObservable。 如果调用 这个 Subscription 的 unsubscribe 函数,可以停止把数据转发给 Observer,但是这些 Observer 并没有从 ConnectableObservable 上取消注册,只是停止接收数据了。如果再次调用 connect , 则 ConnectableObservable 开始一个新的订阅,在 ConnectableObservable 上订阅的 Observer 会再次开始接收数据。</p> <pre> <code class="language-java">ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); Subscription s = connectable.connect(); connectable.subscribe(i -> System.out.println(i)); Thread.sleep(1000); System.out.println("Closing connection"); s.unsubscribe(); Thread.sleep(1000); System.out.println("Reconnecting"); s = connectable.connect(); </code></pre> <p>结果:</p> <pre> <code class="language-java">0 1 2 3 4 Closingconnection Reconnecting 0 1 2 ... </code></pre> <p>通过调用 connect 来重新开始订阅,会创建一个新的订阅。如果源 Observable 为 Cold Observable 则数据流会重新执行一遍。</p> <p>如果你不想结束数据流,只想从 publish 返回的 Hot Observable 上取消注册,则可以使用 subscribe 函数返回的 Subscription 对象。</p> <pre> <code class="language-java">ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); Subscription s = connectable.connect(); Subscriptions1 = connectable.subscribe(i -> System.out.println("First: " + i)); Thread.sleep(500); Subscriptions2 = connectable.subscribe(i -> System.out.println("Second: " + i)); Thread.sleep(500); System.out.println("Unsubscribing second"); s2.unsubscribe(); </code></pre> <p>结果:</p> <pre> <code class="language-java">First: 0 First: 1 First: 2 Second: 2 First: 3 Second: 3 First: 4 Second: 4 Unsubscribingsecond First: 5 First: 6 </code></pre> <p>refCount</p> <p>ConnectableObservable.refCount 返回一个特殊的 Observable , 这个 Observable 只要有订阅者就会继续发射数据。</p> <pre> <code class="language-java">Observable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).publish().refCount(); Subscriptions1 = cold.subscribe(i -> System.out.println("First: " + i)); Thread.sleep(500); Subscriptions2 = cold.subscribe(i -> System.out.println("Second: " + i)); Thread.sleep(500); System.out.println("Unsubscribe second"); s2.unsubscribe(); Thread.sleep(500); System.out.println("Unsubscribe first"); s1.unsubscribe(); System.out.println("First connection again"); Thread.sleep(500); s1 = cold.subscribe(i -> System.out.println("First: " + i)); </code></pre> <p>结果:</p> <pre> <code class="language-java">First: 0 First: 1 First: 2 Second: 2 First: 3 Second: 3 Unsubscribesecond First: 4 First: 5 First: 6 Unsubscribefirst Firstconnectionagain First: 0 First: 1 First: 2 First: 3 First: 4 </code></pre> <p>如果没有订阅者订阅到 refCount 返回的 Observable,则不会执行数据流的代码。如果所有的订阅者都取消订阅了,则数据流停止。重新订阅再回重新开始数据流。</p> <p>replay</p> <pre> <code class="language-java">public final ConnectableObservable<T> replay() </code></pre> <p><img src="https://simg.open-open.com/show/0e7cf2efa00f4d441dc686c788daeaa5.png"></p> <p>replay 和 ReplaySubject 类似。当和源 Observable 链接后,开始收集数据。当有 Observer 订阅的时候,就把收集到的数据线发给 Observer。然后和其他 Observer 同时接受数据。</p> <pre> <code class="language-java">ConnectableObservable<Long> cold = Observable.interval(200, TimeUnit.MILLISECONDS).replay(); Subscription s = cold.connect(); System.out.println("Subscribe first"); Subscriptions1 = cold.subscribe(i -> System.out.println("First: " + i)); Thread.sleep(700); System.out.println("Subscribe second"); Subscriptions2 = cold.subscribe(i -> System.out.println("Second: " + i)); Thread.sleep(500); </code></pre> <p>结果:</p> <pre> <code class="language-java">Subscribefirst First: 0 First: 1 First: 2 Subscribesecond Second: 0 Second: 1 Second: 2 First: 3 Second: 3 </code></pre> <p>replay 和 publish 一样也返回一个 ConnectableObservable 。所以我们可以在上面使用 refCount 来创建新的 Observable 也可以取消注册。</p> <p>replay 有 8个重载函数:</p> <pre> <code class="language-java">ConnectableObservable<T> replay() <R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector) <R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize) <R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, int bufferSize, long time, java.util.concurrent.TimeUnitunit) <R> Observable<R> replay(Func1<? super Observable<T>,? extends Observable<R>> selector, long time, java.util.concurrent.TimeUnitunit) ConnectableObservable<T> replay(int bufferSize) ConnectableObservable<T> replay(int bufferSize, long time, java.util.concurrent.TimeUnitunit) ConnectableObservable<T> replay(long time, java.util.concurrent.TimeUnitunit) </code></pre> <p>有三个参数 bufferSize、 selector 和 time (以及指定时间单位的 unit)</p> <p>– bufferSize 用来指定缓存的最大数量。当新的 Observer 订阅的时候,最多只能收到 bufferSize 个之前缓存的数据。</p> <p>– time, unit 用来指定一个数据存货的时间,新订阅的 Observer 只能收到时间不超过这个参数的数据。</p> <p>– selector 和 publish(selector) 用来转换重复的 Observable。</p> <p>下面是一个 bufferSize 的示例:</p> <pre> <code class="language-java">ConnectableObservable<Long> source = Observable.interval(1000, TimeUnit.MILLISECONDS) .take(5) .replay(2); source.connect(); Thread.sleep(4500); source.subscribe(System.out::println); </code></pre> <p>结果:</p> <pre> <code class="language-java">2 3 4 </code></pre> <p>cache</p> <p>cache 操作函数和 replay 类似,但是隐藏了 ConnectableObservable ,并且不用管理 subscription 了。当第一个 Observer 订阅的时候,内部的 ConnectableObservable 订阅到源 Observable。后来的订阅者会收到之前缓存的数据,但是并不会重新订阅到源 Observable 上。</p> <pre> <code class="language-java">public final Observable<T> cache() public final Observable<T> cache(int capacity) </code></pre> <p><img src="https://simg.open-open.com/show/6bcd44749dcbfe693633d6c2cabf1a17.png"></p> <pre> <code class="language-java">Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS) .take(5) .cache(); Thread.sleep(500); obs.subscribe(i -> System.out.println("First: " + i)); Thread.sleep(300); obs.subscribe(i -> System.out.println("Second: " + i)); </code></pre> <p>结果:</p> <pre> <code class="language-java">First: 0 First: 1 First: 2 Second: 0 Second: 1 Second: 2 First: 3 Second: 3 First: 4 Second: 4 </code></pre> <p>从上面示例中可以看到,只有当有订阅者订阅的时候,源 Observable 才开始执行。当第二个订阅者订阅的时候,会收到之前缓存的数据。</p> <p>需要注意的是,如果所有的订阅者都取消订阅了 内部的 ConnectableObservable 不会取消订阅,这点和 refCount 不一样。只要第一个订阅者订阅了,内部的 ConnectableObservable 就链接到源 Observable上了并且不会取消订阅了。 这点非常重要,因为当我们一单订阅了,就没法取消源 Observable了, 直到源 Observable 结束或者程序内存溢出。 可以指定缓存个数的重载函数也没法解决这个问题,缓存限制只是作为一个优化的提示,并不会限制内部的缓存大小。</p> <pre> <code class="language-java">Observable<Long> obs = Observable.interval(100, TimeUnit.MILLISECONDS) .take(5) .doOnNext(System.out::println) .cache() .doOnSubscribe(() -> System.out.println("Subscribed")) .doOnUnsubscribe(() -> System.out.println("Unsubscribed")); Subscriptionsubscription = obs.subscribe(); Thread.sleep(150); subscription.unsubscribe(); </code></pre> <p>结果:</p> <pre> <code class="language-java">Subscribed 0 Unsubscribed 1 2 3 4 </code></pre> <p>上面的示例中,doOnNext 打印源 Observable 发射的每个数据。而 doOnSubscribe 和doOnUnsubscribe 打印缓存后的 Observable 的订阅和取消订阅事件。可以看到当订阅者订阅的时候,数据流开始发射,取消订阅数据流并不会停止。</p> <p>Multicast</p> <p>share 函数是 Observable.publish().refCount() 的别名。可以让你的订阅者分享一个 subscription,只要还有订阅者在,这个 subscription 就继续工作。</p> <p> </p> <p>来自: <a href="/misc/goto?guid=4959671584450402906" rel="nofollow">http://blog.chengyunfeng.com/?p=975</a></p>