RxJava 2.0 全新来袭

804696063 8年前
   <h2>前言</h2>    <p>之前写RxJava相关文章的时候,就有人想让我谈谈RxJava2.0的新特性,说实话,一开始我是拒绝的。因为在我看来,RxJava2.0虽然是版本的重大升级,但总归还是RxJava,升级一个版本还能上天是咋的?了解一下它的更新文档不就好了么?真的有必要单出一篇文章来谈这个么?</p>    <p>但是详细的了解了RxJava2.0以及部分源码之后,我觉得还是有必要对RxJava2.0做一个说明,帮助大家对于RxJava有更好的认识。</p>    <h2>铺垫</h2>    <p>假如你还不是很熟悉RxJava,或者对于背压这个概念(2.0更新中会涉及到背压的概念)很模糊,希望你也能读一读下面两篇铺垫的文章:</p>    <ul>     <li>关于RxJava最友好的文章</li>     <li>关于RxJava最友好的文章----背压</li>    </ul>    <p>关于背压的那篇文章本来是本文的一部分,因为篇幅过大,被剥离出去了,所以建议大家有时间也一并阅读。</p>    <h2>正文</h2>    <p>RxJava2.0有很多的更新,一些改动甚至冲击了我之前的文章里的内容,这也是我想写这篇文章的原因之一。不过想要写这篇文章其实也是有难度的,因为相关的资料去其实是很少的,还得自己硬着头皮上....不过俗话说得好,有困难要上,没有困难创造困难也要上。</p>    <p>在这里,我会按照我们之前关于RxJava的文章的讲述顺序:观察者模式,操作符,线程调度,这三个方面依次看有哪些更新。</p>    <h2>添加依赖</h2>    <p>这个估计得放在最前面。</p>    <p>Android端使用RxJava需要依赖新的包名:</p>    <pre>  <code class="language-java">//RxJava的依赖包(我使用的最新版本)      compile 'io.reactivex.rxjava2:rxjava:2.0.1'      //RxAndroid的依赖包      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'</code></pre>    <h2>观察者模式</h2>    <p>首先声明, <strong>RxJava以观察者模式为骨架,在2.0中依然如此</strong> 。</p>    <p>不过此次更新中,出现了两种观察者模式:</p>    <ul>     <li>Observable(被观察者)/Observer(观察者)</li>     <li>Flowable(被观察者)/Subscriber(观察者)</li>    </ul>    <p style="text-align:center"><img src="https://simg.open-open.com/show/b6a6da8d2b90129984268ff5db4e9ebb.png"></p>    <p>RxJava2.X中, <strong>Observeable用于订阅Observer</strong> ,是不支持背压的,而 <strong>Flowable用于订阅Subscriber</strong> ,是支持背压(Backpressure)的。</p>    <p>关于背压这个概念以及它在1.0版本中的缺憾在上一篇文章中我已经介绍到了,如果你不是很清楚,我在这里在做一个介绍: 背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略 ,在1.0中,关于背压最大的遗憾,就是集中在Observable这个类中,导致有的Observable支持背压,有的不支持。为了解决这种缺憾,新版本把支持背压和不支持背压的Observable区分开来。</p>    <h3>Observable/Observer</h3>    <p>Observable正常用法:</p>    <pre>  <code class="language-java">Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {              @Override              public void subscribe(ObservableEmitter<Integer> e) throws Exception {                  e.onNext(1);                  e.onNext(2);                  e.onComplete();              }          });    Observer mObserver=new Observer<Integer>() {              //这是新加入的方法,在订阅后发送数据之前,              //回首先调用这个方法,而Disposable可用于取消订阅              @Override              public void onSubscribe(Disposable d) {                }                @Override              public void onNext(Integer value) {                }                @Override              public void onError(Throwable e) {                }                @Override              public void onComplete() {                }          };    mObservable.subscribe(mObserver);</code></pre>    <p>这种观察者模型是不支持背压的。</p>    <p>啥叫不支持背压呢?</p>    <p>当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM</p>    <p>我在测试的时候,快速发送了100000个整形数据,下游延迟接收,结果被观察者的数据全部发送出去了,内存确实明显增加了,遗憾的是没有OOM。</p>    <p>所以,当我们使用Observable/Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线,仅供各位参考)</p>    <h3>Flowable/Subscriber</h3>    <pre>  <code class="language-java">Flowable.range(0,10)          .subscribe(new Subscriber<Integer>() {              Subscription sub;              //当订阅后,会首先调用这个方法,其实就相当于onStart(),              //传入的Subscription s参数可以用于请求数据或者取消订阅              @Override              public void onSubscribe(Subscription s) {                  Log.w("TAG","onsubscribe start");                  sub=s;                  sub.request(1);                  Log.w("TAG","onsubscribe end");              }                @Override              public void onNext(Integer o) {                  Log.w("TAG","onNext--->"+o);                  sub.request(1);              }              @Override              public void onError(Throwable t) {                  t.printStackTrace();              }              @Override              public void onComplete() {                  Log.w("TAG","onComplete");              }          });</code></pre>    <p>输出如下:</p>    <pre>  <code class="language-java">onsubscribe start  onNext--->0  onNext--->1  onNext--->2  ...  onNext--->10  onComplete  onsubscribe end</code></pre>    <p>Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。</p>    <p>当然,Flowable也可以通过creat()来创建:</p>    <pre>  <code class="language-java">Flowable.create(new FlowableOnSubscribe<Integer>() {              @Override              public void subscribe(FlowableEmitter<Integer> e) throws Exception {                  e.onNext(1);                  e.onNext(2);                  e.onNext(3);                  e.onNext(4);                  e.onComplete();              }          }          //需要指定背压策略          , BackpressureStrategy.BUFFER);</code></pre>    <p>Flowable虽然可以通过create()来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的(这个在1.0的时候就很难保证,可以说RxJava2.0收紧了create()的权限)。</p>    <p>根据上面的代码的结果输出中可以看到,当我们调用subscription.request(n)方法的时候,不等onSubscribe()中后面的代码执行,就会立刻执行到onNext方法,因此,如果你在onNext方法中使用到需要初始化的类时,应当尽量在subscription.request(n)这个方法调用之前做好初始化的工作;</p>    <p>当然,这也不是绝对的,我在测试的时候发现,通过create()自定义Flowable的时候,即使调用了subscription.request(n)方法,也会等onSubscribe()方法中后面的代码都执行完之后,才开始调用onNext。</p>    <p>TIPS: 尽可能确保在request()之前已经完成了所有的初始化工作,否则就有空指针的风险。</p>    <h3>其他观察者模式</h3>    <p>当然,除了上面这两种观察者,还有一类观察者</p>    <ul>     <li>Single/SingleObserver</li>     <li>Completable/CompletableObserver</li>     <li>Maybe/MaybeObserver</li>    </ul>    <p>其实这三者都差不多,Maybe/MaybeObserver可以说是前两者的复合体,因此以Maybe/MaybeObserver为例简单介绍一下这种观察者模式的用法</p>    <pre>  <code class="language-java">//判断是否登陆  Maybe.just(isLogin())      //可能涉及到IO操作,放在子线程      .subscribeOn(Schedulers.newThread())      //取回结果传到主线程      .observeOn(AndroidSchedulers.mainThread())      .subscribe(new MaybeObserver<Boolean>() {              @Override              public void onSubscribe(Disposable d) {                }                @Override              public void onSuccess(Boolean value) {                  if(value){                      ...                  }else{                      ...                  }              }                @Override              public void onError(Throwable e) {                }                @Override              public void onComplete() {                }          });</code></pre>    <p>上面就是Maybe/MaybeObserver的普通用法,你可以看到,实际上,这种观察者模式并不用于发送大量数据,而是发送单个数据,也就是说,当你只想要某个事件的结果(true or false)的时候,你可以用这种观察者模式</p>    <p>这是上面那些被观察者的上层接口:</p>    <pre>  <code class="language-java">//Observable接口  interface ObservableSource<T> {      void subscribe(Observer<? super T> observer);  }  //Single接口  interface SingleSource<T> {      void subscribe(SingleObserver<? super T> observer);  }  //Completable接口  interface CompletableSource {      void subscribe(CompletableObserver observer);  }  //Maybe接口  interface MaybeSource<T> {      void subscribe(MaybeObserver<? super T> observer);  }  //Flowable接口  public interface Publisher<T> {      public void subscribe(Subscriber<? super T> s);  }</code></pre>    <p>其实我们可以看到,每一种观察者都继承自各自的接口,这也就把他们能完全的区分开,各自独立(特别是Observable和Flowable),保证了他们各自的拓展或者配套的操作符不会相互影响。</p>    <p>例如flatMap操作符实现:</p>    <pre>  <code class="language-java">//Flowable中flatMap的定义  Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);    //Observable中flatMap的定义  Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);</code></pre>    <p>假如你想为Flowable写一个自定义的操作符,那么只要保证Function< Publisher >中的类型实现了Publisher接口即可。这么说可能很抽象,大家不理解其实也没关系,因为并不推荐大家自定义操作符,RxJava中的操纵符的组合已经可以满足大家的需求了。</p>    <p>当然,你也会注意到上面那些接口中的subscribe()方法的返回类型为void了,在1.X中,这个方法一般会返回一个Subscription对象,用于取消订阅。现在,这个功能的对象已经被放到观察者Observer或者subscriber的内部实现方法中了,</p>    <p>Flowable/Subscriber</p>    <pre>  <code class="language-java">public interface Subscriber<T> {        public void onSubscribe(Subscription s);      public void onNext(T t);      public void onError(Throwable t);      public void onComplete();  }    public interface Subscription {      public void request(long n);      public void cancel();  }</code></pre>    <p>上面的实例中,onSubscribe(Subscription s)传入的参数s就肩负着取消订阅的功能,当然,他也可以用于请求上游的数据。</p>    <p>在Observable/observer中,传入的参数是另一个对象</p>    <p>Observable/Observer</p>    <pre>  <code class="language-java">public interface Observer<T> {     void onSubscribe(Disposable d);      void onNext(T value);      void onError(Throwable e);      void onComplete();  }    public interface Disposable {      /**       * Dispose the resource, the operation should be idempotent.       */      void dispose();      /**       * Returns true if this resource has been disposed.       * @return true if this resource has been disposed       */      boolean isDisposed();  }</code></pre>    <p>在Observer接口中,onSubscribe(Disposable d)方法传入的Disposable也是用于取消订阅,基本功能是差不多的,只不过命名不一致,大家知道就好。</p>    <p>其实这种设计可以说还是符合逻辑的,因为取消订阅这个动作就只有观察者(Observer等)才能做的,现在把它并入到观察者内部,也算顺理成章吧。</p>    <p>最后再提一点更新,就是被观察者不再接收null作为数据源了。</p>    <h2>操作符相关</h2>    <p>这一块其实可以说没什么改动,大部分之前你用过的操作符都没变,即使有所变动,也只是包名或类名的改动。大家可能经常用到的就是Action和Function。</p>    <h3>Action相关</h3>    <p>之前我在文章里介绍过关于Action这类接口,在1.0中,这类接口是从Action0,Action1...往后排(数字代表可接受的参数),现在做出了改动</p>    <p>Rx1.0-----------Rx2.0</p>    <p>Action1--------Action</p>    <p>Action1--------Consumer</p>    <p>Action2--------BiConsumer</p>    <p>后面的Action都去掉了,只保留了ActionN</p>    <h3>Function相关</h3>    <p>同上,也是命名方式的改变</p>    <p>上面那两个类, 和RxJava1.0相比,他们都增加了throws Exception,也就是说,在这些方法做某些操作就不需要try-catch 。</p>    <p>例如:</p>    <pre>  <code class="language-java">Flowable.just("file.txt")  .map(name -> Files.readLines(name))  .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);</code></pre>    <p>Files.readLines(name)这类io方法本来是需要try-catch的,现在直接抛出异常,就可以放心的使用lambda ,保证代码的简洁优美。</p>    <h3>doOnCancel/doOnDispose/unsubscribeOn</h3>    <p>以doOnCancel为例,大概就是当取消订阅时,会调用这个方法,例如:</p>    <pre>  <code class="language-java">Flowable.just(1, 2, 3)  .doOnCancel(() -> System.out.println("Cancelled!"))  .take(2)  .subscribe(System.out::println);</code></pre>    <p>take新操符会取消后面那些还未被发送的事件,因而会触发doOnCancel</p>    <p>其他的一些操作符基本没变,或者只是改变了名字,在这里就不一一介绍了,需要提一下的是,很多操作符都有两套,一套用于Observable,一套用于Flowable。</p>    <h2>线程调度</h2>    <p>可以说这一块儿基本也没有改动,如果一定要说的话。</p>    <ul>     <li>那就是去掉了Schedulers.immediate()这个线程环境</li>     <li>移除的还有Schedulers.test()(我好像从来没用过这个方法)</li>     <li>io.reactivex.Scheduler这个抽象类支持直接调度自定义线程任务(这个我也没怎么用)</li>    </ul>    <h2>补充</h2>    <p>如果你想把自己的RxJava1.0的迁移到2.0的版本,可以使用这个库RxJava2Interop ,在github上可以找到,它可以在Rxjava1.0和2.0之间相互转换,也就是说,不仅可以把1.0的代码迁移到2.0,你还可以把2.0的代码迁移到1.0,哈哈。</p>    <h2>结尾</h2>    <p>其实从整篇文章的分析来看,改动最大的还是观察者模式的实现,被拆分和细化了,主要分成了Observable和Flowable两大类,当然还有与之相关联的其他变动, <strong>总体来看这一版本可以说是对于观察者和被观察者的重构</strong> 。</p>    <p>RxJava2.0的范例代码我没精力去写了,也正巧有位外国朋友已经写了RxJava2.0的demo,下面是他的项目地址:</p>    <p>在github上搜索:RxJava2-Android-Samples</p>    <p>当然,学习2.0 的过程中有什么问题也可以在这里留言讨论。</p>    <h2>附录</h2>    <p>下面我截图展示一下2.0相对于1.0的一些改动的细节,仅做参考。</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/989084248d8bcfce5842dd289e12452d.png"></p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/8a0d756960ea83b45ea8baa471fc89c5.png"></p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/989084248d8bcfce5842dd289e12452d.png"></p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/8b75276506cfb72e23d0cd1e9aa865e1.png"></p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/b8e6bc0579da7cee580a9d950966fba7.png"></p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/96b98e340e4a7b11bc4ea7237f3b291c.png"></p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/ff625b20db12c9c1faa56efd1cc3169a.png"></p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/b9106e6f3a304722d5ce1be74a6511e3.png"></p>    <p>其实这些都是官方给出的列表,截图在这里只是方便大家观摩。</p>    <p> </p>    <p>来自:http://www.jianshu.com/p/220955eefc1f</p>    <p> </p>