手把手教你使用RxJava 2.0
MaybelleSwa
8年前
<p>网上有很多关于RxJava的技术文章,大多数是关于1.x版本的。随着 RxJava 2.0 的推出,有些文章也介绍了2.x版本新增的内容以及与1.x版本的对比。有些同学如果刚刚接触RxJava,仅仅看RxJava 1.x 的一些技术文章,有时候会有些出入。因此本篇文章基于RxJava 2.0 进行由浅入深的学习,逐步掌握RxJava。</p> <h3>1.作用</h3> <p>RxJava的目的就是 <strong>异步</strong> 。</p> <p>RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性,在深入的使用过程中一定对这点深有体会。</p> <h3>2.工程引用</h3> <p>要应用RxJava,需要在项目中引入依赖:</p> <pre> <code class="language-java">io.reactivex.rxjava2:rxjava:2.0.4 io.reactivex.rxjava2:rxjava:2.0.4</code></pre> <h3>3.概念</h3> <p>要想理解好RxJava,首先要理解清楚其中的几个关键概念。由于RxJava是利用观察者模式来实现一些列的操作,所以对于观察者模式中的观察者,被观察者,以及订阅、事件需要有一个了解。如果不理解观察者模式,不要紧,下面会详细介绍。</p> <p>Observable:在观察者模式中称为“被观察者”;</p> <p>Observer:观察者模式中的“观察者”,可接收Observable发送的数据;</p> <p>subscribe:订阅,观察者与被观察者,通过subscribe()方法进行订阅;</p> <p>Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,该部分内容是2.0新增的,后续文章再介绍。Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable</p> <h3>4.RxJava中的观察者模式</h3> <p>观察者模式的概念很好理解,具体可以解释为:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。</p> <p>在程序的观察者模式,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。</p> <p>下面具体讲RxJava 的观察者模式</p> <p>RxJava 有四个基本概念:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在完成某些操作,获得一些结果后,回调触发事件,即发出事件来通知 Observer。</p> <p>关于回调,如果理解则可以跳过这一段,如果不理解,在RxJava中可以简单的理解为:为了方便Observable和Observer交互,在Observable中,将Observer对象传入,在完成某些操作后调用Observer对象的方法,此时将触发Observer中具体实现的对应方法。</p> <p>注意:Observer是个接口,Observable是个类。</p> <p>与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() 之外,还定义了三个特殊的事件:onComplete() 和 onError(),onSubscribe()。</p> <p>onComplete(): 事件队列完结时调用该方法。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。</p> <p>onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。</p> <p>onSubscribe():RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于RxJava1.x中的Subscription,用于解除订阅。</p> <p>注意:onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。</p> <p>讲了这么多,大家会疑惑:这些都跟异步有什么关系?</p> <p>其实这都是在为异步进行铺垫。当大家理解了观察者模式之后,就会很容易理解RxJava的异步实现方式。让Observable (被观察者)开启子线程执行耗操作,完成耗时操作后,触发回调,通知Observer (观察者)进行主线程UI更新。如此轻松便可以实现Android中的异步,且代码简洁明了,集中分布。RxJava中默认Observer (观察者)和Observer (观察者)都在同一线程执行任务。本文主要介绍RxJava中的一些基本使用,关于线程调度问题下篇文章再进行介绍。即本文中的所有操作都默认在同一线程进行。</p> <p>好了,下面我们就开始了解RxJava的一些基本使用。</p> <h3>5.基本的用法</h3> <p>RxJava用法多种多样,其多样性体现在Obserable(被观察者)的创建上。</p> <p>我们先以最基础的Obserable(被观察者)的创建为例介绍RxJava的使用:</p> <p>Observable的创建:</p> <pre> <code class="language-java">Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { //执行一些其他操作 //............. //执行完毕,触发回调,通知观察者 e.onNext("我来发射数据"); } });</code></pre> <p>Observer的创建:</p> <pre> <code class="language-java">Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override //观察者接收到通知,进行相关操作 public void onNext(String aLong) { System.out.println("我接收到数据了"); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } };</code></pre> <p>订阅:</p> <pre> <code class="language-java">observable.subscribe(observer);</code></pre> <p>使用create( )创建Observable最基本的创建方式。可以看到,这里传入了一个 ObservableOnSubscribe对象作为参数,它的作用相当于一个计划表,当 Observable被订阅的时候,ObservableOnSubscribe的subscribe()方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Observer 将会被调用一次 onNext())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。</p> <p>Observable的其他创建方式:</p> <p>just()方式</p> <p>Observable<String> observable = Observable.just("Hello");</p> <p>使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据。通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。</p> <p>fromIterable()方式</p> <pre> <code class="language-java">List<String> list = new ArrayList<String>(); for(int i =0;i<10;i++){ list.add("Hello"+i); } Observable<String> observable = Observable.fromIterable((Iterable<String>) list);</code></pre> <p>使用fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。</p> <p>注意:Collection接口是Iterable接口的子接口,所以所有Collection接口的实现类都可以作为Iterable对象直接传入fromIterable()方法。</p> <p>defer()方式</p> <pre> <code class="language-java">Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() { @Override public ObservableSource<? extends String> call() throws Exception { return Observable.just("hello"); } });</code></pre> <p>当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。</p> <p>interval( )方式</p> <pre> <code class="language-java">Observable<String> observable = Observable.interval(2, TimeUnit.SECONDS);</code></pre> <p>创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定2秒一次调用onNext()方法。</p> <p>range( )方式</p> <pre> <code class="language-java">Observable<Integer> observable = Observable.range(1,20);</code></pre> <p>创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。上述表示发射1到20的数。即调用20次nNext()方法,依次传入1-20数字。</p> <p>timer( )方式</p> <pre> <code class="language-java">Observable<Integer> observable = Observable.timer(2, TimeUnit.SECONDS);</code></pre> <p>创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。</p> <p>repeat( )方式</p> <pre> <code class="language-java">Observable<Integer> observable = Observable.just(123).repeat();</code></pre> <p>创建一个Observable,该Observable的事件可以重复调用。</p> <p>除了Observable(被观察者)的创建之外,RxJava 2.x 还提供了多个函数式接口 ,用于实现简便式的观察者模式。具体的函数式接口包括以下:</p> <p style="text-align:center"><img src="https://simg.open-open.com/show/a94151f557832c4b9a8b0fc17a787ac1.png"></p> <p>以Consumer为例,我们可以实现简便式的观察者模式:</p> <pre> <code class="language-java">Observable.just("hello").subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });</code></pre> <p>其中Consumer中的accept()方法接收一个来自Observable的单个值。Consumer就是一个观察者。其他函数式接口可以类似应用。</p> <p>注意:Observable (被观察者)只有在被Observer (观察者)订阅后才能执行其内部的相关逻辑,下面代码证实了这一点:</p> <pre> <code class="language-java">Observable<Long> observable = Observable.interval(2, TimeUnit.SECONDS); Observer<Long> observer = new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { System.out.println(aLong); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; SystemClock.sleep(10000);//睡眠10秒后,才进行订阅 仍然从0开始,表示Observable内部逻辑刚开始执行 observable.subscribe(observer);</code></pre> <p>01-18 16:09:20.874 12535-12927/com.lvr.rxjavalearning I/System.out: 0</p> <p>01-18 16:09:22.864 12535-12927/com.lvr.rxjavalearning I/System.out: 1</p> <p>01-18 16:09:24.864 12535-12927/com.lvr.rxjavalearning I/System.out: 2</p> <p>01-18 16:09:26.864 12535-12927/com.lvr.rxjavalearning I/System.out: 3</p> <p>除此之外,RxJava中还有许多操作符。 操作符就是用于在Observable和最终的Observer之间,通过转换Observable为其他观察者对象的过程,修改发出的事件,最终将最简洁的数据传递给Observer对象。 下面我们介绍一些比较常用的操作符。</p> <h3>6.RxJava中的操作符</h3> <p>map()操作符</p> <pre> <code class="language-java">Observable<Integer> observable = Observable.just("hello").map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return s.length(); } });</code></pre> <p>map()操作符,就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。</p> <p>flatMap()操作符</p> <pre> <code class="language-java">Observable<Object> observable = Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(List<String> strings) throws Exception { return Observable.fromIterable(strings); } });</code></pre> <p>flatMap()对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。</p> <p>filter()操作符</p> <pre> <code class="language-java">Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(List<String> strings) throws Exception { return Observable.fromIterable(strings); } }).filter(new Predicate<Object>() { @Override public boolean test(Object s) throws Exception { String newStr = (String) s; if (newStr.charAt(5) - '0' > 5) { return true; } return false; } }).subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { System.out.println((String)o); } });</code></pre> <p>filter()操作符根据test()方法中,根据自己想过滤的数据加入相应的逻辑判断,返回true则表示数据满足条件,返回false则表示数据需要被过滤。最后过滤出的数据将加入到新的Observable对象中,方便传递给Observer想要的数据形式。</p> <p>take()操作符</p> <pre> <code class="language-java">Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(List<String> strings) throws Exception { return Observable.fromIterable(strings); } }).take(5).subscribe(new Consumer<Object>() { @Override public void accept(Object s) throws Exception { System.out.println((String)s); } });</code></pre> <p>take()操作符:输出最多指定数量的结果。</p> <p>doOnNext()</p> <pre> <code class="language-java">Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(List<String> strings) throws Exception { return Observable.fromIterable(strings); } }).take(5).doOnNext(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { System.out.println("准备工作"); } }).subscribe(new Consumer<Object>() { @Override public void accept(Object s) throws Exception { System.out.println((String)s); } });</code></pre> <p>doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。</p> <p>以上就是一些常用的操作符,通过操作符的使用。我们每次调用一次操作符,就进行一次观察者对象的改变,同时将需要传递的数据进行转变,最终Observer对象获得想要的数据。</p> <p>以网络加载为例,我们通过Observable开启子线程,进行一些网络请求获取数据的操作,获得到网络数据后,然后通过操作符进行转换,获得我们想要的形式的数据,然后传递给Observer对象。</p> <p>以上仅仅是介绍RxJava的观察者模式以及RxJava的简单操作与使用。通过本篇文章,可以对RxJava有个简单的了解。后面我会继续介绍RxJava中线程调度的内容,以及RxJava 2.x 中新增的功能。如果大家喜欢这部分内容,可以持续关注,后面会继续更新。</p> <p> </p> <p>来自:http://www.jianshu.com/p/d149043d103a</p> <p> </p>