Implementing RxJava Yourself to Understand Its Call Chain

jerry007, 8 years ago
<p>RxJava has a large API surface and intricate internal logic. Complex material is usually easiest to learn from the whole down to the parts, so to understand how RxJava works, this article follows its source code and implements a simplified RxJava by hand, with the goal of understanding the call chain.</p>
<p>Before reading, it is recommended to download the LittleRx code: reading code in an IDE is much clearer than reading it on a web page, and you can also look at the printed logs.</p>
<p>The four most important classes are Observable, OnSubscribe, Operator, and Subscriber.</p>
<h3><strong>1. The simplest case: create an Observable, then subscribe</strong></h3>
<pre><code class="language-java">Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });

public class Observable<T> {
    private final OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Subscribing is what triggers the source: it simply forwards to onSubscribe.call()
    public final void subscribe(Subscriber<? super T> subscriber) {
        onSubscribe.call(subscriber);
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
        return new Observable<>(onSubscribe);
    }
}</code></pre>
<p>This shows that subscribe(subscriber) --> onSubscribe.call(subscriber), so without a subscription OnSubscribe.call() is never triggered.</p>
<h3><strong>2. map and lift</strong></h3>
<pre><code class="language-java">Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? 
super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "map" + integer;
            }
        })
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });</code></pre>
<p>The non-chained form:</p>
<pre><code class="language-java">// Pseudocode: the anonymous-class bodies are omitted for brevity
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = Observable.create(onSubscribe);
Func1<Integer, String> func = new Func1<>();
Observable<String> observable2 = observable.map(func);
Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);</code></pre>
<p>create() is the same as before, so what does map() do?</p>
<pre><code class="language-java">public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}</code></pre>
<p>lift() takes the previous observable's onSubscribe, wraps it in a new OnSubscribeLift, and returns a new observable2. As noted above, subscribe(subscriber) --> onSubscribe.call(subscriber), so next we look at OnSubscribeLift.</p>
<pre><code class="language-java">public class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;
    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    // Don't worry about the implementation yet; we come back to it below
    @Override
    public void call(Subscriber<? super R> r) {
        Subscriber<? 
super T> st = operator.call(r); // this operator is the OperatorMap
        parent.call(st); // parent is the first observable's onSubscribe
    }
}</code></pre>
<p>Now look at OperatorMap:</p>
<pre><code class="language-java">public final class OperatorMap<T, R> implements Operator<R, T> {

    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    // Don't worry about the implementation yet; we come back to it below
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new MapSubscriber<T, R>(o, transformer);
    }

    // static: it declares its own T and R and needs no reference to the outer instance
    private static final class MapSubscriber<T, R> extends Subscriber<T> {

        private final Subscriber<? super R> actual;
        private final Func1<? super T, ? extends R> transformer;

        public MapSubscriber(Subscriber<? super R> o, Func1<? super T, ? extends R> transformer) {
            this.actual = o;
            this.transformer = transformer;
        }

        // Don't worry about the implementation yet; we come back to it below
        @Override
        public void onNext(T t) {
            R r = transformer.call(t); // convert the upstream value
            actual.onNext(r);          // pass the result on to the downstream subscriber
        }
    }
}</code></pre>
<p>Now we drop map() and lift() and build the same thing from the most basic classes:</p>
<pre><code class="language-java">// Pseudocode: anonymous-class bodies are omitted, and Observable's private
// constructor is called directly for illustration
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();

OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);

OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);

Observable<String> observable2 = new Observable<>(onSubscribe2);

Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);</code></pre>
<p>At this point it is clear how the first Observable<Integer> is turned into an Observable<String>, including the relationship between OnSubscribe<Integer> onSubscribe and OnSubscribeLift<Integer, String> onSubscribe2.</p>
<p style="text-align:center"><img 
src="https://simg.open-open.com/show/9a44af7191e3d5e4d6dd3ee6b9a94cd9.png"></p>
<p>So how does the final subscribe() reach call(Subscriber<Integer>) on the first observable's onSubscribe, and how does the Subscriber<Integer>.onNext(Integer) inside it reach onNext(String) on the final Subscriber<String>?</p>
<p>1) observable2.subscribe(subscriber) --><br>
 2) onSubscribe2.call(subscriber), i.e. OnSubscribeLift.call(subscriber) --><br>
 3) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.<br>
 4) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)<br>
 5) parent.call(st), i.e. onSubscribe.call(st) --><br>
 6) st.onNext(1), i.e. MapSubscriber.onNext(1) --><br>
 7) String string = func.call(1)<br>
 8) subscriber.onNext(string)</p>
<p>That completes the analysis of the call chain for Observable.create().map().subscribe().</p>
<p>Many operators are essentially lift(). By the same reasoning, this is what applying lift() twice looks like:</p>
<p style="text-align:center"><img src="https://simg.open-open.com/show/60421851426535118eb239b6ae5021b3.png"></p>
<h3><strong>3. subscribeOn</strong></h3>
<p>Scheduler is fairly involved internally, so we simplify: subscribeOn(Scheduler) becomes subscribeOnIO().</p>
<pre><code class="language-java">Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? 
super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });</code></pre>
<p>How do we implement subscribeOnIO() so that the first observable's onSubscribe runs on a worker thread?</p>
<pre><code class="language-java">// In Observable<T>:
public final Observable<T> subscribeOnIO() {
    return create(new OnSubscribeOnIO<T>(this));
}

// OnSubscribeOnIO.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class OnSubscribeOnIO<T> implements OnSubscribe<T> {

    private static final ExecutorService threadPool = Executors.newCachedThreadPool();

    final Observable<T> source;

    public OnSubscribeOnIO(Observable<T> source) {
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                source.subscribe(subscriber); // --> onSubscribe.call(subscriber) --> subscriber.onNext(1)
            }
        };
        threadPool.submit(runnable); // the upstream subscription happens on a pool thread
    }
}</code></pre>
<p>As shown above, subscribeOnIO() wraps the subscription in a Runnable and submits it to a cached thread pool. The previous Observable is subscribed to on that worker thread, and every subsequent call completes on the same thread.</p>
<p>Now consider a slightly more complex case that adds map():</p>
<pre><code class="language-java">Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? 
super Integer> subscriber) {
                System.out.println(Thread.currentThread());
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                System.out.println(Thread.currentThread());
                return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(Thread.currentThread());
                System.out.println(s);
            }
        });</code></pre>
<p>The non-chained form:</p>
<pre><code class="language-java">// Pseudocode: the anonymous-class bodies are omitted for brevity
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe2);

OnSubscribeOnIO<String> onSubscribe3 = new OnSubscribeOnIO<>(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe3);

Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);</code></pre>
<p>1) observable3.subscribe(subscriber) --><br>
 2) onSubscribe3.call(subscriber), i.e. OnSubscribeOnIO.call(subscriber) --><br>
 3) on the worker thread, new Runnable(){} --> observable2.subscribe(subscriber) --><br>
 4) onSubscribe2.call(subscriber), i.e. OnSubscribeLift.call(subscriber) --><br>
 5) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.<br>
 6) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)<br>
 7) parent.call(st), i.e. onSubscribe.call(st) --><br>
 8) st.onNext(1), i.e. MapSubscriber.onNext(1) --><br>
 9) String string = func.call(1)<br>
 10) subscriber.onNext(string)</p>
<p>What if map() and subscribeOnIO() swap places?</p>
<pre><code class="language-java">// Pseudocode: the anonymous-class bodies are omitted for brevity
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);

OnSubscribeOnIO<Integer> onSubscribe2 = new OnSubscribeOnIO<>(observable);
Observable<Integer> observable2 = new Observable<>(onSubscribe2);

Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
Observable<String> observable3 = new Observable<>(onSubscribe3);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);</code></pre>
<p>1) observable3.subscribe(subscriber) --><br>
 2) onSubscribe3.call(subscriber), i.e. OnSubscribeLift.call(subscriber) --><br>
 3) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.<br>
 4) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)<br>
 5) parent.call(st), i.e. onSubscribe2.call(st), i.e. OnSubscribeOnIO.call(st) --><br>
 6) on the worker thread, new Runnable(){} --> observable.subscribe(st) --><br>
 7) onSubscribe.call(st) --><br>
 8) st.onNext(1), i.e. MapSubscriber.onNext(1) --><br>
 9) String string = func.call(1)<br>
 10) subscriber.onNext(string)</p>
<p>As you can see, wherever subscribeOnIO() sits in the chain, the first onSubscribe.call() always runs on the worker thread.</p>
<h3><strong>4. observeOn</strong></h3>
<p>First, the final form of the demo:</p>
<pre><code class="language-java">Handler handler = new Handler();

Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? 
super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .observeOn(handler)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });

handler.loop(); // blocks the current thread while the queue is empty, until a new message arrives</code></pre>
<p>In the same spirit, we implement a simple observeOn(Handler) ourselves that can switch back to the main thread.</p>
<pre><code class="language-java">public class Observable<T> {
    ...
    public final Observable<T> observeOn(Handler handler) {
        return lift(new OperatorObserveOn<T>(handler));
    }
}</code></pre>
<p>OperatorObserveOn</p>
<pre><code class="language-java">public final class OperatorObserveOn<T> implements Operator<T, T> {
    private final Handler handler;

    public OperatorObserveOn(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? 
super T> subscriber) {
        Subscriber<T> s = new Subscriber<T>() {
            @Override
            public void onNext(final T t) {
                // Hand the value to the Handler; the downstream onNext() runs on the looping thread
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onNext(t);
                    }
                });
            }
        };
        return s;
    }
}</code></pre>
<p>The custom Handler:</p>
<pre><code class="language-java">import java.util.concurrent.ArrayBlockingQueue;

public class Handler {

    private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);

    public void loop() {
        for (; ; ) {
            Runnable runnable;
            try {
                runnable = queue.take(); // blocks while the queue is empty, wakes up when an item arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag before leaving the loop
                return;
            }
            runnable.run();
        }
    }

    public void post(Runnable runnable) {
        try {
            queue.put(runnable); // blocks while the queue is full, until space frees up
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}</code></pre>
<p>The non-chained form:</p>
<pre><code class="language-java">// Pseudocode: anonymous-class bodies are omitted; handler is the Handler from the demo above
OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);

OperatorObserveOn<Integer> operatorObserveOn = new OperatorObserveOn<>(handler);
OnSubscribeLift<Integer, Integer> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorObserveOn);

Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe3);
OnSubscribeOnIO<String> onSubscribe4 = new OnSubscribeOnIO<>(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe4);
Subscriber<String> subscriber = new 
Subscriber<>();
observable3.subscribe(subscriber);</code></pre>
<p>1) observable3.subscribe(subscriber) --><br>
 2) onSubscribe4.call(subscriber), i.e. OnSubscribeOnIO.call(subscriber) --><br>
 3) on the worker thread, new Runnable(){} --> observable2.subscribe(subscriber) --><br>
 4) onSubscribe3.call(subscriber), i.e. OnSubscribeLift.call(subscriber) --><br>
 5) Subscriber<Integer> st = operatorMap.call(subscriber), i.e.<br>
 6) Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)<br>
 7) parent.call(st), i.e. onSubscribe2.call(st), i.e. OnSubscribeLift.call(st) --><br>
 8) Subscriber<Integer> st2 = operatorObserveOn.call(st) --><br>
 9) parent.call(st2), i.e. onSubscribe.call(st2) --><br>
 10) st2.onNext(1) --> // onNext() switches to the thread that runs the Handler<br>
 11) st.onNext(1) --><br>
 12) String string = func.call(1)<br>
 13) subscriber.onNext(string)</p>
<h3><strong>5. Other notes</strong></h3>
<p>Overall, the call chains are indeed somewhat complicated, but still manageable: any single chain becomes clear after a little thought. The catch is that every new chain takes some time to work through, and I cannot yet trace one in a few seconds, so the only remedy is more thinking and more practice. As an exercise, take the chain above: what happens if observeOn(handler) is placed after map()?</p>
<p>Source: http://www.jianshu.com/p/6558ac156bae</p>
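<p>The create().map().subscribe() call chain traced in section 2 can be condensed into one runnable file. The following is only a minimal sketch under stated assumptions, not the LittleRx source: lift() inlines what OnSubscribeLift.call() does, map() inlines OperatorMap and MapSubscriber, the wildcard bounds are dropped to keep the generics short, and the LittleRxSketch class, its run() method, and the trace list are hypothetical names added to record the order of the hops.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-file sketch: names follow the article, bodies are simplified.
interface Func1<T, R> { R call(T t); }

abstract class Subscriber<T> { public abstract void onNext(T t); }

interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }

// An Operator maps the downstream Subscriber<R> to an upstream Subscriber<T>.
interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> { }

class Observable<T> {
    private final OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

    static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
        return new Observable<>(onSubscribe);
    }

    void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }

    // Inlines what OnSubscribeLift.call() does: wrap the downstream, then call the parent.
    <R> Observable<R> lift(Operator<R, T> operator) {
        return new Observable<R>(downstream -> {
            Subscriber<? super T> st = operator.call(downstream);
            onSubscribe.call(st);
        });
    }

    // Inlines OperatorMap and MapSubscriber as a lambda plus an anonymous Subscriber.
    <R> Observable<R> map(Func1<T, R> func) {
        Operator<R, T> operatorMap = downstream -> new Subscriber<T>() {
            @Override public void onNext(T t) { downstream.onNext(func.call(t)); }
        };
        return lift(operatorMap);
    }
}

class LittleRxSketch {
    // Runs create().map().subscribe() and returns the hops in the order they happened.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Observable.<Integer>create(subscriber -> {
                    trace.add("onSubscribe.call");
                    subscriber.onNext(1);
                })
                .map(i -> {
                    trace.add("func.call(" + i + ")");
                    return "map" + i;
                })
                .subscribe(new Subscriber<String>() {
                    @Override public void onNext(String s) {
                        trace.add("subscriber.onNext(" + s + ")");
                    }
                });
        return trace;
    }
}
```

<p>Calling LittleRxSketch.run() returns the three hops in order: onSubscribe.call, func.call(1), subscriber.onNext(map1). Nothing is recorded until subscribe() is called, which matches the observation in section 1.</p>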
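<p>The closing question, what changes when observeOn(handler) is placed after map(), can be explored with a small experiment. This is a hedged sketch rather than the article's code: java.util.function.Function stands in for Func1, subscribeOn(ExecutorService) stands in for subscribeOnIO() so the pool can be shut down, Handler.runOne() is a hypothetical replacement for loop() that runs exactly one posted task, and the operators are written without lift() for brevity.</p>

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for the article's classes (not the LittleRx source).
abstract class Subscriber<T> { public abstract void onNext(T t); }

interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }

// post() may be called from any thread; runOne() runs one queued task on the calling thread.
class Handler {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    void post(Runnable task) { queue.add(task); }

    void runOne() {
        try {
            queue.take().run(); // blocks until a task has been posted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Observable<T> {
    private final OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

    static <T> Observable<T> create(OnSubscribe<T> onSubscribe) { return new Observable<>(onSubscribe); }

    void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }

    <R> Observable<R> map(Function<T, R> func) {
        return new Observable<R>(down -> onSubscribe.call(new Subscriber<T>() {
            @Override public void onNext(T t) { down.onNext(func.apply(t)); }
        }));
    }

    // Everything downstream of this operator runs on the thread that drains the handler.
    Observable<T> observeOn(Handler handler) {
        return new Observable<T>(down -> onSubscribe.call(new Subscriber<T>() {
            @Override public void onNext(T t) { handler.post(() -> down.onNext(t)); }
        }));
    }

    // The upstream subscription, and therefore the source, runs on the executor's thread.
    Observable<T> subscribeOn(ExecutorService executor) {
        return new Observable<T>(down -> executor.execute(() -> onSubscribe.call(down)));
    }
}

class ObserveOnPlacement {
    // Reports which thread ran the map function and which ran the final onNext().
    static List<String> run(boolean observeOnAfterMap) {
        Thread caller = Thread.currentThread();
        List<String> log = new CopyOnWriteArrayList<>();
        Handler handler = new Handler();
        ExecutorService io = Executors.newSingleThreadExecutor();
        Function<Integer, String> func = i -> {
            log.add("map on " + (Thread.currentThread() == caller ? "caller" : "io"));
            return "map" + i;
        };
        Observable<Integer> source = Observable.create(s -> s.onNext(1));
        Observable<String> chain = observeOnAfterMap
                ? source.map(func).observeOn(handler)
                : source.observeOn(handler).map(func);
        chain.subscribeOn(io).subscribe(new Subscriber<String>() {
            @Override public void onNext(String s) {
                log.add("onNext(" + s + ") on " + (Thread.currentThread() == caller ? "caller" : "io"));
            }
        });
        handler.runOne(); // stands in for handler.loop(): run the single posted task here
        io.shutdown();
        return log;
    }
}
```

<p>With the article's order, run(false), both the map function and the final onNext() run on the calling thread. With observeOn() moved after map(), run(true), the map function runs on the worker thread and only the final onNext() is switched back, because observeOn() affects only what is downstream of it.</p>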