RxJava 教程第二部分:创建事件流
fjdsaj
9年前
<p> </p> <p>现在你应该了解 Rx 的概念了,是时候开始创建和操作事件流了。操作事件流的原始实现是基于 C# 的 <a href="/misc/goto?guid=4959671534434991882" rel="nofollow,noindex">LINQ</a> ,而 LINQ 是受到 functional programming 启发的。如果你了解 LINQ 更容易理解本节内容, 如果不了解也没关系。我们将从最简单的内容开始介绍。 大部分的 Rx 操作函数(operators )用来操作已经存在的事件流。在介绍操作函数之前,先来看看如何创建一个 Observable。</p> <p>创建一个事件流</p> <p>在第一部分示例中,我们使用 Subject 来手工的把数据推送给他并创建一个事件流。我们使用这种方式来示例一些核心的概念和Rx 中一些核心的函数 subscribe。大部分时候使用 Subject 都不是创建 Observable 的最佳方式。 下面将介绍如何创建 Observable 来发射事件流。</p> <p>简单的工厂方法</p> <p>Observable 有很多工厂方法可以创建一个事件流。</p> <p>Observable.just</p> <p>just 函数创建一个发射预定义好的数据的 Observable ,发射完这些数据后,事件流就结束了。</p> <pre> <code class="language-java">Observable<String> values = Observable.just("one", "two", "three"); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); </code></pre> <p>结果:</p> <pre> <code class="language-java">Received: one Received: two Received: three Completed </code></pre> <p>Observable.empty</p> <p>这个函数创建的 Observable 只发射一个 onCompleted 事件就结束了。</p> <pre> <code class="language-java">Observable<String> values = Observable.empty(); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); </code></pre> <p>结果:</p> <pre> <code class="language-java">Completed </code></pre> <p>Observable.never</p> <p>这个 Observable 将不会发射任何事件和数据。</p> <pre> <code class="language-java">Observable<String> values = Observable.never(); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); </code></pre> <p>上面的代码不会打印任何东西。但是这个代码并没有阻塞住,实际上上面的代码立刻就执行完了。</p> <p>Observable.error</p> <p>这个 Observable 将会发射一个 error 事件,然后结束。</p> <pre> <code class="language-java">Observable<String> values = Observable.error(new Exception("Oops")); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); </code></pre> <p>结果:</p> <pre> <code class="language-java">Error: java.lang.Exception: Oops </code></pre> <p>Observable.defer</p> <p>defer 并没有定义一个新的 Observable, defer 只是用来声明当 Subscriber 订阅到一个 Observable 上时,该 Observable 应该如何创建。例如,如果我们想创建一个发射当前时间然后就结束的 Observable, 发射一个数据然后结束,看起来用 just 实现即可:</p> <pre> <code class="language-java">Observable<Long> now = Observable.just(System.currentTimeMillis()); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println); </code></pre> <p>结果:</p> <pre> <code class="language-java">1431443908375 1431443908375 </code></pre> <p>注意上面两个 subscriber 相隔 1秒订阅这个 Observable,但是他们收到的时间数据是一样的!这是因为当订阅的时候,时间数据只调用一次。其实你希望的是,当 一个 subscriber 订阅的时候才去获取当前的时间。 defer 的参数是一个返回一个 Observable 对象的函数。该函数返回的 Observable 对象就是 defer 返回的 Observable 对象。 重点是,每当一个新的 Subscriber 订阅的时候,这个函数就重新执行一次。</p> <pre> <code class="language-java">Observable<Long> now = Observable.defer(() -> Observable.just(System.currentTimeMillis())); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println); </code></pre> <p>结果:</p> <pre> <code class="language-java">1431444107854 1431444108858 </code></pre> <p>Observable.create</p> <p>create 是非常强大的一个函数。可以创建任何你需要的 Observable。</p> <pre> <code class="language-java">static <T> Observable<T> create(Observable.OnSubscribe<T> f) </code></pre> <p>上面是 create 函数的定义,参数 Observable.OnSubscribe 看起来很简单。OnSubscribe 只有一个函数其参数为 Subscriber 。在该函数内我们可以手工的发射事件和数据到 subscriber。</p> <pre> <code class="language-java">Observable<String> values = Observable.create(o -> { o.onNext("Hello"); o.onCompleted(); }); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); </code></pre> <p>结果:</p> <pre> <code class="language-java">Received: Hello Completed </code></pre> <p>当有 Subscriber 订阅到这个 Observable 时(上面示例中的 values ),这个 Subscriber 对象就是你实现的函数中的参数 Subscriber。然后你可以在你的代码中把数据发射到这个 subscriber 中。注意,当数据发射完后,你需要手工的调用 onCompleted 来表明发射完成了。</p> <p>如果之前的所有方法都不满足你的要求时,这个函数应当作为你创建自定义 Observable 的最佳方式。其实现方式和 第一部分我们通过 Subject 来发射事件类似,但是有几点非常重要的区别。首先:数据源被封装起来了,并和不相关的代码隔离开了。其次:Subject 有一些不太明显的问题,通过使用 Subject 你自己在管理状态,并且任何访问该 Subject 对象的人都可以往里面发送数据然后改变事件流。</p> <p>还一个主要的区别是执行代码的时机,使用 create 创建的 Observable,当 Observable 创建的时候,你的函数还没有执行,只有当有 Subscriber 订阅的时候才执行。这就意味着每次当有 Subscriber 订阅的时候,该函数就执行一次。和 defer 的功能类似。结果和 ReplaySubject 类似, ReplaySubject 会缓存结果 当有新的 Subscriber 订阅的时候,把缓存的结果在发射给新的 Subscriber。如果要使用 ReplaySubject 来实现和 create 类似的功能,如果 create 中创建数据的函数是阻塞的话,则 ReplaySubject 在创建的时候线程会阻塞住知道 创建函数执行完。如果不想阻塞当前线程的话,则需要手工创建一个线程来初始化数据。其实 Rx 有更加优雅的方式解决这个问题。</p> <p>其实使用 Observable.create 可以实现 前面几个工厂方法的功能。比如 上面的 create 函数的功能和 Observable.just(“hello”) 的功能是一样的。</p> <p>Functional unfolds</p> <p>在 functional programming(函数式编程)中,创建一系列数字是非常常见的。 RxJava 也提供了一些工厂方法来创建这样的序列。</p> <p>Observable.range</p> <p>做过函数式编码的程序员都了解这个函数的意思。 该函数发射一个整数序列:</p> <pre> <code class="language-java">Observable<Integer> values = Observable.range(10, 15); </code></pre> <p>上面示例将生成一个从 10 到 24 的数字序列(从 10 开始,发射 15个数字)。</p> <p>Observable.interval</p> <p>创建一个无限的计时序列,每隔一段时间发射一个数字,从 0 开始:</p> <pre> <code class="language-java">Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); System.in.read(); </code></pre> <p>结果:</p> <pre> <code class="language-java">Received: 0 Received: 1 Received: 2 Received: 3 ... </code></pre> <p>如果我们不调用 unsubscribe 的话,这个序列是不会停止的。</p> <p>上面的代码在最后有个 System.in.read(); 阻塞语句,这个语句是有必要的,不然的话,程序不会打印任何内容就退出了。原因是我们的操作不是阻塞的:我们创建了一个每隔一段时间就发射数据的 Observable,然后我们注册了一个 Subscriber 来打印收到的数据。这两个操作都是非阻塞的,而 发射数据的计时器是运行在另外一个线程的,但是这个线程不会阻止 JVM 结束当前的程序,所以 如果没有 System.in.read(); 这个阻塞操作,还没发射数据则程序就已经结束运行了。</p> <p>Observable.timer</p> <p>Observable.timer 有两个重载函数。第一个示例创建了一个 Observable, 该 Observable 等待一段时间,然后发射数据 0 ,然后就结束了。</p> <pre> <code class="language-java">Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); </code></pre> <p>结果:</p> <pre> <code class="language-java">Received: 0 Completed </code></pre> <p>另外一个示例是,先等待一段时间,然后开始按照间隔的时间一直发射数据:</p> <pre> <code class="language-java">Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); </code></pre> <p>结果:</p> <pre> <code class="language-java">Received: 0 Received: 1 Received: 2 ... </code></pre> <p>上面的示例,先等待 2秒,然后每隔一秒开始发射数据。</p> <p>转换为 Observable</p> <p>已经有很多工具来处理序列数据、集合和异步事件了,但是这些工具可能无法直接在 Rx 中使用。下面来介绍几个方法可以把这些工具产生的结果转换到你的 Rx 代码中。</p> <p>如果你在使用类似 JavaFX 中的异步事件处理,则可以使用 Observable.create 把异步事件转换到 Observable 中:</p> <pre> <code class="language-java">Observable<ActionEvent> events = Observable.create(o -> { button2.setOnAction(new EventHandler<ActionEvent>() { @Override public void handle(ActionEvent e) { o.onNext(e) } }); }) </code></pre> <p>根据事件的不同,事件的类型(上面的示例中事件类型为 ActionEvent)可能适合作为 Observable 发射数据的类型。 如果你需要的是该事件类型的属性(比如事件发射的位置),则获取该属性并把获取到的结果发射给最终的 Subscriber ,Subscriber 接受到结果可能和事件发生时的数据不一样。为了避免这种问题,在 Observable 中传递的数据应该保持其状态不变。</p> <p>Observable.from</p> <p>在 Java 并发框架中经常使用 Future 来获取异步结果。 通过使用 from 可以把 Future 的结果发射到 Observable 中:</p> <pre> <code class="language-java">FutureTask<Integer> f = new FutureTask<Integer>(() -> { Thread.sleep(2000); return 21; }); new Thread(f).start(); Observable<Integer> values = Observable.from(f); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); </code></pre> <p>结果:</p> <pre> <code class="language-java">Received: 21 Completed </code></pre> <p>当 FutureTask 执行完后, Observable 发射 Future 获取到的结果然后结束。如果任务 取消了,则 Observable 会发射一个 java.util.concurrent.CancellationException 错误信息。</p> <p>你还可以对 Future 设置超时时间:</p> <pre> <code class="language-java">Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS); </code></pre> <p>当过了超时时间后, Future 还是没有返回结果, Observable 可以忽略其结果并发射一个 TimeoutException。</p> <p>Observable.from 还有重载的函数可以用一个数据集合或者一个可以遍历的iterable 的数据来生成一个 Observable。逐个发射集合中的数据,最后发射一个 onCompleted 事件。</p> <pre> <code class="language-java">Integer[] is = {1,2,3}; Observable<Integer> values = Observable.from(is); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); </code></pre> <p>结果:</p> <pre> <code class="language-java">Received: 1 Received: 2 Received: 3 Completed </code></pre> <p>Observable 和 Iterable 或者 Stream 是不能通用的。Observables 是基于 push 模型的,而 Iterable 是基于 pull 模型的。</p> <p>来自: <a href="/misc/goto?guid=4959671534514410173" rel="nofollow">http://blog.chengyunfeng.com/?p=959</a></p>