RxJava 教程第二部分:事件流基础之 转换数据流
bigpig1101
9年前
<p>Transformation of sequences</p> <p>本节介绍转换数据流中数据的方法。在真实世界中, Observable 中的数据可以是任意类型的,可能在你的应用中无法直接使用这些数据类型,你需要对这些数据对象进行一些转换。</p> <p>map 和 flatMap 是本节中操作函数的基础。 下面是三种转换方式的示意:</p> <p>– Ana(morphism) T –> IObservable – Cata(morphism) IObservable</p> <p>–> T</p> <p>– Bind IObservable –> IObservable</p> <p> </p> <p>本节还是继续使用上一结引入的自定义 Subscriber:</p> <pre> <code class="language-java">class PrintSubscriber extends Subscriber{ private final String name; public PrintSubscriber(String name) { this.name = name; } @Override public void onCompleted() { System.out.println(name + ": Completed"); } @Override public void onError(Throwable e) { System.out.println(name + ": Error: " + e); } @Override public void onNext(Object v) { System.out.println(name + ": " + v); } } </code></pre> <p>map</p> <p>最基础的转换函数就是 map。 map 使用一个转换的参数把源Observable 中的数据转换为另外一种类型的数据。返回的 Observable 中包含了转换后的数据。</p> <pre> <code class="language-java">public final <R> Observable<R> map(Func1<? super T,? extends R> func) </code></pre> <p><img src="https://simg.open-open.com/show/9b7774c49989ba2832ea97e7a68f335c.png"></p> <p>下面是把源 Observable 中的每个数据都加 3 然后再返回:</p> <pre> <code class="language-java">Observable<Integer> values = Observable.range(0,4); values .map(i -> i + 3) .subscribe(new PrintSubscriber("Map")); </code></pre> <p>结果:</p> <pre> <code class="language-java">Map: 3 Map: 4 Map: 5 Map: 6 Map: Completed </code></pre> <p>上面的代码只是示例 map 的使用,并没有太大的实际意义。下面是一个比较实际的转换方式:</p> <pre> <code class="language-java">Observable<Integer> values = Observable.just("0", "1", "2", "3") .map(Integer::parseInt); values.subscribe(new PrintSubscriber("Map")); </code></pre> <p>结果:</p> <pre> <code class="language-java">Map: 0 Map: 1 Map: 2 Map: 3 Map: Completed </code></pre> <p>源 Observable 发射的为 String 类型数据,而我们需要的是 int 类型,则可以通过 map 把 String 转换为 int。</p> <p>如果你认为这种转换太简单了, 完全可以在 Subscriber 中完成,这样在设计架构上并不合理,没有有效的区分职责。 代码设计每个部分都有各自的职责,使用 map 可以有效的确保职责清晰。方便后续修改。</p> <p>cast 和 ofType</p> <p>cast 是把一个对象强制转换为子类型的缩写形式。 假设源 Observable为 Observable</p> <pre> <code class="language-java">Observable<Object> values = Observable.just(0, 1, 2, 3); values .cast(Integer.class) .subscribe(new PrintSubscriber("Map")); </code></pre> <p>结果:</p> <pre> <code class="language-java">Map: 0 Map: 1 Map: 2 Map: 3 Map: Completed </code></pre> <p>如果遇到类型不一样的对象的话,就会抛出一个 error:</p> <pre> <code class="language-java">Observable<Object> values = Observable.just(0, 1, 2, "3"); values .cast(Integer.class) .subscribe(new PrintSubscriber("Map")); </code></pre> <p>结果:</p> <pre> <code class="language-java">Map: 0 Map: 1 Map: 2 Map: Error: java.lang.ClassCastException: Cannotcastjava.lang.String to java.lang.Integer </code></pre> <p>如果你不想处理类型不一样的对象,则可以用 ofType 。 该函数用来判断数据是否为 该类型,如果不是则跳过这个数据。</p> <pre> <code class="language-java">Observable<Object> values = Observable.just(0, 1, "2", 3); values .ofType(Integer.class) .subscribe(new PrintSubscriber("Map")); </code></pre> <p>结果:</p> <pre> <code class="language-java">Map: 0 Map: 1 Map: 3 Map: Completed </code></pre> <p>timestamp 和 timeInterval</p> <p>这两个函数可以给数据流中的数据添加额外的时间相关的信息。timestamp 把数据转换为 Timestamped 类型,里面包含了原始的数据和一个原始数据是何时发射的时间戳。</p> <pre> <code class="language-java">public final Observable<Timestamped<T>> timestamp() </code></pre> <pre> <code class="language-java">Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS); values.take(3) .timestamp() .subscribe(new PrintSubscriber("Timestamp")); </code></pre> <p>结果:</p> <pre> <code class="language-java">Timestamp: Timestamped(timestampMillis = 1428611094943, value = 0) Timestamp: Timestamped(timestampMillis = 1428611095037, value = 1) Timestamp: Timestamped(timestampMillis = 1428611095136, value = 2) Timestamp: Completed </code></pre> <p>从结果可以看到,上面的数据大概每隔100毫秒发射一个。</p> <p>如果你想知道前一个数据和当前数据发射直接的时间间隔,则可以使用 timeInterval 函数。</p> <pre> <code class="language-java">public final Observable<TimeInterval<T>> timeInterval() </code></pre> <pre> <code class="language-java">Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS); values.take(3) .timeInterval() .subscribe(new PrintSubscriber("TimeInterval")); </code></pre> <p>结果:</p> <pre> <code class="language-java">TimeInterval: TimeInterval [intervalInMilliseconds=131, value=0] TimeInterval: TimeInterval [intervalInMilliseconds=75, value=1] TimeInterval: TimeInterval [intervalInMilliseconds=100, value=2] TimeInterval: Completed </code></pre> <p>TimeInterval 中有个属性 intervalInMilliseconds 记录了两次数据发射直接的时间间隔。</p> <p>这两个函数中的时间对于记录日志和调试程序是非常有用的。这是用 Rx 的方式来获取异步调用的数据流信息。</p> <p>materialize 和 dematerialize</p> <p>materialize 对于记录日志也是很有用的。materialize 把数据转换为元数据发射出去:</p> <pre> <code class="language-java">public final Observable<Notification<T>> materialize() </code></pre> <p><img src="https://simg.open-open.com/show/4443b782c70db76e7c11313e45d8f36e.png"></p> <p>元数据中包含了源 Observable 所发射的动作,是调用 onNext 还是 onComplete。注意上图中,源 Observable 结束的时候, materialize 还会发射一个 onComplete 数据,然后才发射一个结束事件。</p> <pre> <code class="language-java">Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS); values.take(3) .materialize() .subscribe(new PrintSubscriber("Materialize")); </code></pre> <p>结果:</p> <pre> <code class="language-java">Materialize: [rx.Notification@a4c802e9 OnNext 0] Materialize: [rx.Notification@a4c802ea OnNext 1] Materialize: [rx.Notification@a4c802eb OnNext 2] Materialize: [rx.Notification@18d48ace OnCompleted] Materialize: Completed </code></pre> <p>Notification 类包含了一些判断每个数据发射类型的方法,如果出错了还可以获取错误信息 Throwable 对象。</p> <p>dematerialize 函数会把 materialize 转换后的Observable 再还原为 源 Observable。</p> <p>flatMap</p> <p>map 把一个数据转换为另外一个数据。而 flatMap 把源 Observable 中的一个数据替换为任意数量的数据,可以为 0 个,也可以为无限个。 flatMap 把源 Observable 中的一个数据转换为一个新的 Observable 发射出去。</p> <pre> <code class="language-java">public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func) </code></pre> <p><img src="https://simg.open-open.com/show/834d19497dbe3ee90fce20ee22768ebd.png"></p> <p>flatMap 的参数会把 源 Observable 中发射的每个数据转换为一个新的 Observable, 然后 flatMap 再把这些新的 Observable 中发射的数据发射出来。每个新的 Observable 数据都是按照他们产生的顺序发射出来,但是 Observable 之间数据的顺序可能会不一样。</p> <p>下面通过一个简单的例子来帮助理解 flatMap 。</p> <pre> <code class="language-java">Observable<Integer> values = Observable.just(2); values .flatMap(i -> Observable.range(0,i)) .subscribe(new PrintSubscriber("flatMap")); </code></pre> <p>结果:</p> <pre> <code class="language-java">flatMap: 0 flatMap: 1 flatMap: Completed </code></pre> <p>上面的示例中,values 这个 Observable 只发射一个值 2. 而 flatMap 参数把数据 2 转换为 Observable.range(0,2),其中 Lambda 表达式中的 i 为 values Observable 发射的数据,这里也就是 2. 然后订阅到 flatMap 生成的新 Observable 上。 而 Observable.range(0,2) 会发射 0 和 1 两个数据,所以结果就是 0、 1 、完成。</p> <p>从上面示例中可以看到, flatMap 把 源 Observable 中每个值都转换为一个新的 Observable 了。 比如:</p> <pre> <code class="language-java">Observable<Integer> values = Observable.range(1,3); values .flatMap(i -> Observable.range(0,i)) .subscribe(new PrintSubscriber("flatMap")); </code></pre> <p>这里 values 会发射 1 、 2、 3 三个数据。 然后 flatMap 把每个数据变为新的 Observable (Observable.range(0,i)),所以会有 3 个 Observable,这 3个 Observable 分别发射 [0], [0,1] 和 [0,1,2]。最终 flatMap 再把这 3 个新 Observable 发射的数据合并到一个 Observable 发射出去。所以上面的结果如下:</p> <pre> <code class="language-java">flatMap: 0 flatMap: 0 flatMap: 1 flatMap: 0 flatMap: 1 flatMap: 2 flatMap: Completed </code></pre> <p>再看一个示例,把 int 值转换为 字母:</p> <pre> <code class="language-java">Observable<Integer> values = Observable.just(1); values .flatMap(i -> Observable.just( Character.valueOf((char)(i+64)) )) .subscribe(new PrintSubscriber("flatMap")); </code></pre> <p>上面的示例,用 map 函数实现会更简单,这里是为了说明 flatMap 另外一种功能,如果你发现源 Observable 中发射的数据不符合你的要求,则你可以返回一个 空的 Observable。这就相当于过滤数据的作用, 例如:</p> <pre> <code class="language-java">Observable<Integer> values = Observable.range(0,30); values .flatMap(i -> { if (0 < i && i <= 26) return Observable.just(Character.valueOf((char)(i+64))); else return Observable.empty(); }) .subscribe(new PrintSubscriber("flatMap")); </code></pre> <p>结果:</p> <pre> <code class="language-java">flatMap: A flatMap: B flatMap: C ... flatMap: X flatMap: Y flatMap: Z flatMap: Completed </code></pre> <p>上面示例源 Observable 一共发射 0 到 29 这 30个数字。在 flatMap 中判断 如果数字大于 0 并且小于等于 26,则转换为字母用 并用 Observable.just 生成新的 Observable;其他数字都返回一个 Observable.empty() 空 Observable。</p> <p>注意,flatMap 是把几个新的 Observable 合并为一个 Observable 返回, 只要这些新的 Observable 有数据发射出来, flatMap 就会把数据立刻发射出去。所以如果这些新的 Observable 发射数据是异步的,那么 flatMap 返回的数据也是异步的。下面示例中使用 Observable.interval 来生成每个数据对应的新 Observable,由于 interval 返回的 Observable 是异步的,所以可以看到最终输出的结果是每当有 Observable 发射数据的时候, flatMap 就返回该数据。</p> <pre> <code class="language-java">Observable.just(100, 150) .flatMap(i -> Observable.interval(i, TimeUnit.MILLISECONDS) .map(v -> i) ) .take(10) .subscribe(new PrintSubscriber("flatMap")); </code></pre> <p>上面的 Lambda 表达式 先把参数 i (这里分别为 100 和 150 这两个数字)转换为 Observable.interval(i, TimeUnit.MILLISECONDS), 每隔 i 毫秒发射一个数字,这样两个 Observable.interval 都发射同样的数字,只不过发射的时间间隔不一样,所以为了区分打印的结果,我们再用 map(v -> i) 把结果转换为 i 。结果如下:</p> <pre> <code class="language-java">flatMap: 100 flatMap: 150 flatMap: 100 flatMap: 100 flatMap: 150 flatMap: 100 flatMap: 150 flatMap: 100 flatMap: 100 flatMap: 150 flatMap: Completed </code></pre> <p>可以两个新的 Observable 的数据交织在一起发射出来。</p> <p>concatMap</p> <p>如果你不想把新 Observable 中的数据交织在一起发射,则可以选择使用 concatMap 函数。</p> <p>该函数会等第一个新的 Observable 完成后再发射下一个 新的 Observable 中的数据。</p> <pre> <code class="language-java">Observable.just(100, 150) .concatMap(i -> Observable.interval(i, TimeUnit.MILLISECONDS) .map(v -> i) .take(3)) .subscribe( System.out::println, System.out::println, () -> System.out.println("Completed")); </code></pre> <p>结果:</p> <pre> <code class="language-java">100 100 100 150 150 150 Completed </code></pre> <p>所以 concatMap 要求新的Observable 不能是无限的,否则该无限 Observable 会阻碍后面的数据发射。为此,上面的示例使用 take 来结束 Observable。</p> <p>flatMapIterable</p> <p>flatMapIterable 和 flatMap 类似,区别是 flatMap 参数把每个数据转换为 一个新的 Observable,而 flatMapIterable 参数把一个数据转换为一个新的 iterable 对象。</p> <p>例如下面是一个把参数转换为 iterable 的函数:</p> <pre> <code class="language-java">public static Iterable<Integer> range(int start, int count) { List<Integer> list = new ArrayList<>(); for (int i=start ; i<start+count ; i++) { list.add(i); } return list; } </code></pre> <p>然后可以这样使用该函数作为 flatMapIterable 的参数:</p> <pre> <code class="language-java">Observable.range(1, 3) .flatMapIterable(i -> range(1, i)) .subscribe(System.out::println); </code></pre> <p>结果:</p> <pre> <code class="language-java">1 1 2 1 2 3 </code></pre> <p>flatMapIterable 把生成的 3 个 iterable 合并为一个 Observable 发射。</p> <p>作为 Rx 开发者,我们需要知道在 Rx 中应该使用 Observable 数据流来发射数据而不要混合使用传统的 iterable。但是如果你无法控制数据的来源,提供数据的一方只提供 iterable 数据,则依然可以直接使用这些数据。flatMapIterable 把多个 iterable 的数据按照顺序发射出来,不会交织发射。</p> <p>flatMapIterable 还有另外一个重载函数可以用源 Observable 发射的数据来处理新的 iterable 中的每个数据:</p> <pre> <code class="language-java">Observable.range(1, 3) .flatMapIterable( i -> range(1, i), (ori, rv) -> ori * (Integer) rv) .subscribe(System.out::println); </code></pre> <p>结果:</p> <pre> <code class="language-java">1 2 4 3 6 9 </code></pre> <p>注意,上面的 ori 参数取值为 源 Observable 发射出来的数据,也就是 1、 2、 3. 而 rv 参数取值为 range(1, i) 参数生成的 iterable 中的每个数据,也就是分别为 [1]、[1,2]、[1,2,3],所以最终的结果就是:[1 <em>1], [1</em> 2, 2 <em>2], [1</em> 3, 2 <em>3, 3</em> 3].</p> <p>来自: <a href="/misc/goto?guid=4959671549295740825" rel="nofollow">http://blog.chengyunfeng.com/?p=964</a></p>