消息总线EventBus源码分析以及与Otto框架对比

jopen 9年前

(一).前言:

上一篇我们对EventBus的简介和基本使用做了说明,今天我们主要深入的使用EventBus,同时会从源码的角度对于订阅和发送消息做分析,以及和另外的消息总线框架Otto在性能等方面做一个对比分析。

FastDev4Android框架项目地址: https://github.com/jiangqqlmj/FastDev4Android

(二).框架简单说明:

通过上一篇文章的介绍,EventBus的使用步骤如下:

  •   定义一个事件,用于EventBus的分发。
  •   定义订阅者,把该订阅者加入到EventBus中。
  •   通过EventBus.post来进行分发事件,告诉订阅者有事情发生了。订阅者接收到信息进行相应处理。
  •   使用完成之后,订阅者需要反注册取消订阅。

具体原理图如下:

订阅者接收到通知的时候会调用相应的函数进行处理事件,在EventBus中一般有以下四种方法来让我们进行处理:

  1. onEvent
  2. onEventMainThread
  3. onEventBackground
  4. onEventAsync

这四个订阅方法有很多的相似之处,但是功能上面还是有点不同的,EventBus会通过调用post方法来进行分发消息,让订阅者进行接收,订阅者接收到事件消息是通过上面几个方法来进行接收和处理的。下面我们来对这四个方法的具体使用场景做一个介绍:

  • onEvent:使用该方法作为订阅函数表示post消息事件和接收消息事件在同一个线程中。
  • onEventMainThread:  该方法会在UI  Main线程中运行,接收事件同时会在UI线程中运行,这样我们就可以在该方法中直接更新UI
  • onEventBackground:使用该方法,如果事件是在UI Main线程发出来,该方法会在子线程中执行,如果事件是从子线程中发出来,该onEventBackground方法会在子线程中执行。
  • onEventAsync:使用该方法,会在创建新的子线程中执行onEventAsync

那么现在订阅的函数方法有四个,我们怎么会知道具体调用哪个方法呢?OK我们看一篇文章:我们会先创建一个事件类,然后进行post发送该对象,在订阅方法中接收,注入哪个函数的参数就是该发送过来的对象。这样我们应该清楚了吧,那是根据传进来的事件对象参数来进行判断的。具体我们看实例:

(三).调用实例:

3.1.实现需求:我们现在创建三个Event事件类,第二个Activity中进行发送,在订阅者Activity中进行接收订阅方法如下:

/**  * 收到消息 进行相关处理  * @param event  */  public void onEventMainThread(TestEventFirst event) {  Log.d("zttjiangqq","onEventMainThread收到消息:"+event.getMsg());  textView_one.setText(event.getMsg());  //showToastMsgShort(event.getMsg());  }  /**  * 收到消息 进行相关处理  * @param event  */  public void onEventMainThread(TestEventSecond event) {  Log.d("zttjiangqq","onEventMainThread收到消息:"+event.getMsg());  textView_two.setText(event.getMsg());  //showToastMsgShort(event.getMsg());  }  /**  * 收到消息 进行相关处理  * @param event  */  public void onEventMainThread(TestEventThird event) {  Log.d("zttjiangqq","onEventMainThread收到消息:"+event.getMsg());  textView_third.setText(event.getMsg());  //showToastMsgShort(event.getMsg());  }

3.2.演示效果如下:

(四).源码解析:

以上主要为EventBus的主要使用,现在开始我们对于EventBus的注册和发送两个模块从源码的角度来走一下。

4.1.EventBus对象获取:我们一般使用单例模式获取。保证对象唯一性。

/**  * 采用单例模式获取EventBus实例对象  一般我们获取EventBus对象 就是采用这种方式,不建议直接new  * @return  */  public static EventBus getDefault() {  if (defaultInstance == null) {  synchronized (EventBus.class) {  if (defaultInstance == null) {  defaultInstance = new EventBus();  }  }  }  return defaultInstance;  }

4.2.订阅模块:入口,进行订阅注册

public void register(Object subscriber) {  register(subscriber, false, 0);  }
  •   subscriber:需要注册的订阅者,
  •   sticky:是否为粘性,这边默认为false,
  •   priority:事件的优先级,默认为0

下面我们来具体看一下register(subscriber, false, 0)方法具体实现的功能:

private synchronized void register(Object subscriber, boolean sticky, int priority) {  List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriber.getClass());  for (SubscriberMethod subscriberMethod : subscriberMethods) {  subscribe(subscriber, subscriberMethod, sticky, priority);  }  }

该函数中会通过findSubscriberMethods()来获取所有订阅的方法,具体主要的步骤我这边已经进行注释了

/**  * 进行查找订阅者中所有订阅的方法  * @param subscriberClass  * @return  所有订阅的方法的集合  */  List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {  String key = subscriberClass.getName();  List<SubscriberMethod> subscriberMethods;  //从缓存中获取订阅的方法,第一次使用肯定缓存中不存在  synchronized (methodCache) {  subscriberMethods = methodCache.get(key);  }  if (subscriberMethods != null) {  return subscriberMethods;  }  //订阅方法的集合  subscriberMethods = new ArrayList<SubscriberMethod>();  Class<?> clazz = subscriberClass;  HashMap<String, Class> eventTypesFound = new HashMap<String, Class>();  StringBuilder methodKeyBuilder = new StringBuilder();  while (clazz != null) {  String name = clazz.getName();  if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {  // Skip system classes, this just degrades performance  // 这边直接跳过了系统类,因为系统类中 普通开发者不会使用在系统类中使用EventBus,所以就忽略处理了,不然会降低性能  break;  }  // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)  try {  // This is faster than getMethods, especially when subscribers a fat classes like Activities  // 通过反射来获取当前类中的所有方法  Method[] methods = clazz.getDeclaredMethods();  // 正式开始查询所有订阅的方法  filterSubscriberMethods(subscriberMethods, eventTypesFound, methodKeyBuilder, methods);  } catch (Throwable th) {  th.printStackTrace();  // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149 Method[] methods = subscriberClass.getMethods();  subscriberMethods.clear();  eventTypesFound.clear();  filterSubscriberMethods(subscriberMethods, eventTypesFound, methodKeyBuilder, methods);  break;  }  clazz = clazz.getSuperclass();  }  //抛出异常,订阅者没有实现onEvent开头的公共方法  if (subscriberMethods.isEmpty()) {  throw new EventBusException("Subscriber " + subscriberClass + " has no public methods called "  + ON_EVENT_METHOD_NAME);  } else {  //订阅的方法存在,同时加入缓存  synchronized (methodCache) {  methodCache.put(key, subscriberMethods);  }  return subscriberMethods;  }  }

然后调用filterSubscriberMethods()进行过滤,把订阅方法加入到集合中

/**  * 查询订阅的方法,查到方法,方法加入到subScriberMethods  * @param subscriberMethods  * @param eventTypesFound  * @param methodKeyBuilder  * @param methods  */  private void filterSubscriberMethods(List<SubscriberMethod> subscriberMethods,  HashMap<String, Class> eventTypesFound, StringBuilder methodKeyBuilder,  Method[] methods) {  //遍历类中的所有方法  for (Method method : methods) {  String methodName = method.getName();  //过滤onEvent开头的方法  if (methodName.startsWith(ON_EVENT_METHOD_NAME)) {  //返回方法修饰符 例如 public,private,protected  int modifiers = method.getModifiers();  Class<?> methodClass = method.getDeclaringClass();  if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {  //订阅方法必须为public类型  //进行获取方法的参数类型  Class<?>[] parameterTypes = method.getParameterTypes();  if (parameterTypes.length == 1) {  //进行获取线程模式类型  ThreadMode threadMode = getThreadMode(methodClass, method, methodName);  if (threadMode == null) {  continue;  }  //取出当前传入的订阅者  Class<?> eventType = parameterTypes[0];  //methodKeyBuilder key ="0"."methodName".">"."eventType_Name"  methodKeyBuilder.setLength(0);  methodKeyBuilder.append(methodName);  methodKeyBuilder.append('>').append(eventType.getName());  String methodKey = methodKeyBuilder.toString();  Class methodClassOld = eventTypesFound.put(methodKey, methodClass);  if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {  // Only add if not already found in a sub class  // 构建一个订阅方法的对象(里面存入方法名,线程模式类型,事件类型),加入到订阅方法集合中  subscriberMethods.add(new SubscriberMethod(method, threadMode, eventType));  } else {  // Revert the put, old class is further down the class hierarchy  eventTypesFound.put(methodKey, methodClassOld);  }  }  } else if (!skipMethodVerificationForClasses.containsKey(methodClass)) {  Log.d(EventBus.TAG, "Skipping method (not public, static or abstract): " + methodClass + "."  + methodName);  }  }  }  }

上面已经进行获取了所有的订阅函数,那么现在开始就可以进行订阅了,让我们来查看subscribe()方法做的功能操作:

/**  * 开始进行为订阅者 注册相关的订阅方法  * @param subscriber  订阅者  * @param subscriberMethod  订阅的方法  * @param sticky    是否为粘性  * @param priority  优先级  */  private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) {  //通过订阅方法中进行获取订阅方法的类型  Class<?> eventType = subscriberMethod.eventType;  //通过订阅事件的类型 进行获取所有的订阅信息(有订阅者对象,订阅方法,优先级)  CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);  //进行创建一个订阅者  Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);  if (subscriptions == null) {  //如果当前的事件类型不存在订阅信息,那么就创建一个订阅信息集合  subscriptions = new CopyOnWriteArrayList<Subscription>();  //同时把当前的订阅信息加入到该订阅中  subscriptionsByEventType.put(eventType, subscriptions);  } else {  if (subscriptions.contains(newSubscription)) {  //抛出异常,该订阅者已经注册过该事件类中  throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "  + eventType);  }  }  // Starting with EventBus 2.2 we enforced methods to be public (might change with annotations again)  // subscriberMethod.method.setAccessible(true);  // 优先级判断,进行排序  int size = subscriptions.size();  for (int i = 0; i <= size; i++) {  if (i == size || newSubscription.priority > subscriptions.get(i).priority) {  subscriptions.add(i, newSubscription);  break;  }  }  //将当前的事件加入到订阅者列表中  List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);  if (subscribedEvents == null) {  subscribedEvents = new ArrayList<Class<?>>();  typesBySubscriber.put(subscriber, subscribedEvents);  }  subscribedEvents.add(eventType);  //是否粘性判断  if (sticky) {  if (eventInheritance) {  // Existing sticky events of all subclasses of eventType have to be considered.  // Note: Iterating over all events may be inefficient with lots of sticky events,  // thus data structure should be changed to allow a more efficient lookup  // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).  Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();  for (Map.Entry<Class<?>, Object> entry : entries) {  Class<?> candidateEventType = entry.getKey();  if (eventType.isAssignableFrom(candidateEventType)) {  Object stickyEvent = entry.getValue();  checkPostStickyEventToSubscription(newSubscription, stickyEvent);  }  }  } else {  Object stickyEvent = stickyEvents.get(eventType);  checkPostStickyEventToSubscription(newSubscription, stickyEvent);  }  }  }

OK完成以上步骤,我们就大体完成了订阅注册工作,下面就是需要分析一下post的流程:

主要先看post主函数:

/**  * 向EventBus中发送消息事件对象  * @param event  */  public void post(Object event) {  PostingThreadState postingState = currentPostingThreadState.get();  //把消息加入到事件队列中  List<Object> eventQueue = postingState.eventQueue;  eventQueue.add(event);  if (!postingState.isPosting) {  postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();  postingState.isPosting = true;  if (postingState.canceled) {  throw new EventBusException("Internal error. Abort state was not reset");  }  try {  //当消息队列不为空的时候,进行这正式发送消息,采用循环,把队列中所有的消息发送出去  while (!eventQueue.isEmpty()) {  postSingleEvent(eventQueue.remove(0), postingState);  }  } finally {  postingState.isPosting = false;  postingState.isMainThread = false;  }  }  }

然后进行发送功能,调用postSingleEvent()函数方法:

//消息发送:发送单个事件消息  private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {  Class<?> eventClass = event.getClass();  boolean subscriptionFound = false;  if (eventInheritance) {  List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);  int countTypes = eventTypes.size();  for (int h = 0; h < countTypes; h++) {  Class<?> clazz = eventTypes.get(h);  subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);  }  } else {  subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);  }  if (!subscriptionFound) {  if (logNoSubscriberMessages) {  Log.d(TAG, "No subscribers registered for event " + eventClass);  }  if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&  eventClass != SubscriberExceptionEvent.class) {  post(new NoSubscriberEvent(this, event));  }  }  }

接着进行消息过滤postSingleEventForEventType()方法

/**  * 进行该特定的Event发送相应的消息  * @param event  事件消息  * @param postingState  * @param eventClass  * @return  */  private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {  CopyOnWriteArrayList<Subscription> subscriptions;  synchronized (this) {  subscriptions = subscriptionsByEventType.get(eventClass);  }  if (subscriptions != null && !subscriptions.isEmpty()) {  for (Subscription subscription : subscriptions) {  postingState.event = event;  postingState.subscription = subscription;  boolean aborted = false;  try {  //发生消息给订阅者  postToSubscription(subscription, event, postingState.isMainThread);  aborted = postingState.canceled;  } finally {  postingState.event = null;  postingState.subscription = null;  postingState.canceled = false;  }  if (aborted) {  break;  }  }  return true;  }  return false;  }

最终这边有一个核心的方法:postToSubscription()来进行post消息

/**  * 进行发送消息,同时根据发送过来的线程类型类型,发送消息给特定的订阅方法来进行执行  * @param subscription  订阅者  * @param event   执行事件  * @param isMainThread 是否为主线程  */  private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {  switch (subscription.subscriberMethod.threadMode) {  case PostThread:  //直接在本线程中调用订阅函数  invokeSubscriber(subscription, event);  break;  case MainThread:  if (isMainThread) {  //如果是主线程,直接调用订阅函数  invokeSubscriber(subscription, event);  } else {  //如果不是主线程,通过handler进行处理  mainThreadPoster.enqueue(subscription, event);  }  break;  case BackgroundThread:  if (isMainThread) {  //如果是主线程,采用runnable 中调用  backgroundPoster.enqueue(subscription, event);  } else {  //子线程,直接调用  invokeSubscriber(subscription, event);  }  break;  case Async:  //加入到子线程中进行发送  asyncPoster.enqueue(subscription, event);  break;  default:  throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);  }  }

OK,到这边基本上完成EventBus的register和post的流程的讲解,关于这个核心类EventBus的注释过的类文件已经上传了大家可以通过该地址进行下载: EventBus注释过的类文件

(五).和Otto消息总线框架对比:

Otto是Android中另外一个消息总线库,它其实是EventBus的变种,该和EventBus有一些相同的方法(例如:register,post,unregister…),但是这两者之间也有一些不同之处如下:

EventBus

Otto

声明事件处理方法 命名约定 注解
事件继承

YES

YES

订阅继承

YES

NO

缓存事件 YES,sticky events NO
事件生产

NO

YES

子线程事件传输 YES(Default)

YES

主线程事件传输

YES

NO

后台线程事件传输

YES

NO

异步线程事件传输

YES

NO

除了以上功能不同以外,这边还有性能上面的差异。为了测试性能问题,我们clone当前EventBus项目的时候,会发现有一个EventBusPerformance项目,我们可以使用的不同场景来比较。

基于下表结果显示,每一个测试方面 EventBus的性能都大于Otto

EventBus

Otto

在Android2.3模拟器发送1000个事件 快70%
S3 Android4.0系统,发送1000个事件 快110%
Android2.3模拟器,注册1000个订阅者 快10%
S3 Android4.0系统,注册1000个订阅者 快70%
Android2.3模拟器,注册订阅者冷启动 快350%
S3 Android4.04 注册订阅者冷启动 几乎一样

通过对比发现EventBus无论在功能上面还是性能上面,远远超过Otto消息总线框架,所以我们建议使用EventBus消息总线框架。

到此我们的EventBus专题内容已经全部讲完了,相信大家在这个专题中能对EventBus会有一个比较全面的了解,同时也能够简单的了解实现的原理以及相关逻辑。

我们的项目已经配置集成了消息总线EventBus的例子.欢迎大家去Github站点进行clone或者下载浏览: https://github.com/jiangqqlmj/FastDev4Android 同时欢迎大家star和fork整个开源快速开发框架项目~

尊重原创,转载请注明:From Sky丶清(http://www.lcode.org) 侵权必究!

关注我的订阅号(codedev123),每天分享移动开发技术(Android/IOS),项目管理以及博客文章!(欢迎关注,第一时间推送精彩文章)

关注我的微博,可以获得更多精彩内容

</div>

来自: http://www.lcode.org/消息总线eventbus源码分析以及与otto框架对比二/