Architecting Android with RxJava

jopen 9年前

 

最近,我抽出了几个晚上的时间,把咖啡和啤酒变成了代码与文字。

引子

三个月以来,我翻译了一些关于 RxJava 的文章,说实话这些翻译,真的搞得我很头疼,那么现在是时候回来写点什么了。

最近,我在看两本书, 《Learning Reactive Programming with Java 8》《RxJava Essentials》 ,不过,没关系,我已经买到了电子版,我会在文章结尾附上网盘链接和密码,但我还是希望你将文章继续读下去,因为那是文章结尾的事。

其实关于 RxJava 的文章和消息远不止我们能了解到的,但又拜英语所赐,所以它看起来又没那么多。好在,国内有许多优秀的开发专家 hi大头鬼hiBlackSwift程序亦非猿Drakeet ,扔物线,流火枫林等等在为之做着贡献,以及简直不能更优秀的文章 《给 Android 开发者的 RxJava 详解》

但是,现在,我不得不再次做啰嗦一下, RxJava 究竟会改变我们什么。

响应式编程Reactive Programming

什么是响应式编程呢?在Java程序中:

int a = 4;  int b = 5;  int c = a + b;  System.out.println(c); // 9  a = 6;  System.out.println(c);  // 9 again, but if 'c' was tracking the changes of 'a' and 'b',  // it would've been 6 + 5 = 11

当我们改变“a”和“b”的值时,“c”并没有改变。话句话说就是,“a”和“b”的改变并没有响应到“c”。总结起来就是: 程序以流的形式,传递数据的改变

那我,我们又为什么需要响应式呢?

以下翻译自《Learning Reactive Programming with Java 8》

10-15年前,对于网站开发来说,最平常的日常工作就是进行维护和缩短响应时间,那么今天,一切程序都应该保证七天二十四小时不间断运行,并 且能够极快的做出响应;如果你的网站响应慢或者宕机,那么用户将会对你们真爱一秒变备胎,转而选择其他网站服务。当今的慢意味着不可用甚至是有故障的。如 今的互联网是在和大数据打交道,所以我们需要快速的处理数据。

过去的几年中HTTP错误已经不是什么新鲜事了,但是现在,我们不得不进行容错机制,还要提供用户易读以及合理的消息更新。

在过去,我们写简单的桌面应用,但如今我们写能够做出快速的响应Web应用。多数情况下,这些应用要与大量的远程服务器进行数据传递。

如果我们想让自己的软件保持竞争性,就不得不实现这些新需求,所以,换言之就是我们应该这样做:

  • 模块的/动态的:用这种方式,我们就能够拥有一个七天二十四小时的系统了,因为这些模块能够在不停止整个系统的情况下进行脱机和联机。另外,随着系统的不断庞大,还能帮助我们更好地组织应用结构,同时还能管理底层代码。
  • 可扩展的:用这种方式,我们就能够处理大量的数据和用户请求了。
  • 容错性:用这种方式,能够为用户提供稳定的系统。
  • 响应式:这不仅意味着快速,还意味着可用性强。

让我们思考如何实现它:

  • 如果我们的系统是事件驱动型的,那就把它模块化。我们可以将系统分成多个彼此之间通过通知进行交互的微服务/组件/模块。这样,我们就能够以通知为代表,响应系统的数据流了。
  • 可扩展意味着能够应对日益增长的数据,在负载的情况下不会崩溃。
  • 对故障/错误做出及时的响应,能够提高系统的容错性。
  • 响应意味着对能够对用户操作及时的做出反应。

如果应用是事件驱动型的,那么,它就能够解耦成多个自包含组件。这能够帮我们更好的实现扩展性,因为我们总是可以在不停掉或者打断系统的情况下 添加新组建或者移除旧组件。如果错误和故障传递给正确的组件,把它们当做通知来处理并作出响应,那么应用能变得更具有容错性和弹性。所以,如果把系统构建 成事件驱动型的。我们可以更容易的实现扩展性和容错性,而且一个具有扩展性,低耦合和防错的应用能够快速的响应用户操作。

Architecting Android with RxJava

Reactive Manifesto 文档定义了我们刚刚提到的四点响应式准则。每一个响应式系统都应该是消息驱动型(事件驱动型)的。这样它不仅能变得低耦合,而且扩展性和容错性将更高,这就意味着它可靠和具有响应式。

要注意的是, Reactive Manifesto 只是讲述了一个响应式系统,并不是对响应式编程的定义。当然,你也可以不使用任何响应式类库或者语言,打造一款弹性可扩展,具有消息驱动的响应式应用。

应用程序中数据的变化,以通知的方式传递给正确的Handler。所以,使用响应式写应用

回调地狱

如果你是一个能够时刻保持头脑清醒,逻辑清晰和思维缜密的人,是个Callback高手,善用并且能够用好FutureTask。

那么在Android中你的代码可能会频繁的使用async+callbacks,或者service composition+error handing,developer productivity。

那么关于异步回调的逻辑,你会写成这样getData(Callback<T>)、这样Future<T> getData(),还是这样Future<List<T>> getData(),甚至这样Future<List<Future<T>>> getData(),嗷!拜托,我简直不能再举例下去了,这简直就是 Callback Hell ,这样的程序或许写起来很舒服,但是如何测试和维护呢。

如果哪天你的程序出了问题而必须马上修复,但你能马上赶来或者需要别人协助(这在很多公司是很常见的),或者他人在review你的代码,那么,是时候拿出这张图了。

Architecting Android with RxJava

然而使用 RxJava 的操作符,我们可以避免这些烦人甚至糟糕的回调,让结构和思路看起来更清晰,通过组合API,只需要约定最终的结果Observable<T>就行了。

并且scheduler的出现,不仅解放了线程的切换,让UI线程与工作线程间的跳转变得简单,而且,它的API很丰,也提供了很多使用常见的建议,比如,适用计算任务的 Schedulers.computation( );处理密集IO任务的Schedulers.io( );以及 Schedulers.trampoline( )能够有效避免StackOverflowError,所以非常适合函数的递归调用。好了,我不再举例 了,因为 官方文档 已经给出了很详细的解释了,但是值得一提的是,如果使用Schedulers的工厂方法创建的Worker,一旦任务执行完毕,都应该调用worker.unsubscribe( )方法,然后转向之前定义的Scheduler实例上来。

当然 RxJava 的出现并不仅仅是为了解决回调地狱的。

这是我通过学习和不断地练习,一路走来很辛苦,总结的一些经验,分享给大家:

1 . error handling

2 . lifecycle changes

3 . caching (roation)

   @Override public void onCreate(Bundle savedInstanceState) {    super.onCreate(savedInstanceState);    setRetainInstance(true);      /*.cache()操作符:      当第一个subscribe订阅的时候,才会连接原始Observable,缓存事件,重发给后续订阅的subscribe      值得注意的事,它和使用了.replay().publish()操作符的ConnectableObservable的不同。      另外,为了避免内存开销,不建议缓存大量事件*/    cacheObservable = weatherManager.getWeather().cache();  }    @Override public void onViewCreated(View view, Bundle savedInstanceState) {    super.onViewCreated(view, savedInstanceState);    cacheObservable.subscribe(/*your subscribe*/);  }

4 . composing multiple calls

5 . more robust interface than asyncTask

6 . easy to do complex threading

7 . functional nature is more expressive

    /*一个数组,每个元素乘以2,然后筛选小于10的元素,放入集合中*/  Integer[] integers = { 0, 1, 2, 3, 4, 5 };    /*一般写法,看上去并不是那么的“函数”*/  Integer[] doubles = new Integer[integers.length];  for (int i = 0; i < integers.length; i++) {    doubles[i] = integers[i] * 2;  }  List<Integer> integerList = new ArrayList<>(doubles.length);  for (Integer integer : doubles) {    if (integer < 10) integerList.add(integer);  }      /*Observable写法,一切都好多了*/  List<Integer> funactionalList = Observable.from(integers).map(new Func1<Integer, Integer>() {    @Override public Integer call(Integer integer) {      return integer * 2;    }  }).filter(new Func1<Integer, Boolean>() {    @Override public Boolean call(Integer integer) {      return integer < 10;    }  }).toList().toBlocking().first();

8 . async unit testing

9 . fluent API

10 . easy debugging

//值得一提的是,关于@RxLogSubscriber要放在继承自Subscriber的类上  @RxLogSubscriber class MySubscriber extends Subscriber<Void> {    @Override public void onCompleted() {    }    @Override public void onError(Throwable e) {    }    @Override public void onNext(Void aVoid) {    }        }  //而不是实现Observer接口的类上  @RxLogSubscriber class MySubscriber implements Observer<Void> {    @Override public void onCompleted() {    }    @Override public void onError(Throwable e) {    }    @Override public void onNext(Void aVoid) {    }        }

当然,随着学习的深入,你会发现,收益不止如此。

在响应式编程中,应该牢记以下两点:

  • everything is a stream(一切皆流)

    Architecting Android with RxJava

  • don't break the chain(不要打断链式结构)

    Architecting Android with RxJava

谈谈Backpressure

Android这种嵌入式系统,尤其是生产者-消费者( producer-consumer )模式中,一定要小心Backpressure(背压,反压)的出现。一个宽泛的解释就是:事件产生的速度比消费快。一旦发生 overproducing,当你的链式结构不能承受数据压力的时候,就会抛出MissingBackpressureException异常。

在Android中最容易出现的Backpressure就是连续快速点击跳转界面、数据库查询、键盘输入,甚至联网等操作都有可能造成 Backpressure,可能有些情况并不会导致程序崩溃,但是会造成一些我们不想见到的小麻烦。那么一起来看看如何用RxJava解决 Backpressure ,OK,让我们的程序变得健壮起来吧。

groupBy操作符

在写这篇文章的时候,刚好看到一段代码,看来有必要说一说这个操作符了。

Architecting Android with RxJava

.groupBy( ) ,分组操作符,虽然目前这个项目中没有用到,但是我还是蛮喜欢它的,而且我看到很多人在使用,将原始Observable根据不同的key分组成多个GroupedObservable,由原始Observable发射(原始Observable的泛型将变成这样Observable<GroupedObservable<K, T>>),每一个GroupedObservable既是事件本身也是一个独立的Observable,每一个GroupedObservable发射一组原始Observable的事件子集。

引用自: GroupBy中文翻译

注意:groupBy将原始Observable分解为一个发射多个GroupedObservable的Observable,一旦有订阅, 每个GroupedObservable就开始缓存数据。因此,如果你忽略这些GroupedObservable中的任何一个,这个缓存可能形成一个潜 在的内存泄露。因此,如果你不想观察,也不要忽略GroupedObservable。你应该使用像take(0)这样会丢弃自己的缓存的操作符。如果你 取消订阅一个GroupedObservable,那个Observable将会终止。如果之后原始的Observable又发射了一个与这个 Observable的Key匹配的数据,groupBy将会为这个Key创建一个新的GroupedObservable。

那么问题恰恰出在 .take(n) 操作符上。

Architecting Android with RxJava

只返回前面指定的n项数据,然后发送完成通知,忽略后面的事件。

那么看一下这个例子:

Observable.just(0, 1, 2, 3, 4, 5).groupBy(new Func1<Integer, Boolean>() {    @Override public Boolean call(Integer integer) {      return integer % 2 == 0;    }  }).flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<Integer>>() {    @Override    public Observable<Integer> call(GroupedObservable<Boolean, Integer> groupedObservable) {        return groupedObservable.getKey() ? groupedObservable.take(1) : groupedObservable;    }  }).subscribe(new Action1<Integer>() {    @Override public void call(Integer i) {      System.out.println(i);    }  });

输出结果:

然而在 1.0.0-RC5 之前的版本中,在GroupedObservable上使用.take(n)操作符将会在发送完n个事件后,对GroupedObservable进行 unsubscribe。并且GroupedObservable内部将会记录这个unsubscribed状态,然后忽略后面的事件。所以输出结果将是 这样的:

而在这之后的版本,使用.take(n)操作符,虽然也会发生unsubscribe,但是当原始再次Observable发送一个满足key的事件后,将会重新创建一个GroupedObservable,然后发送这个GroupedObservable,不会发生之前那样的,忽略后续事件的现象。

当然,不要忘记,对不感兴趣的GroupedObservable使用.take(0),来避免泄露。

所以,我的建议是,在使用RxJava之前看看官方文档或者change log。

关于RxWeather

我尽量减少对这个工程的文字描述。因为代码才是最好的老师。

通过对 Android技术栈,1#架构译文 )和 Android架构演化之路译文 )的解读和学习,按照架构和思路进行了实现,并且加入了RxBus。

关于REST API,我选择了 和风天气 ,而放弃了 Openweathermap 的理由如下:

  1. Openweathermap免费用户所在的服务器不稳定。

  2. 付费方面,和风天气更经济实惠。

但是和风天气目前并不支持同时查询多个地区的天气预报,也不支持根据经纬度查询天气预报。但是以后的事情谁又能说的准呢?

由于应用并不支持动态的上拉加载。所以,所有的列表展示结果,取决于 city.txt 文件。

我从Openweathermap给出的 资源 (下载city.list.json)中,整理需要的城市Json字符串,整合了经纬度,以备不时之需。

找到了一个通过Location查询所在地的API。

就这样基本实现了列表展示页 ListActivity 的功能:

  1. 根据Loaction查询所在地城市名称,然后查询当地天气。

  2. 读取domain->assets->city.txt,然后依次查询每个城市的天气,所以,这个文件不建议放入太多json。

  3. 整合1和2的并发请求结果,显示界面。

详情页 DetailActivity 通过 RxBus 发送黏性事件接收列表页传递过来的数据,然后进行展示。这里会有七天内的天气以及穿衣建议。由于我么并没有找到一个正确的算法,所以当进入详情页后,旋转 屏幕之后的退出动画会有所不同。这个类涉及的代码大部分都是动画(注意Hardware Layer的使用)以及对屏幕旋转的处理,所以代码看起有点多。 ForkView 使用了一个简单的自定义Behavior。

搜索界面 SearchActivity ,输入的关键字请不要以市、区结尾,例如,北京而不是北京市,因为API不支持,我也没办法 :( 。

启动页

我认为,出彩的引导页是对细节的重视,但是我实在不能忍受,在启动页等太久。注意:不要混淆这两种场景。

所以,我在看了 正确使用启动页 之后,决定采取这种方式实现 SplashActivity 。而且不建议使用大图,一个icon足以。

Code

All the code is available on Github .

片尾Tips:

文章开头提到的资料,需要pdf或者kindle版本的请自行选择下载,不得用于商业用途。

Learning Reactive Programming with Java 8 - pdf版,提取密码:2d88。

Learning Reactive Programming with Java 8 - kindle版,提取密码:5nec。

RxJava Essentials - pdf版,提取密码:z3r8。

RxJava Essentials - kindle版,提取密码:l67e。