Implementing RxJava Yourself to Understand Its Call Chain
jerry007
8 years ago
<p>RxJava has a large API surface and intricate internal logic. Complex material is usually best learned from the whole down to the details, so to study how RxJava works, this article follows its source code and builds a simplified RxJava by hand, with the goal of understanding the call chain.</p>
<p>Before reading on, it is worth downloading the LittleRx code: reading code in an IDE is much clearer than reading it on a web page, and you can also look at the printed logs.</p>
<p>The four main classes are Observable, OnSubscribe, Operator, and Subscriber.</p>
<h3><strong>1. The simplest case: create an Observable, then subscribe</strong></h3>
<pre> <code class="language-java">Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });

public class Observable<T> {
    private OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    public final void subscribe(Subscriber<? super T> subscriber) {
        onSubscribe.call(subscriber);
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
        return new Observable<>(onSubscribe);
    }
}</code></pre>
<p>This shows that subscribe(subscriber) --> onSubscribe.call(subscriber), so without a subscription, OnSubscribe.call() is never triggered.</p>
<h3><strong>2. map and lift</strong></h3>
<pre> <code class="language-java">Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "map" + integer;
            }
        })
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });</code></pre>
<p>The same thing written without chaining:</p>
<pre> <code class="language-java">// Anonymous-class bodies are omitted for brevity
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = Observable.create(onSubscribe);
Func1<Integer, String> func = new Func1<>();
Observable<String> observable2 = observable.map(func);
Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);</code></pre>
<p>create() is the same as before, so what does map() do?</p>
<pre> <code class="language-java">public final <R> Observable<R> map(Func1<? super T, ?
extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}</code></pre>
<p>lift() wraps the previous observable's onSubscribe in a new OnSubscribeLift and returns a new observable (observable2). As noted above, subscribe(subscriber) --> onSubscribe.call(subscriber), so let's look at OnSubscribeLift next.</p>
<pre> <code class="language-java">public class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    final OnSubscribe<T> parent;
    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    // Don't worry about the implementation yet; we come back to it below
    @Override
    public void call(Subscriber<? super R> r) {
        Subscriber<? super T> st = operator.call(r); // this operator is the OperatorMap
        parent.call(st); // parent is the first observable's onSubscribe
    }
}</code></pre>
<p>Now look at OperatorMap:</p>
<pre> <code class="language-java">public final class OperatorMap<T, R> implements Operator<R, T> {
    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    // Don't worry about the implementation yet; we come back to it below
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new MapSubscriber<T, R>(o, transformer);
    }

    // static: the nested class declares its own T and R and needs no outer instance
    private static class MapSubscriber<T, R> extends Subscriber<T> {
        private Subscriber<? super R> actual;
        private Func1<? super T, ? extends R> transformer;

        public MapSubscriber(Subscriber<? super R> o, Func1<? super T, ?
extends R> transformer) {
            this.actual = o;
            this.transformer = transformer;
        }

        // Don't worry about the implementation yet; we come back to it below
        @Override
        public void onNext(T t) {
            R r = transformer.call(t);
            actual.onNext(r);
        }
    }
}</code></pre>
<p>Now let's drop map() and lift() and write the same thing using only the basic classes:</p>
<pre> <code class="language-java">OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe2);
Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);</code></pre>
<p>At this point it is clear how the first Observable<Integer> is turned into an Observable<String>, and how OnSubscribe<Integer> onSubscribe relates to OnSubscribeLift<Integer, String> onSubscribe2.</p>
<p style="text-align:center"><img src="https://simg.open-open.com/show/9a44af7191e3d5e4d6dd3ee6b9a94cd9.png"></p>
<p>So how does the final subscribe() reach the first onSubscribe.call(Subscriber<Integer>), and how does the Subscriber<Integer>.onNext(Integer) called inside it reach the final subscriber's Subscriber<String>.onNext(String)?</p>
<p>1) observable2.subscribe(subscriber) --><br>
2) onSubscribe2.call(subscriber), i.e. OnSubscribeLift.call(subscriber) --><br>
3) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.<br>
4) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)<br>
5) parent.call(st), i.e. onSubscribe.call(st) --><br>
6) st.onNext(1), i.e. MapSubscriber.onNext(1) --><br>
7) String string = func.call(1)<br>
8) subscriber.onNext(string)</p>
<p>That completes the call chain of Observable.create().map().subscribe().</p>
<p>Many operators are lift() under the hood, and the same reasoning extends to longer chains. Here is lift() applied twice:</p>
<p style="text-align:center"><img src="https://simg.open-open.com/show/60421851426535118eb239b6ae5021b3.png"></p>
<h3><strong>3. subscribeOn</strong></h3>
<p>Scheduler is fairly complicated internally, so we simplify subscribeOn(Scheduler) into subscribeOnIO().</p>
<pre> <code
class="language-java">Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });</code></pre>
<p>How can subscribeOnIO() be implemented so that the first observable's onSubscribe runs on a worker thread?</p>
<pre> <code class="language-java">// In Observable:
public final Observable<T> subscribeOnIO() {
    return create(new OnSubscribeOnIO<T>(this));
}

// OnSubscribeOnIO.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class OnSubscribeOnIO<T> implements OnSubscribe<T> {
    private static ExecutorService threadPool = Executors.newCachedThreadPool();

    final Observable<T> source;

    public OnSubscribeOnIO(Observable<T> source) {
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                source.subscribe(subscriber); // --> onSubscribe.call(subscriber) --> subscriber.onNext(1)
            }
        };
        threadPool.submit(runnable);
    }
}</code></pre>
<p>As shown above, subscribeOnIO() wraps the subscription in a Runnable and submits it to a cached thread pool. The previous Observable is subscribed to on a worker thread, and all subsequent calls run on that thread.</p>
<p>Now consider a slightly more complex case that adds map():</p>
<pre> <code class="language-java">Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<?
super Integer> subscriber) {
                System.out.println(Thread.currentThread());
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                System.out.println(Thread.currentThread());
                return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(Thread.currentThread());
                System.out.println(s);
            }
        });</code></pre>
<p>The same thing written without chaining:</p>
<pre> <code class="language-java">OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe2);
OnSubscribeOnIO<String> onSubscribe3 = new OnSubscribeOnIO<>(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe3);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);</code></pre>
<p>1) observable3.subscribe(subscriber) --><br>
2) onSubscribe3.call(subscriber), i.e. OnSubscribeOnIO.call(subscriber) --><br>
3) on a worker thread, new Runnable(){} --> observable2.subscribe(subscriber) --><br>
4) onSubscribe2.call(subscriber), i.e. OnSubscribeLift.call(subscriber) --><br>
5) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.<br>
6) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)<br>
7) parent.call(st), i.e. onSubscribe.call(st) --><br>
8) st.onNext(1), i.e. MapSubscriber.onNext(1) --><br>
9) String string = func.call(1)<br>
10) subscriber.onNext(string)</p>
<p>What if map() and subscribeOnIO() swap positions?</p>
<pre> <code class="language-java">OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
OnSubscribeOnIO<Integer> onSubscribe2 = new OnSubscribeOnIO<>(observable);
Observable<Integer>
observable2 = new Observable<>(onSubscribe2);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
Observable<String> observable3 = new Observable<>(onSubscribe3);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);</code></pre>
<p>1) observable3.subscribe(subscriber) --><br>
2) onSubscribe3.call(subscriber), i.e. OnSubscribeLift.call(subscriber) --><br>
3) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.<br>
4) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)<br>
5) parent.call(st), i.e. onSubscribe2.call(st), i.e. OnSubscribeOnIO.call(st) --><br>
6) on a worker thread, new Runnable(){} --> observable.subscribe(st) --><br>
7) onSubscribe.call(st) --><br>
8) st.onNext(1), i.e. MapSubscriber.onNext(1) --><br>
9) String string = func.call(1)<br>
10) subscriber.onNext(string)</p>
<p>As you can see, wherever subscribeOnIO() sits in the chain, the first onSubscribe.call() always runs on a worker thread.</p>
<h3>4. observeOn</h3>
<p>First, here is how the final demo is written:</p>
<pre> <code class="language-java">Handler handler = new Handler();
Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .observeOn(handler)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });
handler.loop(); // when the queue is empty, this blocks the current thread until a new message arrives</code></pre>
<p>Likewise, we implement a simple observeOn(Handler) of our own that can switch back to the main thread:</p>
<pre> <code class="language-java">public class Observable<T> {
    ...
    public final Observable<T> observeOn(Handler handler) {
        return lift(new OperatorObserveOn<T>(handler));
    }
}</code></pre>
<p>OperatorObserveOn:</p>
<pre> <code class="language-java">public final class OperatorObserveOn<T> implements Operator<T, T> {
    private Handler handler;

    public OperatorObserveOn(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
        Subscriber<T> s = new Subscriber<T>() {
            @Override
            public void onNext(final T t) {
                // hand the item over to the thread that runs handler.loop()
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onNext(t);
                    }
                });
            }
        };
        return s;
    }
}</code></pre>
<p>A custom Handler:</p>
<pre> <code class="language-java">import java.util.concurrent.ArrayBlockingQueue;

public class Handler {
    private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);

    public void loop() {
        for (; ; ) {
            Runnable runnable;
            try {
                runnable = queue.take(); // blocks while the queue is empty; wakes up when an item arrives
            } catch (InterruptedException e) {
                return;
            }
            if (runnable == null) {
                return;
            }
            runnable.run();
        }
    }

    public void post(Runnable runnable) {
        try {
            queue.put(runnable); // blocks while the queue is full; continues once space is available
        } catch (InterruptedException e) {
            return;
        }
    }
}</code></pre>
<p>Written without chaining:</p>
<pre> <code class="language-java">OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
OperatorObserveOn<Integer> operatorObserveOn = new OperatorObserveOn<>(handler);
OnSubscribeLift<Integer, Integer> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorObserveOn);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe3);
OnSubscribeOnIO<String> onSubscribe4 = new OnSubscribeOnIO<>(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe4);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);</code></pre>
<p>1) observable3.subscribe(subscriber) --><br>
2) onSubscribe4.call(subscriber), i.e. OnSubscribeOnIO.call(subscriber) --><br>
3) on a worker thread, new Runnable(){} --> observable2.subscribe(subscriber) --><br>
4) onSubscribe3.call(subscriber), i.e. OnSubscribeLift.call(subscriber) --><br>
5) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.<br>
6) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)<br>
7) parent.call(st), i.e. onSubscribe2.call(st), i.e. OnSubscribeLift.call(st) --><br>
8) Subscriber<Integer> st2 = operatorObserveOn.call(st) --><br>
9) parent.call(st2), i.e. onSubscribe.call(st2) --><br>
10) st2.onNext(1) --> // inside onNext(), execution switches to the thread running handler.loop()<br>
11) st.onNext(1) --><br>
12) String string = func.call(1)<br>
13) subscriber.onNext(string)</p>
<h3><strong>5. Other notes</strong></h3>
<p>Overall, the call chains are admittedly somewhat complex, but still manageable: any single chain becomes clear after some thought. The catch is that every new chain takes time to work through, and I cannot yet do it in a few seconds; that only comes with practice. As an exercise, take the example above: what happens if observeOn(handler) is placed after map()?</p>
<p>Source: http://www.jianshu.com/p/6558ac156bae</p>
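<p>One way to explore the closing exercise is to record which thread each stage runs on. The following is a minimal, self-contained sketch under several assumptions, not the LittleRx code itself: Subscriber is an interface here so that lambdas can be used (the article extends a Subscriber class), lift() takes a plain Operator<R, T> without wildcards, subscribeOnIO() starts a named thread instead of using a cached thread pool, and Handler.runOne() is a hypothetical one-shot replacement for loop() so that the program terminates. The names LittleRxSketch, run, where, and runOne are invented for illustration.</p>

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LittleRxSketch {
    static final String IO = "little-rx-io"; // name of the thread started by subscribeOnIO()

    interface Subscriber<T> { void onNext(T t); }
    interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }
    interface Func1<T, R> { R call(T t); }
    interface Operator<R, T> { Subscriber<? super T> call(Subscriber<? super R> downstream); }

    /** Hypothetical one-shot stand-in for the article's Handler: runs exactly one posted task. */
    static final class Handler {
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        void post(Runnable task) { queue.add(task); }
        void runOne() {
            try {
                queue.take().run(); // blocks until a task has been posted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    static final class Observable<T> {
        private final OnSubscribe<T> onSubscribe;
        private Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }
        static <T> Observable<T> create(OnSubscribe<T> onSubscribe) { return new Observable<T>(onSubscribe); }
        void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }

        // Same idea as OnSubscribeLift: wrap the downstream subscriber, then call the parent.
        <R> Observable<R> lift(Operator<R, T> operator) {
            OnSubscribe<T> parent = onSubscribe;
            OnSubscribe<R> lifted = downstream -> parent.call(operator.call(downstream));
            return new Observable<R>(lifted);
        }
        <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            Operator<R, T> op = downstream -> t -> downstream.onNext(func.call(t));
            return lift(op);
        }
        Observable<T> observeOn(Handler handler) {
            Operator<T, T> op = downstream -> t -> handler.post(() -> downstream.onNext(t));
            return lift(op);
        }
        Observable<T> subscribeOnIO() {
            Observable<T> source = this;
            OnSubscribe<T> onIo = subscriber -> new Thread(() -> source.subscribe(subscriber), IO).start();
            return new Observable<T>(onIo);
        }
    }

    // "io" for the subscribeOnIO thread, "caller" for the thread that drains the Handler.
    static String where() { return Thread.currentThread().getName().equals(IO) ? "io" : "caller"; }

    static List<String> run(boolean observeOnAfterMap) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Handler handler = new Handler();
        Observable<Integer> source = Observable.create(s -> { log.add("source@" + where()); s.onNext(1); });
        Func1<Integer, String> func = i -> { log.add("map@" + where()); return "map" + i; };
        Observable<String> chain = observeOnAfterMap
                ? source.map(func).observeOn(handler)
                : source.observeOn(handler).map(func);
        chain.subscribeOnIO().subscribe(s -> log.add(s + "@" + where()));
        handler.runOne(); // the single emitted item is delivered on the calling thread
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [source@io, map@caller, map1@caller]
        System.out.println(run(true));  // [source@io, map@io, map1@caller]
    }
}
```

<p>With observeOn(handler) before map(), the mapping function runs on the thread that drains the Handler. With observeOn(handler) after map(), the mapping function stays on the IO thread and only the final onNext() is switched. In both orders the source's call() runs on the IO thread.</p>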