A Summary of RxJava Use Cases

jopen, 9 years ago

 

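Checking the caches before fetching data

When fetching data, first check whether the memory cache holds it.

Then check whether the file cache holds it.

Only as a last resort fetch it from the network.

As soon as one of these sources yields a value, the later ones are not executed.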

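    import rx.Observable;
    import rx.Subscriber;
    import rx.schedulers.Schedulers;

    final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            if (memoryCache != null) {
                subscriber.onNext(memoryCache);
            }
            // Completing without emitting lets concat() move on to the next source.
            subscriber.onCompleted();
        }
    });

    Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            String cachePref = rxPreferences.getString("cache").get();
            if (!TextUtils.isEmpty(cachePref)) {
                subscriber.onNext(cachePref);
            }
            subscriber.onCompleted();
        }
    });

    Observable<String> network = Observable.just("network");

    // The concat operator does the real work: it subscribes to the sources one at a
    // time, in order. first() takes the first emitted item and then unsubscribes,
    // so the remaining sources are never queried.
    Observable.concat(memory, disk, network)
            .first()
            .subscribeOn(Schedulers.newThread())
            .subscribe(s -> {
                memoryCache = "memory"; // populate the memory cache for the next lookup
                System.out.println("--------------subscribe: " + s);
            });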
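To see the control flow of "try each source in order, stop at the first hit" without any RxJava dependency, here is a minimal plain-JDK sketch. It is only a model of what concat followed by first achieves, not how RxJava implements it. The names CacheLookup and firstAvailable are made up for this illustration, and each source is assumed to be a Supplier that returns an empty Optional when it has nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper, not part of RxJava: models concat(...).first() with plain suppliers.
final class CacheLookup {

    /** Tries each source in order and returns the first value found; later sources are never invoked. */
    static Optional<String> firstAvailable(List<Supplier<Optional<String>>> sources) {
        for (Supplier<Optional<String>> source : sources) {
            Optional<String> value = source.get();
            if (value.isPresent()) {
                return value; // stop here, like first() unsubscribing after one item
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Supplier<Optional<String>>> sources = new ArrayList<>();
        sources.add(() -> Optional.empty());        // memory cache: miss
        sources.add(() -> Optional.of("disk"));     // file cache: hit
        sources.add(() -> Optional.of("network"));  // never called in this run
        System.out.println(firstAvailable(sources).orElse("nothing found")); // disk
    }
}
```

The order of the list is the lookup priority, which mirrors the order of the arguments passed to concat.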


Updating the UI only after several concurrent requests have finished

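    import rx.Observable;
    import rx.schedulers.Schedulers;

    // Merges the output of two Observables. Ordering across the two sources is not
    // guaranteed: items reach the subscriber in the order in which they are produced.
    // onCompleted fires only after both sources have completed, which is the point
    // at which the UI can safely be refreshed.
    private void testMerge() {
        Observable observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
        Observable observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());

        Observable.merge(observable1, observable2)
                .subscribeOn(Schedulers.newThread())
                .subscribe(System.out::println);
    }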
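The "wait until every request is done, then update once" part can also be shown with the JDK alone. The sketch below uses CompletableFuture as a stand-in for the two Observables; ConcurrentFetch and fetchAll are hypothetical names, and the suppliers stand for whatever network calls the screen depends on. Unlike merge, this version returns the results in request order rather than arrival order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical helper: runs all requests concurrently and returns once every one has finished.
final class ConcurrentFetch {

    static List<String> fetchAll(List<Supplier<String>> requests) {
        // Start every request on the common pool so they run side by side.
        List<CompletableFuture<String>> futures = requests.stream()
                .map(request -> CompletableFuture.supplyAsync(request))
                .collect(Collectors.toList());

        // Block until all of them are complete (the analogue of merge's onCompleted).
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Every future is done now, so join() returns immediately.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Supplier<String>> requests = Arrays.asList(() -> "profile", () -> "settings");
        System.out.println("update UI with " + fetchAll(requests));
    }
}
```

On Android the blocking join would have to stay off the main thread; the sketch only illustrates the synchronization point.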


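A request that depends on data returned by another API request

For example, after logging in we often need to use the returned token to fetch the message list.

Here RxJava mainly solves the problem of nested callbacks, which has a name of its own: callback hell.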

    NetworkService.getToken("username", "password")
            // flatMap feeds the token into the second request and flattens the result
            .flatMap(token -> NetworkService.getMessage(token))
            .subscribe(s -> {
                System.out.println("message: " + s);
            });
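The same "second call needs the result of the first" shape exists in the JDK as CompletableFuture.thenCompose, which may help readers who have not met flatMap before. The sketch below is an analogy under stated assumptions: ChainedRequests, its two fake endpoints, and the strings they return are all invented for illustration and do not reflect the NetworkService used above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for a login endpoint and a message endpoint.
final class ChainedRequests {

    /** Pretends to log in and asynchronously yields a token. */
    static CompletableFuture<String> getToken(String user, String password) {
        return CompletableFuture.supplyAsync(() -> "token-for-" + user);
    }

    /** Pretends to fetch messages using a token obtained earlier. */
    static CompletableFuture<String> getMessage(String token) {
        return CompletableFuture.supplyAsync(() -> "messages fetched with " + token);
    }

    /** Chains the two calls without nesting callbacks; thenCompose plays the role of flatMap. */
    static CompletableFuture<String> loginThenFetch(String user, String password) {
        return getToken(user, password).thenCompose(ChainedRequests::getMessage);
    }

    public static void main(String[] args) {
        System.out.println(loginThenFetch("username", "password").join());
    }
}
```

The chain stays flat however many dependent steps are added, which is exactly what the nested-callback version cannot offer.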


Preventing rapid repeated clicks on a button

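    import java.util.concurrent.TimeUnit;

    // RxView comes from the RxBinding library.
    RxView.clicks(findViewById(R.id.btn_throttle))
            // Let the first click through, then ignore further clicks for one second.
            .throttleFirst(1, TimeUnit.SECONDS)
            .subscribe(aVoid -> {
                System.out.println("click");
            });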
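The rule throttleFirst applies is small enough to write out by hand: accept an event, then drop everything that arrives before the window has elapsed. Below is a minimal sketch of that rule in plain Java. The class name ThrottleFirst and its method tryAccept are made up here, timestamps are passed in explicitly so the behaviour is easy to check, and the sketch makes no claim to match RxJava's internals.

```java
// Hypothetical model of the throttleFirst rule; not RxJava's implementation.
final class ThrottleFirst {
    private final long windowMillis;
    private long lastAcceptedMillis = -1; // -1 means no event has been accepted yet

    ThrottleFirst(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns true if an event arriving at nowMillis should be let through. */
    boolean tryAccept(long nowMillis) {
        if (lastAcceptedMillis == -1 || nowMillis - lastAcceptedMillis >= windowMillis) {
            lastAcceptedMillis = nowMillis; // a new window starts at the accepted event
            return true;
        }
        return false; // still inside the window opened by the last accepted event
    }

    public static void main(String[] args) {
        ThrottleFirst throttle = new ThrottleFirst(1000);
        long[] clickTimes = {0, 500, 999, 1000, 1500};
        for (long t : clickTimes) {
            System.out.println(t + " ms -> " + (throttle.tryAccept(t) ? "click" : "ignored"));
        }
    }
}
```

With a one-second window, the clicks at 0 ms and 1000 ms pass and the other three are ignored.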


Reactive UI

For example, ticking a checkbox automatically updates the corresponding preference.

    SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
    RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

    // Reactive wrapper around the boolean preference "checked" (default: true)
    Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

    CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
    // Every change of the checkbox state is written straight into the preference.
    RxCompoundButton.checkedChanges(checkBox)
            .subscribe(checked.asAction());


Complex data transformations

    Observable.just("1", "2", "2", "3", "4", "5")
            .map(Integer::parseInt)          // 1, 2, 2, 3, 4, 5
            .filter(n -> n > 1)              // 2, 2, 3, 4, 5
            .distinct()                      // 2, 3, 4, 5
            .take(3)                         // 2, 3, 4
            .reduce((a, b) -> a + b)         // 2 + 3 + 4
            .subscribe(System.out::println); // prints 9
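To check the result of 9 step by step, the same pipeline can be run with Java Streams, where limit plays the role of take. This is a sketch for verification only; TransformSteps and its two methods are names invented for this example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper that replays the pipeline above with java.util.stream.
final class TransformSteps {

    /** Parses the strings, keeps values above 1, removes duplicates, and keeps the first three. */
    static List<Integer> select(String... raw) {
        return Stream.of(raw)
                .map(Integer::parseInt)
                .filter(n -> n > 1)
                .distinct()
                .limit(3)
                .collect(Collectors.toList());
    }

    /** Adds up the selected values, the equivalent of the reduce step. */
    static int sum(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> kept = select("1", "2", "2", "3", "4", "5");
        System.out.println(kept);      // [2, 3, 4]
        System.out.println(sum(kept)); // 9
    }
}
```

Splitting the selection from the sum makes the intermediate list visible, which is handy when one of the operators does not behave as expected.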
