RxWeekend——RxJava周末狂欢
作者地址: MrFu Blog--RxWeekend
周五的时候就打算这个周末就看 RxJava 了,于是利用一个周末的时间把咖啡变成了文字,对,就是咖啡,不是啤酒和炸鸡,周六把 RxJava Essentials 英文版再看了一遍,顺便看了一遍翻译版,周日把小鄧子的博客以及他引述的其他文章全部看了一遍。
Part1 部分主要是 RxJava Essentials 的操作符
Part2 部分主要是一些 tips
对于Part1我更建议你先去看 RxJava Essentials 这本书,再回过头来看这部分。我这里的解释可能是非常抽象的,都是一些总结性的解释。这里有一个实例,和 Tips7 有关:RxFace,喜欢就 star,不要犹豫 ^^
Part 1: RxJava Essentials -- Operators
Basic
-
just()
方法可以传入1到9个参数,它们会按照传入的参数的顺序来发射它们。 -
Observable.empty()
需要一个 Oservable 但是什么都不发射 -
Observable.never()
传一个不发射数据并永远不会结束的 Observable -
Observable.throw()
创建一个不发射数据并且以错误结束的 Observable -
repeat()
-
defer()
在观察者订阅时创建 Observable,而不是创建后立即执行,这篇文章有着更棒的解释:小鄧子:使用RxJava实现延迟订阅 -
range()
从一个指定的数字开始发射 N 个数字 -
interval(3, TimeUnit.SECONDES)
轮询时用:参数:指定两次发射时间间隔,时间单位。 -
timer()
一段时间后才发射 Observable
Filtering
-
filter()
,take()
,takeLast()
-
distinct()
去掉序列中重复项,是作用于一个完整的序列的 -
distinctUntilChanged()
在一个存在的序列上来创建一个新的不重复发射元素的序列
</div>
-
first()
,last()
,firstOrDefault()
,lastOrDefault()
-
skip()
,skipLast()
跳过前几个或者最后几个元素 -
elementAt()
发射指定元素。但如果元素不足可以使用:elementAtOrDefault()
-
sample(30,TimeUnit.SECONDS)
指定的时间间隔里发射最近一次的数值
</div>
-
throttleFirst()
定时发射第一个元素 -
timeout()
限时,在指定时间间隔 Observable 不发射值的话, 就会触发onError()
函数 -
debounce()
过滤发射速率过快的数据,即:在一个时间间隔过去之后,仍然没有发射的话,则发射最后的那个
Transforming
-
map()
接收到的对象应用到每个发射的值上 -
flatMap()
将发射的序列转换成另外一种对象的 Observable 序列,注意:它允许交叉,即flatMap()
不保证最终生成的 Observable 和源 Observable 发射序列相同。 FlatMap -
concatMap()
解决了flatMap()
交叉的问题,提供了 能把发射值连续在一起的铺平函数,而非合并它们。
关于
flatMap()
和concatMap()
必须看这篇文章: 小鄧子-RxJava变换操作符:.concatMap( )与.flatMap( )的比较
-
flatMapInterable()
类似于flatMap()
只是它将源数据两两结成对并生成 Iterable,而不是原始数据项和生成的 Observables -
switchMap()
和flatMap()
区别在于每当源 Observable 发射一个新的数据项时,将取消订阅并停止监视之前那个数据项产生的 Observable,并开始监视当前发射的这个。 -
scan()
累加器,对原始Observable 发射的每项数据都应用一个函数,计算出函数的结果值,并填充回可观测序列,等待下一次发射的数据一起使用。 -
scan(R, Func2)
用初始值作为第一个发射的值 -
groupBy()
引用小鄧子的一段话来说是这样的:去这里看更详细的解释,会恍然大悟的:小鄧子-Architecting Android with RxJava
将原始Observable根据不同的key分组成多个
GroupedObservable
,由原始Observable
发射(原始Observable
的泛型将变成这样Observable<GroupedObservable<K, T>>
),每一个GroupedObservable
既是事件本身也是一个独立的Observable
,每一个GroupedObservable
发射一组原始Observable
的事件子集。
-
buffer()
将得到一个新的 Observable,这个 Observable 每次发射一组列表值而不是单个发射,你还可以指定它的 skip 值和 timespan 项数据 -
window()
类似于buffer()
,但它发射的是 Observable 而不是列表 -
cast()
将源 Observable 中每一项数据都转换成新的类型,转成了一个不同的 Class。
Combining
-
merge()
多个序列合并在一个最终发射的 Observable.mergeDelayError()
当所有的 Observable 都完成时,再处理有 error 的情况,发射onError()
-
zip()
合并两个或多个 Observables 发射出的数据项,根据指定的函数 Func* 变换它们,并发射一个新值 -
join()
基于时间窗口将两个 Observables 发射的数据结合在一起,组成一个新的 Observable。它可以控制每个 Observable 产生结果的生命周期,在每个结果的生命周期内,可以与另一个 Observable 产生的结果按照一定的规则进行合并!
</div>
join方法的用法如下:
observableA.join(observableB,
observableA产生结果生命周期控制函数,
observableB产生结果生命周期控制函数,
observableA产生的结果与observableB产生的结果的合并规则)
蓝线和粉色的线表示对应的Observable 上的元素的生命周期。Android RxJava使用介绍(四) RxJava的操作符
combineLatest()
像zip()
的特殊形式,zip()
作用于最近未打包的两个 Observables,相反combineLatest()
作用于最近发射的数据项
</div>
and()
,then()
,when()
: 如下:
Pattern2<O1, O2> pattern = JoinObservable.from(obserable1).and(obserable2); Plan0<O1> plan = pattern.then(this::updateTitle); JoinObservable.when(plan).toObservable().observeOn(…).subscribe(…);
解释:两个发射序列 obserable1 和 obserable2 通过 and 链接。使用 pattern 对象创建 Plan 对象,然后使用 when...(好吧,我想不到使用场景...)
</div>
-
switch()
将一个发射多个 Observables 的 Observable 转换成另一个单独的 Observable,后者发射那些 Observables 最近发射的数据项,注:当源 Observable 发射一个新的 Observable 时,switch()
会立即取消订阅前一个发射数据的 Observable,然后订阅一个新的 Observable,并开始发射它的数据。 -
startWith()
与concat()
对应,通过传一个参数来先发射一个数据序列
Part 2: Tips
Tips1
// Our sources (left as an exercise for the reader) Observable<Data> memory = ...; Observable<Data> disk = ...; Observable<Data> network = ...; // Retrieve the first source with data Observable<Data> source = Observable .concat(memory, disk, network) .first(); //先取 memory 中的数据,如果有,就取出,然后停止检索队列;没有就取 disk 的数据,有就取出,然后停止检索队列;最后才是网络请求
//持久化数据or缓存数据 Observable<Data> networkWithSave = network.doOnNext(new Action1<Data>() { @Override public void call(Data data) { saveToDisk(data); cacheInMemory(data); } }); Observable<Data> diskWithCache = disk.doOnNext(new Action1<Data>() { @Override public void call(Data data) { cacheInMemory(data); } }); //现在,如果你使用 networkWithSave 和 diskWithCache,数据将会在加载后自动保存
//处理陈旧数据 Observable<Data> source = Observable .concat(memory, diskWithCache, networkWithSave) .first(new Func1<Data, Boolean>() { @Override public Boolean call(Data data) { return data.isUpToDate();//需要 update 的话,则筛选掉该数据源,检索下一个数据源 } });//注:first() 和 takeFirst() 区别在于,如果没有符合的数据源,first() 会抛 NoSuchElementException 异常
Tips2
-
.subsribeOn()
操作符可以改变Observable应该在哪个调度器上执行任务。 -
.observeOn()
操作符可以改变Observable将在哪个调度器上发送通知。 -
另外,默认情况下,链上的操作符将会在调用
.subsribeOn()
的那个线程上执行任务。如下:
Observable.just(1,2,3) .subscribeOn(Schedulers.newThread()) .flatMap(/** 与UI线程无关的逻辑**//)//会在 subscribeOn() 指定的线程上执行任务 .observeOn(AndroidSchedulers.mainThread()) .subscribe();
Tips3
Backpressure(背压): 事件产生的速度比消费快(在 producer-consumer(生产者-消费者) 模式中)。发生 overproucing 后,当链式结构不能承受数据压力时,就会抛出 MissingBackpressureException
异常。
最常见的 Backpressure 就是连续快速点击按钮....
Tips4
再重用操作符的方式上,使用 compose()
,而不是 flatMap()
:
</div>
Tips5
Schedulers:
将一个耗时的操作,通过 Scehdulers.io()
放到 I/O 线程中去处理
public static void storeBitmap(Context context, Bitmap bitmap, String filename){ Schedulers.io().createWorker().schedule(() -> { blockingStoreBitmap(context, bitmap, filename); }) }
Tips6
-
subject
可以同时是一个 Observable 也可以是一个 Observer,一个 Subject 可以订阅一个 Observable,就像一个观察者,并发射新数据,或者传递它接受到的数据,就像一个 Observable。see more -
对于空的 subscribe() 意为仅仅是为了开启 Observable,而不用管已发出的值。
-
在
subscriber.onNext
或subscriber.onCompleted()
前检测观察者的订阅情况,使代码更高效,因为如果没有观察者等待时我们就不生成没必要的数据项。就像这样:
if (!subscriber.isUnsubscribed()){//避免生成不必要的数据项 return; } subscriber.onNext(); if (!subscriber.isUnsubscribed()){ subscriber.onCompleted(); }
Tips7
我觉得这个 Tips 是最有用的
先祭出两个工具类
对于 SchedulersCompat
类,我们的目的,是为了写出这样的代码:
.compose(SchedulersCompat.<SomeEntity>applyExecutorSchedulers());
场景是这样的:work thread 中处理数据,然后 UI thread 中处理结果。当然,我们知道是要使用 subscribeOn()
和 observeOn()
进行处理。最常见的场景是,调server 的 API 接口取数据的时候,那么,那么多接口,反复写这两个操作符是蛋疼的,为了避免这种情况,我们可以通过 compse()
操作符来实现复用,上面这段代码就实现了这样的功能。
SchedulersCompat
类中有这么一段 Schedulers.from(ExecutorManager.eventExecutor)
,哇喔,这里ExecutorManager
类里维护了一个线程池!目的呢!避免线程反复创建,实现线程复用!!!这样,我就不需要每次都通过Schedulers.newThread()
来实现了!!
如果你想了解更多,关于 compose()
操作符,可以看这里:小鄧子-避免打断链式结构:使用.compose( )操作符
对于这个 Tips, 我给出一个项目实例:RxFace,这是我在做一个人脸识别的 demo 的时候所写的,用了 RxJava
, retrofit
, Okhttp
。我在v1.1版本的时候增加通过compose()
操作符复用 subscribeOn()
和 observeOn()
的逻辑。觉得还 OK 的话,可以点个 star 喔,哈哈
/** * 这个类是 小鄧子 提供的! */ public class SchedulersCompat { private static final Observable.Transformer computationTransformer = new Observable.Transformer() { @Override public Object call(Object observable) { return ((Observable) observable).subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()); } }; private static final Observable.Transformer ioTransformer = new Observable.Transformer() { @Override public Object call(Object observable) { return ((Observable) observable).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); } }; private static final Observable.Transformer newTransformer = new Observable.Transformer() { @Override public Object call(Object observable) { return ((Observable) observable).subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()); } }; private static final Observable.Transformer trampolineTransformer = new Observable.Transformer() { @Override public Object call(Object observable) { return ((Observable) observable).subscribeOn(Schedulers.trampoline()) .observeOn(AndroidSchedulers.mainThread()); } }; private static final Observable.Transformer executorTransformer = new Observable.Transformer() { @Override public Object call(Object observable) { return ((Observable) observable).subscribeOn(Schedulers.from(ExecutorManager.eventExecutor)) .observeOn(AndroidSchedulers.mainThread()); } }; /** * Don't break the chain: use RxJava's compose() operator */ public static <T> Observable.Transformer<T, T> applyComputationSchedulers() { return (Observable.Transformer<T, T>) computationTransformer; } public static <T> Observable.Transformer<T, T> applyIoSchedulers() { return (Observable.Transformer<T, T>) ioTransformer; } public static <T> Observable.Transformer<T, T> applyNewSchedulers() { return (Observable.Transformer<T, T>) newTransformer; } public static <T> Observable.Transformer<T, T> applyTrampolineSchedulers() { return (Observable.Transformer<T, T>) trampolineTransformer; } public static <T> Observable.Transformer<T, T> applyExecutorSchedulers() { return (Observable.Transformer<T, T>) executorTransformer; } }
/** * 这个类也是 小鄧子 提供的!! */ public class ExecutorManager { public static final int DEVICE_INFO_UNKNOWN = 0; public static ExecutorService eventExecutor; //private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CPU_COUNT = ExecutorManager.getCountOfCPU(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1; private static final BlockingQueue<Runnable> eventPoolWaitQueue = new LinkedBlockingQueue<>(128); private static final ThreadFactory eventThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(@NonNull Runnable r) { return new Thread(r, "eventAsyncAndBackground #" + mCount.getAndIncrement()); } }; private static final RejectedExecutionHandler eventHandler = new ThreadPoolExecutor.CallerRunsPolicy(); static { eventExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, eventPoolWaitQueue, eventThreadFactory, eventHandler); } /** * Linux中的设备都是以文件的形式存在,CPU也不例外,因此CPU的文件个数就等价与核数。 * Android的CPU 设备文件位于/sys/devices/system/cpu/目录,文件名的的格式为cpu\d+。 * * 引用:http://www.jianshu.com/p/f7add443cd32#,感谢 liangfeizc :) * https://github.com/非死book/device-year-class */ public static int getCountOfCPU() { if (Build.VERSION.SDK_INT <= Build.VERSION_CODES.GINGERBREAD_MR1) { return 1; } int count; try { count = new File("/sys/devices/system/cpu/").listFiles(CPU_FILTER).length; } catch (SecurityException | NullPointerException e) { count = DEVICE_INFO_UNKNOWN; } return count; } private static final FileFilter CPU_FILTER = new FileFilter() { @Override public boolean accept(File pathname) { String path = pathname.getName(); if (path.startsWith("cpu")) { for (int i = 3; i < path.length(); i++) { if (path.charAt(i) < '0' || path.charAt(i) > '9') { return false; } } return true; } return false; } }; }