ReactiveX框架(基于RxJava)实现原理浅析
来自: http://blog.kifile.com/reactivex/2015/12/07/rx_introduce.html
前言
先简单介绍一下 ReactiveX.
ReactiveX 并不特指某种编程语言,他应该算是一种编程思维,反应式编程.
反应式编程的核心在于,当触发特定行为逻辑后(对于ReactiveX而言,就是调用了 subscribe 指令),根据进行指定操作,并根据操作结果执行特定操作. 这种编程思维特别适合用于交互式软件上,例如Android,iOS,通常用户触发某个条件(比如说点击操作)后,我们需要根据用户的操作行为, 可能接下来要执行一系列操作,最后再根据操作结果在ui界面上呈现给用户.而ReactiveX 为我们提供了这种交互流程的封装.
下面以RxJava为例,分析一下ReactiveX的框架实现原理.
RxJava执行流程
首先奉上一个最简单的 ReactiveX 的代码实现.
1 Observable.create(new Observable.OnSubscribe<String>() { 2 @Override 3 public void call(Subscriber<? super String> subscriber) { 4 subscriber.onNext("Sample"); 5 subscriber.onCompleted(); 6 } 7 }).subscribe(new Observer<String>() { 8 @Override 9 public void onCompleted() { 10 System.out.println("Complete"); 11 } 12 13 @Override 14 public void onError(Throwable e) { 15 e.printStackTrace(); 16 } 17 18 @Override 19 public void onNext(String s) { 20 System.out.println("Get:" + s); 21 } 22 });
运行上面的代码,我们可以看到以下输出.
Get:Sample Complete
代码执行完毕,让我们看看整个流程的实现逻辑.
1 private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { 2 ... 3 subscriber.onStart(); 4 ... 5 try { 6 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); 7 return hook.onSubscribeReturn(subscriber); 8 } catch (Throwable e) { 9 Exceptions.throwIfFatal(e); 10 try { 11 subscriber.onError(hook.onSubscribeError(e)); 12 } catch (Throwable e2) { 13 Exceptions.throwIfFatal(e2); 14 // if this happens it means the onError itself failed (perhaps an invalid function implementation) 15 // so we are unable to propagate the error correctly and will just throw 16 RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); 17 // TODO could the hook be the cause of the error in the on error handling. 18 hook.onSubscribeError(r); 19 // TODO why aren't we throwing the hook's return value. 20 throw r; 21 } 22 return Subscriptions.unsubscribed(); 23 } 24 }
在上面的代码里,会发现一个 hook 对象,这是个什么鬼?
追踪一下,发现原来他是一个RxJava每个方法都会返回一个Observable对象ExecutionHook对象,类图如下:
可以看出,RxJavaObservableExecutionHook中针对RxJava的subscribe流程进行注入,方便自己更改相关逻辑, 当然对于默认的RxJavaObservableExecutionHook,你会发现他并没有做任何处理,如果你想自己实现可以调用 RxJavaPlugin.getInstance() 设置自定义Hook.
看完上面的例子,给人感觉挺简单地啊,而且比较类似Android中的AsyncTask,都是属于执行任务后进行回调,那他相比AsyncTask有什么优势吗?
Operator
虽然从上面的例子中,看起来RxJava的确其貌不扬,但是ReactiveX也不止这点技法.
为了扩展ReactiveX的相关属性,在 RxJava 中使用代理模式实现了很多有用的逻辑,例如类型转换,遍历数据个数限制,定时响应等额外特性.
这些逻辑被称为操作(Operator),每一个Operator都继承了Func1(也就是内部会有一个call()方法),RxJava框架会调用lift方法将Operator包装成为Observable:
1 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { 2 return new Observable<R>(new OnSubscribe<R>() { 3 @Override 4 public void call(Subscriber<? super R> o) { 5 try { 6 Subscriber<? super T> st = hook.onLift(operator).call(o); 7 try { 8 // new Subscriber created and being subscribed with so 'onStart' it 9 st.onStart(); 10 onSubscribe.call(st); 11 } catch (Throwable e) { 12 // localized capture of errors rather than it skipping all operators 13 // and ending up in the try/catch of the subscribe method which then 14 // prevents onErrorResumeNext and other similar approaches to error handling 15 Exceptions.throwIfFatal(e); 16 st.onError(e); 17 } 18 } catch (Throwable e) { 19 Exceptions.throwIfFatal(e); 20 // if the lift function failed all we can do is pass the error to the final Subscriber 21 // as we don't have the operator available to us 22 o.onError(e); 23 } 24 } 25 }); 26 }
如果想了解更多详细的操作信息,可以点击这里: Operators
其实这里有个问题,既然大部分的Operator都需要这样封装,为什么不直接让Operator对象继承OnSubscribe对象,进而减少方法调用层级?
如果是为了防止OnSubscribe和Func1的方法重名,那么更改函数名就好了啊? 如果为了进行onLift回调,也可以在新类中增加回调调用位置啊?
同时为了避免代码冗余,对于这些方法,RxJava都使用了构造者模式的一种变体,每个方法都会返回一个Observable对象,保证其能够形成类似下面这样的操作链.
1 Observable.just("Hello", "Operator", "Chain").map(s -> s + " map" ) 2 .buffer(2).take(1).subscribe(new Subscriber<List<String>>() { 3 @Override 4 public void onCompleted() { 5 System.out.println("Complete"); 6 } 7 8 @Override 9 public void onError(Throwable e) { 10 11 } 12 13 @Override 14 public void onNext(List<String> strings) { 15 System.out.println("Get:" + strings); 16 } 17 });
输出结果如下:
Get:[Hello map, Operator map] Complete
可以看到,我们输入了三个String对象,但是只一次输入了两个String,并且两个String 都额外多了一个 map 后缀,当然这也是我想要的结果
使用Single替代Observable
其实在绝大部分的使用场景中,用户触发操作后,对于我们而言返回结果其实只有成功失败两种,而Observable中是有 onNext , onComplete , onError 三种状态,这样看来似乎不太满足需求.
当然ReactiveX中也考虑到了这种情况,它在Observable的基础上衍生出了Single类,这个类的实现机制同Observable近乎相同,只是不过订阅他的不再是 Subscriber 对象,而是 SingleSubscriber .
在 SingleSubscriber 中有 onSuccess 和 onError 两种结果状态,并且只会调用其中一个,恰好满足我们的需求.
使用Scheduler进行多线程调用
按照上面所说,不论是Observable还是Single,其实都是在同一个线程中,不断按照执行逻辑执行代码指令,也就是说他始终是在同一线程中进行执行的.
但是有时候,我们希望能够在异步线程中执行耗时操作,避免ui堵塞,这时候ReactiveX就为我们提供了 Scheduler 类.
Scheduler类其实并不负责异步线程处理,他只负责通过 createWorker() 类创建出一个 Worker 对象,真正负责任务的延时处理.
每一个Scheduler类,都会实现自己的Worker类,用于执行Scheduler任务.
我们可以使用ReactiveX中的 subscribeOn 和 observeOn 两个方法,分别设置获取数据的操作和分发消息的操作的执行Scheduler,从而实现数据的异步处理.
subscribeOn 和 observeOn 其实都是构建一个Operator对象,在call方法里,使用线程执行数据获取和分发操作.
总结
其实ReactiveX就是一个针对观察者模式的扩展,如果忽略掉ReactiveX框架为我们实现各种的Operator,那么它就是一个简单的设计模式而已.
单就这一点而言,Android的AsyncTask和LoaderManager框架是胜过ReactiveX的,因为他针对Android的生命周期做了处理.
但由于ReactiveX中增加了很多Operator,能够很方便的帮助我们对响应式任务进行操作,不论是类型转换还是异步执行.
反而导致看起来ReactiveX中的比AsyncTask要好用,但我觉得如果吧ReactiveX结合到AsyncTask和LoaderManager中应该会更加的完美.