当 Kotlin 遇见 RxJava 多数据源

vznq4117 8年前
   <h2>温馨提醒</h2>    <p>阅读本文最好有Kotlin基础,若没有基础,可参考之前文章Kotlin初探, <a href="/misc/goto?guid=4959749259654891196">使用Kotlin优雅的开发Android应用</a> ,以及RxJava基础(本文基于RxJava2),当然我也会尽可能详细解释让你顺利阅读本文。</p>    <p><a href="/misc/goto?guid=4959748795438722813">源码传送门</a></p>    <h2>写在前面</h2>    <p>最近几天回过头,看了之前的总结RxJava操作符系列,感觉对Rxjava多数据源的处理不是很理解,所以在总结学习一波。大家都知道,最近Kotlin语言一直占据热搜榜,褒贬不一,但我想说,不管有什么想法都要抛在脑后,毕竟Google爸爸出手,你不情愿也要跟随它的步伐。鉴于此,本篇对RxJava多数据源的总结是基于Kotlin语言,也让大家明白,使用Kotlin开发应用并不是不能使用Java库,现在有一部分人担心,Kotlin第三方库那么少,如果使用Kotlin开发那不是给自己找罪受,其实你完全错了,当你说这话的时候,我敢断定你都还没有接触Kotlin,因为Koltin有一个最重要的优势就是和Java绝对兼容。</p>    <h2>多数据源处理操作符</h2>    <p>在RxJava中多数据源处理的操作符很多,但是最经典的就要数merge,contact,zip了。如果对这三个操作符不是很熟悉的话,可以去查看它的使用,当然如果你懒得去看,我也会简单提一下。merge操作符可以处理多个Observable发送的数据,它是一个异步操作,不保证数据发送的顺序,即有可能出现数据交叉,当一个Observable发送了onError后,未执行的Observable不在继续执行,直接执行merge的onError方法。</p>    <p>contact操作符执行时一个同步操作,严格按照contact中传入Observable先后执行,即前面的先执行后面的后执行,并且最终发送的数据也是有序的,即第一个Observable的数据发送完毕再发送第二个,依次类推。</p>    <p>zip操作符和contact和merge有了本质的区别,它会将每个Observable个数据项分布对应返回一个Observable再发送,最终发送的数据量与最小数据长度相同。</p>    <h2>使用场景分析</h2>    <p>假如现在我们有三种商品,有一个查询商品信息的接口,根据接口可以查询该商品的价格以及出售地点。商品实体类</p>    <pre>  <code class="language-kotlin">data class Goods(var id:Int,var price: Int, var address: String)</code></pre>    <p>在Kotlin语言中,实体类创建用data class 关键词,我们不需要和Java一样创建get/set方法,只需一行代码搞定。</p>    <p>创建模拟网络请求</p>    <pre>  <code class="language-kotlin">object NetRequest {      //模拟网络请求      fun getGoodsObservable(id: Int): Observable<Goods> {  fun getGoodsObservable(id: Int): Observable<Goods> {          return Observable.create {              source ->              Thread.sleep(Random().nextInt(1000).toLong())              var data = Goods(id, Random().nextInt(20), "地址${id}")              source.onNext(data)              source.onComplete()              Log.e("getGoodsObservable:", "${id}")          }      }  }</code></pre>    <p>在上面我们创建了一个单例类,在Kotlin中使用object修饰类时即给我们自动创建了一个单例对象。在每一句代码结尾我们不需要再和Java一样写一个分号“;”来结束,什么也不用写。</p>    <p>Observable.create使用的是lambda表达式,在Kotlin语言中是支持lambda表达式的。source 就是ObservableEmitter ,所以我们可以调用onNext发送数据。为了更准确的模拟网络请求,使用Thread.sleep随机的延迟,模拟网络请求的时间。</p>    <pre>  <code class="language-kotlin">fun getGoodsObservable(id: Int): Observable<Goods> {          return Observable.create {              source ->              Thread.sleep(Random().nextInt(1000).toLong())              var data = Goods(id, Random().nextInt(20), "地址${id}")              source.onNext(data)              source.onComplete()              Log.e("getGoodsObservable:", "${id}")          }</code></pre>    <p>当然由于subscribe只有一个参数,所以我们也可以这样写。也就是省略了source ->,此时it就表示该参数数据。</p>    <pre>  <code class="language-kotlin">return Observable.create {              Thread.sleep(Random().nextInt(1000).toLong())              var data = Goods(id, Random().nextInt(20), "地址${id}")              it.onNext(data)              it.onComplete()              Log.e("getGoodsObservable:", "${id}")          }</code></pre>    <p>在java中实现如下</p>    <pre>  <code class="language-kotlin">return Observable.create(new ObservableOnSubscribe<Goods>() {                @Override              public void subscribe(@NonNull ObservableEmitter<Goods> e) throws Exception {               //处理逻辑              }          });</code></pre>    <h2>merge</h2>    <p>准备好了请求操作,开始使用merge看看执行的效果。</p>    <pre>  <code class="language-kotlin">fun executeMerge() {          Observable.merge(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),                  getGoodsObservable(2).subscribeOn(Schedulers.newThread()),                  getGoodsObservable(3).subscribeOn(Schedulers.newThread()))                  .subscribeOn(Schedulers.newThread())                  .observeOn(AndroidSchedulers.mainThread())                  .toList()                  .subscribe({                      Log.e(TAG, it.toString())                  }, {                      Log.e(TAG, it.toString())                  })      }</code></pre>    <p>merge中有三个网络请求操作,并通过subscribeOn(Schedulers.newThread())将网络请求切换到线程中执行,数据都请求成功后,再通过observeOn(AndroidSchedulers.mainThread())切换到主线程请求数据。为了三请求都成功后,我们在更新UI,所以通过toList()将请求的数据转换成List一块发送。在上面的subscribe依然使用的lambda表达式,subscribe({},{})中第一个括号是onSuccess回调,里面的it是接收到的List< Goods >数据,第二个括号是onError回调,it表示异常Throwable对象。</p>    <p>subscribe部分Java代码</p>    <pre>  <code class="language-kotlin">.subscribe(new Consumer<List<Goods>>() {                      @Override                      public void accept(@NonNull List<Goods> goodses) throws Exception {                        }                  }, new Consumer<Throwable>() {                      @Override                      public void accept(@NonNull Throwable throwable) throws Exception {                        }                  });</code></pre>    <p>当然如果你想使用RxJava2中onSubscribe(@NonNull Disposable d) ,你可以这样使用subscribe</p>    <pre>  <code class="language-kotlin">.subscribe(object : SingleObserver<List<Goods>> {                      override fun onSubscribe(d: Disposable?) {                      }                      override fun onError(e: Throwable?) {                      }                      override fun onSuccess(t: List<Goods>?) {                      }                  })</code></pre>    <p>为了观察,我们将请求成功的数据显示在界面上,我们创建一个Button,TextView。</p>    <pre>  <code class="language-kotlin">class MainActivity : AppCompatActivity(), View.OnClickListener {        val TAG = "MainActivity"      override fun onCreate(savedInstanceState: Bundle?) {          super.onCreate(savedInstanceState)          setContentView(R.layout.activity_main)          setSupportActionBar(toolbar)          //加入这句import kotlinx.android.synthetic.main.activity_main.*          //不用再findViewById,可直接使用          merge.setOnClickListener(this)        }      override fun onClick(v: View) {          when (v.id) {              R.id.merge -> {                  executeMerge()              }          }          //when 关键字和Java中的Switch关键词是类似的,          //只不过它比Java中的Switch强大的多,可以接收任何参数,          //然后判断使用,也可以如下使用          when (v) {              merge -> {              }          }      }  }</code></pre>    <h2>contact</h2>    <p>我们点击执行几次发现,返回的List的数据并不是按照merge参数的先后顺序执行的,它是并发的,最终的顺序,是由网络请求的快慢决定的,请求返回数据越快也就表示该数据最早发送,即在List中最靠前。那么此时出现一个问题,如果我想返回数据的List顺序严格按照位置的先后顺序呢?那此时使用merge的话,是不太现实了。当然前面我们提到contact可以使用。那么直接将merge更改为contact执行以下试试,</p>    <pre>  <code class="language-kotlin">fun executeContact() {          Observable.concat(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),                  getGoodsObservable(2).subscribeOn(Schedulers.newThread()),                  getGoodsObservable(3).subscribeOn(Schedulers.newThread()))                  .subscribeOn(Schedulers.newThread())                  .observeOn(AndroidSchedulers.mainThread())                  .toList()                  .subscribe({                      Log.e(TAG, it.toString())                  }, {                      Log.e(TAG, it.toString())                  })      }</code></pre>    <p>的确,发现无论执行多少次List的数据都能按照contact中Observable顺序发送,我们想要的效果可以实现了,不过你会发现,效率太差了,这是同步执行啊,只有第一个请求成功,才会去请求第二个,然后第三个,假如一次请求需要一秒,那三次请求至少三秒啊,不能忍。</p>    <h2>zip</h2>    <p>鉴于上面两种方式的利弊,如果我们既想如merge一样并发执行,又想和contact一样保证顺序,是不是有点强迫症的意思,当然强大的zip就能实现我们想要的效果。如下实现。.</p>    <pre>  <code class="language-kotlin">fun executeZip() {          Observable.zip(getGoodsObservable(1),                  getGoodsObservable(2),                  getGoodsObservable(3),                  Function3<Goods, Goods, Goods, List<Goods>>                  { goods0, goods1, goods2 ->                      val list = ArrayList<Goods>()                      list.add(goods0)                      list.add(goods1)                      list.add(goods2)                      list                  }).subscribeOn(Schedulers.newThread())                  .observeOn(AndroidSchedulers.mainThread())                  .subscribe({                      Log.e(TAG, it.toString())                  }, {                      Log.e(TAG, it.toString())                  })      }</code></pre>    <p>既然实现了,那我们运行几次,发现完美的实现了我们想要的效果,即并发的执行了,也保证了我们请求数据的顺序性。</p>    <h2>在回调中运用RxJava</h2>    <p>在上面我们的单个网络请求是一个同步的请求,如果我们的网络请求封装了,在线程中请求,请求成功后在主线程中回调,那我们又该如何创建呢使用呢?</p>    <p>先来模拟一个子线程请求网络,请求成功回调数据给主线程。</p>    <pre>  <code class="language-kotlin">fun getGoods(ctx:Context,id: Int,callbacks:(goods:Goods)->Unit): Unit {          ctx.doAsync {              Thread.sleep(Random().nextInt(1000).toLong())              var data = Goods(id, Random().nextInt(20), "地址${id}")              ctx.runOnUiThread {                  callbacks(data)              }          }      }</code></pre>    <p>getGoods传了三个参数,第一个Context对象,第二个是商品ID,第三个参数是一个函数,(goods:Goods)->Unit表示第三个参数的类型是一个参数为Goods类型并且返回Unit的函数。使用doAsync 模拟异步请求,请求成功后runOnUiThread 切换到UI线程。然后callbacks(data)将数据回调。这种使用方式比Java中回调优美好用太多了。</p>    <p>接下来就开始在回调成功后创建Observable</p>    <pre>  <code class="language-kotlin">fun getGoodsCallBack(id: Int): Observable<Goods> {          var subscrbe: ObservableEmitter<Goods>? = null          var o = Observable.create<Goods> {              subscrbe = it          }          //Kotlin特性          getGoods(this@MainActivity, id) {              subscrbe?.onNext(it)          }          return o      }      fun executeZipCallBack() {          Observable.zip(getGoodsCallBack(1).subscribeOn(Schedulers.newThread()),                  getGoodsCallBack(2).subscribeOn(Schedulers.newThread()),                  getGoodsCallBack(3).subscribeOn(Schedulers.newThread()),                  Function3<Goods, Goods, Goods, List<Goods>>                  { goods0, goods1, goods2 ->                      val list = ArrayList<Goods>()                      list.add(goods0)                      list.add(goods1)                      list.add(goods2)                      list                  }).subscribeOn(Schedulers.newThread())                  .observeOn(AndroidSchedulers.mainThread())                  .subscribe({                      Log.e(TAG, it.toString())                  }, {                      Log.e(TAG, it.toString())                  })      }</code></pre>    <p>ok,到这里回调情况下创建使用RxJava也介绍完毕,到此本篇文章就结束了,有问题欢迎指出,内容杂乱,多多担待,Hava a wonderful day.</p>    <p> </p>    <p>来自:https://juejin.im/entry/5925b8bcda2f60005d80e179</p>    <p> </p>