ReactiveX框架(基于RxJava)实现原理浅析

AudryXHPV 9年前

来自: http://blog.kifile.com/reactivex/2015/12/07/rx_introduce.html

前言

先简单介绍一下 ReactiveX.

ReactiveX 并不特指某种编程语言,他应该算是一种编程思维,反应式编程.

反应式编程的核心在于,当触发特定行为逻辑后(对于ReactiveX而言,就是调用了 subscribe 指令),根据进行指定操作,并根据操作结果执行特定操作. 这种编程思维特别适合用于交互式软件上,例如Android,iOS,通常用户触发某个条件(比如说点击操作)后,我们需要根据用户的操作行为, 可能接下来要执行一系列操作,最后再根据操作结果在ui界面上呈现给用户.而ReactiveX 为我们提供了这种交互流程的封装.

下面以RxJava为例,分析一下ReactiveX的框架实现原理.

RxJava执行流程

首先奉上一个最简单的 ReactiveX 的代码实现.

 1 Observable.create(new Observable.OnSubscribe<String>() {   2     @Override   3     public void call(Subscriber<? super String> subscriber) {   4         subscriber.onNext("Sample");   5         subscriber.onCompleted();   6     }   7 }).subscribe(new Observer<String>() {   8     @Override   9     public void onCompleted() {  10         System.out.println("Complete");  11     }  12   13     @Override  14     public void onError(Throwable e) {  15         e.printStackTrace();  16     }  17   18     @Override  19     public void onNext(String s) {  20         System.out.println("Get:" + s);  21     }  22 });

运行上面的代码,我们可以看到以下输出.

Get:Sample  Complete

代码执行完毕,让我们看看整个流程的实现逻辑.

 1 private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {   2     ...   3     subscriber.onStart();   4     ...   5     try {   6         hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);   7         return hook.onSubscribeReturn(subscriber);   8     } catch (Throwable e) {   9         Exceptions.throwIfFatal(e);  10         try {  11             subscriber.onError(hook.onSubscribeError(e));  12         } catch (Throwable e2) {  13             Exceptions.throwIfFatal(e2);  14             // if this happens it means the onError itself failed (perhaps an invalid function implementation)  15             // so we are unable to propagate the error correctly and will just throw  16             RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);  17             // TODO could the hook be the cause of the error in the on error handling.  18             hook.onSubscribeError(r);  19             // TODO why aren't we throwing the hook's return value.  20             throw r;  21         }  22         return Subscriptions.unsubscribed();  23     }  24 }

在上面的代码里,会发现一个 hook 对象,这是个什么鬼?

追踪一下,发现原来他是一个RxJava每个方法都会返回一个Observable对象ExecutionHook对象,类图如下:

可以看出,RxJavaObservableExecutionHook中针对RxJava的subscribe流程进行注入,方便自己更改相关逻辑, 当然对于默认的RxJavaObservableExecutionHook,你会发现他并没有做任何处理,如果你想自己实现可以调用 RxJavaPlugin.getInstance() 设置自定义Hook.

看完上面的例子,给人感觉挺简单地啊,而且比较类似Android中的AsyncTask,都是属于执行任务后进行回调,那他相比AsyncTask有什么优势吗?

Operator

虽然从上面的例子中,看起来RxJava的确其貌不扬,但是ReactiveX也不止这点技法.

为了扩展ReactiveX的相关属性,在 RxJava 中使用代理模式实现了很多有用的逻辑,例如类型转换,遍历数据个数限制,定时响应等额外特性.

这些逻辑被称为操作(Operator),每一个Operator都继承了Func1(也就是内部会有一个call()方法),RxJava框架会调用lift方法将Operator包装成为Observable:

 1 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {   2     return new Observable<R>(new OnSubscribe<R>() {   3         @Override   4         public void call(Subscriber<? super R> o) {   5             try {   6                 Subscriber<? super T> st = hook.onLift(operator).call(o);   7                 try {   8                     // new Subscriber created and being subscribed with so 'onStart' it   9                     st.onStart();  10                     onSubscribe.call(st);  11                 } catch (Throwable e) {  12                     // localized capture of errors rather than it skipping all operators  13                     // and ending up in the try/catch of the subscribe method which then  14                     // prevents onErrorResumeNext and other similar approaches to error handling  15                     Exceptions.throwIfFatal(e);  16                     st.onError(e);  17                 }  18             } catch (Throwable e) {  19                 Exceptions.throwIfFatal(e);  20                 // if the lift function failed all we can do is pass the error to the final Subscriber  21                 // as we don't have the operator available to us  22                 o.onError(e);  23             }  24         }  25     });  26 }

如果想了解更多详细的操作信息,可以点击这里: Operators

其实这里有个问题,既然大部分的Operator都需要这样封装,为什么不直接让Operator对象继承OnSubscribe对象,进而减少方法调用层级?

如果是为了防止OnSubscribe和Func1的方法重名,那么更改函数名就好了啊? 如果为了进行onLift回调,也可以在新类中增加回调调用位置啊?

同时为了避免代码冗余,对于这些方法,RxJava都使用了构造者模式的一种变体,每个方法都会返回一个Observable对象,保证其能够形成类似下面这样的操作链.

 1 Observable.just("Hello", "Operator", "Chain").map(s -> s + " map" )   2     .buffer(2).take(1).subscribe(new Subscriber<List<String>>() {   3         @Override   4         public void onCompleted() {   5             System.out.println("Complete");   6         }   7    8         @Override   9         public void onError(Throwable e) {  10   11         }  12   13         @Override  14         public void onNext(List<String> strings) {  15             System.out.println("Get:" + strings);  16         }  17 });

输出结果如下:

Get:[Hello map, Operator map]  Complete

可以看到,我们输入了三个String对象,但是只一次输入了两个String,并且两个String 都额外多了一个 map 后缀,当然这也是我想要的结果

使用Single替代Observable

其实在绝大部分的使用场景中,用户触发操作后,对于我们而言返回结果其实只有成功失败两种,而Observable中是有 onNext , onComplete , onError 三种状态,这样看来似乎不太满足需求.

当然ReactiveX中也考虑到了这种情况,它在Observable的基础上衍生出了Single类,这个类的实现机制同Observable近乎相同,只是不过订阅他的不再是 Subscriber 对象,而是 SingleSubscriber .

在 SingleSubscriber 中有 onSuccess 和 onError 两种结果状态,并且只会调用其中一个,恰好满足我们的需求.

使用Scheduler进行多线程调用

按照上面所说,不论是Observable还是Single,其实都是在同一个线程中,不断按照执行逻辑执行代码指令,也就是说他始终是在同一线程中进行执行的.

但是有时候,我们希望能够在异步线程中执行耗时操作,避免ui堵塞,这时候ReactiveX就为我们提供了 Scheduler 类.

Scheduler类其实并不负责异步线程处理,他只负责通过 createWorker() 类创建出一个 Worker 对象,真正负责任务的延时处理.

每一个Scheduler类,都会实现自己的Worker类,用于执行Scheduler任务.

我们可以使用ReactiveX中的 subscribeOn 和 observeOn 两个方法,分别设置获取数据的操作和分发消息的操作的执行Scheduler,从而实现数据的异步处理.

subscribeOn 和 observeOn 其实都是构建一个Operator对象,在call方法里,使用线程执行数据获取和分发操作.

总结

其实ReactiveX就是一个针对观察者模式的扩展,如果忽略掉ReactiveX框架为我们实现各种的Operator,那么它就是一个简单的设计模式而已.

单就这一点而言,Android的AsyncTask和LoaderManager框架是胜过ReactiveX的,因为他针对Android的生命周期做了处理.

但由于ReactiveX中增加了很多Operator,能够很方便的帮助我们对响应式任务进行操作,不论是类型转换还是异步执行.

反而导致看起来ReactiveX中的比AsyncTask要好用,但我觉得如果吧ReactiveX结合到AsyncTask和LoaderManager中应该会更加的完美.