事件总线源码分析

jopen 9年前

基本概念

在安卓中处理不同组件之间的事件传递依靠广播机制,即Intent/BroadcastReceiver机制,其原理类似于传感网中的Ad hoc网络模式,所有组件处在一种无序状态;

事件总线机制则引入中心控制节点来集中管理事件,类似于移动通信网络中的基站功能。

总线这个概念来自于计算机,计算机中各种功能部件如CPU,显卡之类不会采用两两互联的方式,那样布线会非常复杂,实际是使用总线作为公共通信干线,挂载上所有的功能部件。

事件总线框架采用订阅/发布模型

在这个模型中包含以下元素

1.事件类,这是要传递的事件对象。
2.事件总线,是中心控制节点,负责管理事件的注册,接收发布的事件,并完成事件匹配以进行事件处理。
3.发布者,负责分发事件。
4.订阅者,负责事件处理并注册到事件总线。

事件总线模型比广播机制的优势在于

1.简化了组件之间的通信
2.实现了事件分发和事件处理的解耦,因此二者可以在不同线程中实现。
3.扩展了事件类,实际上事件类就是根类Object,而不必藏身在Intent中了。

事件总线模型不能完全替代广播机制,因为广播机制可以完成跨App间组件调用。

EventBus

EventBus采用发布/订阅模式,想要接收事件必须先订阅总线,这与手机要注册基站类似。EventBus的作用在于优化Activities, Fragments等之间的通信,并处理能够线程问题。

EventBus的使用非常简单,包括以下步骤

1.总线的建立。

EventBus eventBus = EventBus.getDefault();

毫无疑问,事件总线对象应该是单例实现,如果要对其进行初始化配置最好放在Applicaiton类中进行。
2.创建事件,事件就是普通的Java类,如

public class MessageEvent {      public final String message;        public MessageEvent(String message) {          this.message = message;      }  }

3.创建订阅方法。订阅者实际上是处理事件的方法,以onEvent来命名。

public void onEvent(MessageEvent event){      Toast.makeText(this, event.message, Toast.LENGTH_SHORT).show();  }

4.订阅者注册和取消。将包含订阅方法的类注册到总线。

protected void onCreate(Bundle savedInstanceState) {      //。。。      EventBus.getDefault().register(this,1);  }    @Override  protected void onDestroy() {      super.onDestroy();      EventBus.getDefault().register(this);  }

5.事件发布。

EventBus.getDefault().post(new MessageEvent("MessageEvent"));

线程切换

关于线程切换要注意在事件分发是处在当前线程中的,是比较容易控制的。EventBus主要简化的是事件处理所处的线程。共有四种线程切换方法,其区别在于事件处理方法的命名。

  • onEvent方法,为默认PostThread模式。处理线程就是分发线程。

  • onEventMainThread方法,主线程模式。处理线程最终为UI线程,与分发线程无关。

  • onEventBackgroundThread方法,背景线程模式。处理线程最终为背景线程。这意味着如果分发线程是UI线程,则将新建一背景线程作处理线程;如果分发线程不是UI线程,则分发线程就用作处理线程。

  • onEventAsync方法,异步模式。不管分发线程如何,处理线程都将新建一线程,底层采用线程池技术实现。

Otto

OttoSquare推出的事件总线框架,基于Guava框架。Guava框架查找订阅方法采用遍历类方法,Otto则是使用注解来查找,虽然EventBus在初始版本中也采用注解订阅方法,但因为性能问题改为按照方法名查找。

Otto的使用与EventBus类似,最大区别在于是否对订阅方法使用注解。

Bus bus = new Bus(ThreadEnforcer.MAIN);    @Subscribe   public void answerAvailable(AnswerAvailableEvent event){      // TODO: React to the event somehow!  }    bus.register(this);    bus.post(new AnswerAvailableEvent(42));

两种框架的对比

可以看到在功能和性能上EventBus都完胜Otto。

EventBus源码解析

首先要理解几个概念,先看看订阅方法

void onEvent(MessageEvent event)

订阅方法中包含一个具体事件类作参数,并通过重载实现不同的订阅方法。

  • SubscriberMethod类表示单个订阅方法,主要域包括

Method method;//订阅方法的反射表示,实现ding'y方法的  ThreadMode threadMode;//处理事件所用线程模型  Class<?> eventType;//具体事件类类型
  • Subscription类也表示订阅方法,是SubscriberMethod类的进一步封装,主要域包括

Object subscriber; //具体事件类对象  SubscriberMethod subscriberMethod;  int priority; //订阅方法优先级

总的说来,事件总线的原理是在总线中维持两个集合:一个表示订阅方法集合,一个表示事件类型集合。
注册订阅方法分为两步:首先查找所有出所有订阅方法;其次将订阅方法及其事件类型分别加入总线的两个集合中。
事件分发时,需要根据事件对象提取出事件类型,而后构建订阅方法,在总线两个集合中分别查找是否存在该事件;如果存在就按照线程模型分别执行。

  • PostingThreadState

List<Object> eventQueue = new ArrayList<Object>();  boolean isPosting;  boolean isMainThread;  Subscription subscription;//订阅类  Object event;              //事件类型  boolean canceled;

1.EventBus创建使用getDefault()方法采用单例模式,也可以通过buidler指定,需要同步创建。

2.订阅方法注册

EventBus.getDefault().register(this);

总的说来方法订阅包括两步:

这里注意形式上注册到总线的是MainActivity对象,并不是具体订阅方法,所以存在一个查找出活动对象中所有订阅方法的过程。

同时总线中要维持两个集合:订阅类集合事件类型集合。新的订阅方法要添加到这两个集合中。

private synchronized void register(Object subscriber, boolean sticky, int priority) {      //1.首先对活动类查找出其包含的订阅方法(SubscriberMethod)集合      List<SubscriberMethod> subscriberMethods = findSubscriberMethods(subscriber.getClass());      //2.而后将对每一个订阅方法(SubscriberMethod)注册      for (SubscriberMethod subscriberMethod : subscriberMethods) {          subscribe(subscriber, subscriberMethod, sticky, priority);      }  }

注册订阅方法(SubscriberMethod)简化版如下,不考虑striky情况,完成将一个订阅方法A插入总线集合中:

//Object subscriber 如 MainActivity  //SubscriberMethod  订阅方法,如onEvent  private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) {      //获取订阅方法A中的事件类型A.eventType,如MessageEvent      Class<?> eventType = subscriberMethod.eventType;      //根据事件类型A.eventType获取总线中已经存在的订阅方法(Subscription)集合S      CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);      //根据订阅方法A创建其对应的封装订阅方法B      Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);      //在订阅方法集合S中查找封装订阅方法B      //首先要处理S=NULL的情况,此时要创建S,并将B插入S。      //如果S已经包含B,抛出异常,即不能重复注册同一个订阅方法。      if (subscriptions == null) {              subscriptions = new CopyOnWriteArrayList<Subscription>();              subscriptionsByEventType.put(eventType, subscriptions);          } else {              if (subscriptions.contains(newSubscription)) {                  throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "                          + eventType);              }          }      //如果S不为空且B不在S中,要将B按照优先级顺序插入到S      int size = subscriptions.size();          for (int i = 0; i <= size; i++) {              if (i == size || newSubscription.priority > subscriptions.get(i).priority) {                  subscriptions.add(i, newSubscription);                  break;              }          }      //总线中维护一个事件类型集合,还需要将新事件类型A.eventType加入该集合      List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);      if (subscribedEvents == null) {              subscribedEvents = new ArrayList<Class<?>>();              typesBySubscriber.put(subscriber, subscribedEvents);          }          subscribedEvents.add(eventType);      }

3.事件分发方法简化版为

public void post(Object event) {      //分发事件时将具体事件入队      PostingThreadState postingState = currentPostingThreadState.get();      List<Object> eventQueue = postingState.eventQueue;      eventQueue.add(event);      //处理队列中优先级最高的事件      while (!eventQueue.isEmpty()) {          postSingleEvent(eventQueue.remove(0), postingState);      }  }
//Object event 事件类对象  private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {      //获取具体事件类对象event对应的事件类型E,如MessageEvent      Class<?> eventClass = event.getClass();      //在事件总线集合中查找是否存在具体事件类型E,如果不存在,则分发NoSubscriberEvent事件;如果存在,继续分发。      boolean subscriptionFound = false;      subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);  }
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {      //根据事件类型同步获取总线中的封装订阅方法集合S,这里要注意某个具体事件类型可能有多个线程版本      CopyOnWriteArrayList<Subscription> subscriptions;      //遍历S中订阅方法,进行事件处理      if (subscriptions != null && !subscriptions.isEmpty()) {          for (Subscription subscription : subscriptions) {              postToSubscription(subscription, event, postingState.isMainThread);          }          return true;      }      return false;  }

根据ThreadMode类型处理事件

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {      switch (subscription.subscriberMethod.threadMode) {          case PostThread:              invokeSubscriber(subscription, event);              break;          case MainThread:              if (isMainThread) {                  invokeSubscriber(subscription, event);              } else {                  mainThreadPoster.enqueue(subscription, event);              }              break;          case BackgroundThread:              if (isMainThread) {                  backgroundPoster.enqueue(subscription, event);              } else {                  invokeSubscriber(subscription, event);              }              break;          case Async:              asyncPoster.enqueue(subscription, event);              break;          default:              throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);      }  }

4.几种线程有关的事件处理方法
PostThread方式采用反射完成事件处理。

void invokeSubscriber(Subscription subscription, Object event) {      subscription.subscriberMethod.method.invoke(subscription.subscriber, event);  }

MainThread模式在异步情况下采用Handler完成事件处理,具体类为HandlerPoster类,这个类采用Looper.getMainLooper()构造以保证事件处理执行在主线程中。

mainThreadPoster.enqueue(subscription, event);
void enqueue(Subscription subscription, Object event) {      //构造一个PendingPost类并将其入队      PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);      queue.enqueue(pendingPost);  }

BackgroundThreadAsync模式采用线程池来执行,

eventBus.getExecutorService() = Executors.newCachedThreadPool();
public void enqueue(Subscription subscription, Object event) {      PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);      queue.enqueue(pendingPost);      eventBus.getExecutorService().execute(this);  }

参考文献

greenrobot-EventBus-HOWTO.md
EventBus for Android™
Otto
EventBus Comparison with Square's Otto
eventbus-for-android

来自: http://segmentfault.com/a/1190000004312745