EventBus 在发送了消息后,就会直接回调该消息类型的所有监听方法,回调操作是通过反射 method.invoke
来实现的,那么在回调之前也必须先拿到应用内所有的监听方法才行。EventBus 获取监听方法的方式有两种:
不配置注解处理器。在 subscriber 进行 register 时通过反射获取到 subscriber 的所有监听方法,这种方式是在运行时实现的
配置注解处理器。预先将所有的监听方法的方法签名信息保存到辅助文件中,在运行时就可以直接拿到所有的解析结果而不必依靠反射来获取,这种方式是在编译阶段实现的,相比第一种方式性能上会高很多
优点: 开销小,代码更优雅、简洁,解耦发送者和接收者,可动态设置事件处理线程和优先级。缺点: 每个事件必须自定义一个事件类,增加了维护成本。
注册 EventBus 通过 EventBus.register(Object)
方法来进行注册的。该方法会对 subscriber 进行解析,通过 SubscriberMethodFinder 的 findSubscriberMethods
方法将 subscriber 包含的所有声明了 @Subscribe
注解的方法的签名信息保存到内存中,当有消息被 Post 时就可以直接在内存中查找到目标方法了
1 2 3 4 5 6 7 8 9 public void register (Object subscriber) { Class subscriberClass = subscriber.getClass(); List subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this ) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
SubscriberMethod 包含了 @Subscribe
的参数信息以及对应的方法签名信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class SubscriberMethod { final Method method; final ThreadMode threadMode; final Class<?> eventType; final int priority; final boolean sticky; String methodString; ··· }
SubscriberMethodFinder 会将每次的查找结果缓存到 METHOD_CACHE
中,这对某些会先后经历 多次注册和反注册 操作的 subscriber 来说比较有用,因为每次查找可能需要依靠多次循环遍历和反射操作,会稍微有点消耗性能,但缓存也会占用一部分内存空间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap <>();List<SubscriberMethod> findSubscriberMethods (Class<?> subscriberClass) { List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null ) { return subscriberMethods; } if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException ("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation" ); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }
因为 ignoreGeneratedIndex
默认是 false,所以这里直接看 findUsingInfo(subscriberClass)
方法
其主要逻辑是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 private static final int POOL_SIZE = 4 ;private static final FindState[] FIND_STATE_POOL = new FindState [POOL_SIZE];private List<SubscriberMethod> findUsingInfo (Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null ) { findState.subscriberInfo = getSubscriberInfo(findState); if (findState.subscriberInfo != null ) { SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods(); for (SubscriberMethod subscriberMethod : array) { if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); } } } else { findUsingReflectionInSingleClass(findState); } findState.moveToSuperclass(); } return getMethodsAndRelease(findState); } private List<SubscriberMethod> getMethodsAndRelease (FindState findState) { List<SubscriberMethod> subscriberMethods = new ArrayList <>(findState.subscriberMethods); findState.recycle(); synchronized (FIND_STATE_POOL) { for (int i = 0 ; i < POOL_SIZE; i++) { if (FIND_STATE_POOL[i] == null ) { FIND_STATE_POOL[i] = findState; break ; } } } return subscriberMethods; } private FindState prepareFindState () { synchronized (FIND_STATE_POOL) { for (int i = 0 ; i < POOL_SIZE; i++) { FindState state = FIND_STATE_POOL[i]; if (state != null ) { FIND_STATE_POOL[i] = null ; return state; } } } return new FindState (); }
findUsingReflectionInSingleClass
是如何完成反射操作的
如果解析到的方法签名不符合要求,则在开启了 严格检查 的情况下直接抛出异常;如果方法签名符合要求,则会将方法签名保存到 subscriberMethods
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 private void findUsingReflectionInSingleClass (FindState findState) { Method[] methods; try { methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { try { methods = findState.clazz.getMethods(); } catch (LinkageError error) { String msg = "Could not inspect methods of " + findState.clazz.getName(); if (ignoreGeneratedIndex) { msg += ". Please consider using EventBus annotation processor to avoid reflection." ; } else { msg += ". Please make this class visible to EventBus annotation processor to avoid reflection." ; } throw new EventBusException (msg, error); } findState.skipSuperClasses = true ; } for (Method method : methods) { int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0 ) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1 ) { Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null ) { Class<?> eventType = parameterTypes[0 ]; if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); findState.subscriberMethods.add(new SubscriberMethod (method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException ("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException (methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract" ); } } }
通过反射的方式获取订阅者类中的所有声明方法,然后在这些方法里面寻找以 @Subscribe作为注解的方法进行处理。
在经过经过一轮检查,看看 findState.subscriberMethods是否存在,如果没有,将方法名,threadMode,优先级,是否为 sticky 方法等信息封装到 SubscriberMethod 对象中,最后添加到 subscriberMethods 列表中。
findUsingReflectionInSingleClass
方法的一个重点是 findState.checkAdd
方法。如果往简单了想,只要把 subscriber 每个声明了 Subscribe 注解的方法都给保存起来就可以了,可是还需要考虑一些特殊情况:
checkAdd
方法就用于进行上述判断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 final Map<Class, Object> anyMethodByEventType = new HashMap <>();final Map<String, Class> subscriberClassByMethodKey = new HashMap <>();boolean checkAdd (Method method, Class<?> eventType) { Object existing = anyMethodByEventType.put(eventType, method); if (existing == null ) { return true ; } else { if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { throw new IllegalStateException (); } anyMethodByEventType.put(eventType, this ); } return checkAddWithMethodSignature(method, eventType); } } private boolean checkAddWithMethodSignature (Method method, Class<?> eventType) { methodKeyBuilder.setLength(0 ); methodKeyBuilder.append(method.getName()); methodKeyBuilder.append('>' ).append(eventType.getName()); String methodKey = methodKeyBuilder.toString(); Class<?> methodClass = method.getDeclaringClass(); Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass); if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) { return true ; } else { subscriberClassByMethodKey.put(methodKey, methodClassOld); return false ; } }
进行上述操作后,就找到了 subscriber 包含的所有监听方法了,这些方法都会保存到 List<subscribermethod></subscribermethod>
中。拿到所有方法后, register
方法就需要对 subscriber 及其所有监听方法进行归类了
归类的目的是既是为了方便后续操作也是为了提高效率。 因为可能同时存在多个 subscriber 声明了多个对同种类型消息的监听方法,那么就需要将每种消息类型和其当前的所有监听方法对应起来,提高消息的发送效率。而且在 subscriber 解除注册时,也需要将 subscriber 包含的所有监听方法都给移除掉,那么也需要预先进行归类。监听方法也可以设定自己对消息处理的优先级顺序,所以需要预先对监听方法进行排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public void register (Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this ) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } } private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;private final Map<Object, List<Class<?>>> typesBySubscriber;private void subscribe (Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription (subscriber, subscriberMethod); CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null ) { subscriptions = new CopyOnWriteArrayList <>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException ("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0 ; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break ; } } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null ) { subscribedEvents = new ArrayList <>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); if (subscriberMethod.sticky) { if (eventInheritance) { Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
消息 注册 1 2 3 4 5 6 7 8 9 10 11 public static EventBus getDefault () { if (defaultInstance == null ) { synchronized (EventBus.class) { if (defaultInstance == null ) { defaultInstance = new EventBus (); } } } return defaultInstance; }
在getDefault()中使用了双重校验并加锁的单例模式来创建EventBus实例
1 2 3 4 5 private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder ();public EventBus () { this (DEFAULT_BUILDER); }
在EventBus的默认构造方法中又调用了它的另一个有参构造方法,将一个类型为EventBusBuilder的DEFAULT_BUILDER对象传递进去了。这里的EventBusBuilder很明显是一个EventBus的建造器,以便于EventBus能够添加自定义的参数和安装一个自定义的默认EventBus实例。
EventBusBuilder的构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;private final Map<Object, List<Class<?>>> typesBySubscriber;private final Map<Class<?>, Object> stickyEvents;EventBus(EventBusBuilder builder) { ... subscriptionsByEventType = new HashMap <>(); typesBySubscriber = new HashMap <>(); stickyEvents = new ConcurrentHashMap <>(); mainThreadSupport = builder.getMainThreadSupport(); mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this ) : null ; backgroundPoster = new BackgroundPoster (this ); asyncPoster = new AsyncPoster (this ); ... subscriberMethodFinder = new SubscriberMethodFinder (builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex); ... executorService = builder.executorService; }
注释1, 创建了一个subscriptionsByEventType对象,可以看到它是一个类型为HashMap的subscriptionsByEventType对象,并且其key为 Event 类型,value为 Subscription链表。这里的Subscription是一个订阅信息对象,它里面保存了两个重要的字段,一个是类型为 Object 的 subscriber,该字段即为注册的对象(在 Android 中时通常是 Activity对象);另一个是 类型为SubscriberMethod 的 subscriberMethod,它就是被@Subscribe注解的那个订阅方法,里面保存了一个重要的字段eventType,它是 Class<?> 类型的,代表了 Event 的类型。
注释2, 新建了一个类型为 Map 的typesBySubscriber对象,它的key为subscriber对象,value为subscriber对象中所有的 Event 类型链表,日常使用中仅用于判断某个对象是否注册过。
注释3, 新建了一个类型为ConcurrentHashMap的stickyEvents对象,它是专用于粘性事件处理的一个字段,key为事件的Class对象,value为当前的事件。
普通事件是先注册,然后发送事件才能收到;
粘性事件,在发送事件之后再订阅该事件也能收到。
粘性事件会保存在内存中,每次进入都会去内存中查找获取最新的粘性事件,除非手动解除注册。
注释4, 新建了三个不同类型的事件发送器,这里总结下:
mainThreadPoster: 主线程事件发送器,通过它的mainThreadPoster.enqueue(subscription, event)方法可以将订阅信息和对应的事件进行入队,然后通过 handler 去发送一个消息,在 handler 的 handleMessage 中去执行方法。
backgroundPoster: 后台事件发送器,通过它的enqueue() 将方法加入到后台的一个队列,最后通过线程池去执行,注意,它在 Executor的execute()方法 上添加了 synchronized关键字 并设立 了控制标记flag,保证任一时间只且仅能有一个任务会被线程池执行。
asyncPoster: 实现逻辑类似于backgroundPoster,不同于backgroundPoster的保证任一时间只且仅能有一个任务会被线程池执行的特性,asyncPoster则是异步运行的,可以同时接收多个任务。
注释5, 新建了一个subscriberMethodFinder对象,这是从EventBus中抽离出的订阅方法查询的一个对象,组合优于继承
注释6, 从builder中取出了一个默认的线程池对象,它由Executors的newCachedThreadPool()方法创建,它是一个有则用、无则创建、无数量上限的线程池。
消息的执行策略 在介绍消息的具体发送步骤前,先来了解下 EventBus 几种不同的消息执行策略。执行策略由枚举 ThreadMode 来定义,在 @Subscribe
注解中进行声明。执行策略决定了消息接收方是在哪一个线程接收到消息的
ThreadMode
执行线程
POSTING
在发送事件的线程中执行
直接调用消息接收方
MAIN
在主线程中执行
如果事件就是在主线程发送的,则直接调用消息接收方,否则通过 mainThreadPoster 进行处理
MAIN_ORDERED
在主线程中按顺序执行
通过 mainThreadPoster 进行处理,以此保证消息处理的有序性
BACKGROUND
在后台线程中按顺序执行
如果事件是在主线程发送的,则提交给 backgroundPoster 处理,否则直接调用消息接收方
ASYNC
提交给空闲的后台线程执行
将消息提交到 asyncPoster 进行处理
执行策略的具体细分逻辑是在 EventBus 类的 postToSubscription
方法完成的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void postToSubscription (Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break ; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break ; case MAIN_ORDERED: if (mainThreadPoster != null ) { mainThreadPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break ; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break ; case ASYNC: asyncPoster.enqueue(subscription, event); break ; default : throw new IllegalStateException ("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
例如,对于 AsyncPoster 来说,其每接收到一个消息,都会直接在 enqueue
方法中将自己(Runnable)提交给线程池进行处理,而使用的线程池默认是 Executors.newCachedThreadPool()
,该线程池每接收到一个任务都会马上交由线程进行处理,所以 AsyncPoster 并不保证消息处理的有序性,但在消息处理的及时性方面会比较高,且每次提交给 AsyncPoster 的消息可能都是由不同的线程来处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class AsyncPoster implements Runnable , Poster { private final PendingPostQueue queue; private final EventBus eventBus; AsyncPoster(EventBus eventBus) { this .eventBus = eventBus; queue = new PendingPostQueue (); } public void enqueue (Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this ); } public void run () { PendingPost pendingPost = queue.poll(); if (pendingPost == null ) { throw new IllegalStateException ("No pending post available" ); } eventBus.invokeSubscriber(pendingPost); } }
而 BackgroundPoster 会将任务依次缓存到 PendingPostQueue 中,每次只取出一个任务交由线程池来执行,所以 BackgroundPoster 会保证消息队列在处理时的有序性,但在消息处理的及时性方面相比 AsyncPoster 要低一些
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 final class BackgroundPoster implements Runnable , Poster { private final PendingPostQueue queue; private final EventBus eventBus; private volatile boolean executorRunning; BackgroundPoster(EventBus eventBus) { this .eventBus = eventBus; queue = new PendingPostQueue (); } public void enqueue (Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this ) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true ; eventBus.getExecutorService().execute(this ); } } } ··· }
而不管是使用什么消息处理策略,最终都是通过调用以下方法来反射调用监听方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void invokeSubscriber (PendingPost pendingPost) { Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); if (subscription.active) { invokeSubscriber(subscription, event); } } void invokeSubscriber (Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException ("Unexpected exception" , e); } }
发送非黏性消息 EventBus.getDefault().post(Any)
方法用于发送非黏性消息。EventBus 会通过 ThreadLocal 为每个发送消息的线程维护一个 PostingThreadState 对象,用于为每个线程维护一个消息队列及其它辅助参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 final static class PostingThreadState { final List<Object> eventQueue = new ArrayList <>(); boolean isPosting; boolean isMainThread; Subscription subscription; Object event; boolean canceled; } private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal <PostingThreadState>() { @Override protected PostingThreadState initialValue () { return new PostingThreadState (); } }; public void post (Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { postingState.isMainThread = isMainThread(); postingState.isPosting = true ; if (postingState.canceled) { throw new EventBusException ("Internal error. Abort state was not reset" ); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0 ), postingState); } } finally { postingState.isPosting = false ; postingState.isMainThread = false ; } } }
每次 post 进来的消息都会先存到消息队列 eventQueue
中,然后通过 while 循环进行处理,消息处理逻辑是通过 postSingleEvent
方法来完成的
其主要逻辑是:
假设 EventA 继承于 EventB,那么当发送的消息类型是 EventA 时,就需要考虑 EventB 的监听方法是否可以接收到 EventA,即需要考虑消息类型是否具有继承关系
具有继承关系。此时就需要拿到 EventA 的所有父类型,然后根据 EventA 本身和其父类型关联到的所有监听方法依次进行消息发送
不具有继承关系。此时只需要向 EventA 的监听方法进行消息发送即可
如果发送的消息最终没有找到任何接收者,且 sendNoSubscriberEvent 为 true,那么就主动发送一个 NoSubscriberEvent 事件,用于向外通知消息没有找到任何接收者
监听方法之间可以设定消息处理的优先级高低,高优先级的方法可以通过调用 cancelEventDelivery 方法来拦截事件,不再继续向下发送。但只有在 POSTING 模式下才能拦截事件,因为只有在这个模式下才能保证监听方法是按照严格的先后顺序被执行的
最终,发送的消息都会通过 postToSubscription
方法来完成,根据接收者方法不同的处理策略进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 private void postSingleEvent (Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false ; if (eventInheritance) { List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0 ; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { logger.log(Level.FINE, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent (this , event)); } } } private boolean postSingleEventForEventType (Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this ) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null ; postingState.subscription = null ; postingState.canceled = false ; } if (aborted) { break ; } } return true ; } return false ; }
发送黏性消息 黏性消息的意义是为了使得在消息发出来后,即使是后续再进行 register
的 subscriber 也可以收到之前发送的消息,这需要将 @Subscribe
注解的 sticky
属性设为 true,即表明消息接收方希望接收黏性消息
EventBus.getDefault().postSticky(Any)
方法就用于发送黏性消息。黏性事件会被保存到 stickyEvents
这个 Map 中,key 是 event 的 Class 对象,value 是 event 本身,这也说明对于同一类型的黏性消息来说,只会保存其最后一个消息
1 2 3 4 5 6 7 8 9 10 11 12 13 private final Map<Class<?>, Object> stickyEvents;public void postSticky (Object event) { synchronized (stickyEvents) { stickyEvents.put(event.getClass(), event); } post(event); }
对于一个黏性消息,会有两种不同的时机被 subscriber 接收到
调用 postSticky 方法时,被其现有的 subscriber 直接接收到,这种方式通过在 postSticky 方法里调用 post 方法来实现
调用 register 方法时,新添加的 subscriber 会判断 stickyEvents 中是否存在关联的 event 需要进行分发
这里主要看第二种情况。register 操作会在 subscribe 方法里完成黏性事件的分发。和 post 操作一样,发送黏性事件时也需要考虑 event 的继承关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private void subscribe (Object subscriber, SubscriberMethod subscriberMethod) { ··· if (subscriberMethod.sticky) { if (eventInheritance) { Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } private void checkPostStickyEventToSubscription (Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null ) { postToSubscription(newSubscription, stickyEvent, isMainThread()); } }
移除黏性事件 移除指定的黏性事件可以通过以下方法来实现,都是用于将指定事件从 stickyEvents
中移除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public <T> T removeStickyEvent (Class<T> eventType) { synchronized (stickyEvents) { return eventType.cast(stickyEvents.remove(eventType)); } } public boolean removeStickyEvent (Object event) { synchronized (stickyEvents) { Class<?> eventType = event.getClass(); Object existingEvent = stickyEvents.get(eventType); if (event.equals(existingEvent)) { stickyEvents.remove(eventType); return true ; } else { return false ; } } }
解除注册 解除注册的目的是为了避免内存泄露,EventBus 使用了单例模式,如果不主动解除注册的话,EventBus 就会一直持有 subscriber。解除注册是通过 unregister
方法来实现的,该方法逻辑也比较简单,只是将 subscriber 以及其关联的所有 method 对象从集合中移除而已
而此处虽然会将关于 subscriber 的信息均给移除掉,但是在 SubscriberMethodFinder 中的静态成员变量 METHOD_CACHE
依然会缓存着已经注册过的 subscriber 的信息,这也是为了在某些 subscriber 会先后多次注册 EventBus 时可以做到信息复用,避免多次循环反射
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public synchronized void unregister (Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); if (subscribedTypes != null ) { for (Class<?> eventType : subscribedTypes) { unsubscribeByEventType(subscriber, eventType); } typesBySubscriber.remove(subscriber); } else { logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } private void unsubscribeByEventType (Object subscriber, Class<?> eventType) { List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null ) { int size = subscriptions.size(); for (int i = 0 ; i < size; i++) { Subscription subscription = subscriptions.get(i); if (subscription.subscriber == subscriber) { subscription.active = false ; subscriptions.remove(i); i--; size--; } } } }
APT注解处理器 使用注解处理器(Annotation Processing Tool)可以避免 subscriber 进行注册时的多次循环反射 操作,极大提升了 EventBus 的运行效率。注解处理器是一种注解处理工具,用来在编译期扫描和处理注解,通过注解来生成 Java 文件。即以注解作为桥梁,通过预先规定好的代码生成规则来自动生成 Java 文件。此类注解框架的代表有 ButterKnife、Dragger2、EventBus 等
Java API 已经提供了扫描源码并解析注解的框架,开发者可以通过继承 AbstractProcessor 类来实现自己的注解解析逻辑。APT 的原理就是在注解了某些代码元素(如字段、函数、类等)后,在编译时编译器会检查 AbstractProcessor 的子类,并且自动调用其 process()
方法,然后将添加了指定注解的所有代码元素作为参数传递给该方法,开发者再根据注解元素在编译期输出对应的 Java 代码
在 Kotlin 环境引入注解处理器的方法如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 apply plugin: 'kotlin-kapt' kapt { arguments { arg('eventBusIndex' , 'github.leavesc.demo.MyEventBusIndex' ) } } dependencies { implementation "org.greenrobot:eventbus:3.2.0" kapt "org.greenrobot:eventbus-annotation-processor:3.2.0" }
当中,MyEventBusIndex 就是在编译阶段将生成的辅助文件, github.leavesc.demo.MyEventBusIndex
就是生成的辅助文件的包名路径,可以自定义
原始文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class MainActivity : AppCompatActivity () { override fun onCreate (savedInstanceState: Bundle ?) { super .onCreate(savedInstanceState) setContentView(R.layout.activity_main) } @Subscribe fun fun1 (msg: String ) { } @Subscribe(threadMode = ThreadMode.MAIN, priority = 100) fun fun2 (msg: String ) { } }
编译过后生成的辅助文件如下所示。可以看出,MyEventBusIndex 文件中封装了 subscriber 和其所有监听方法的签名信息,这样我们就无需在运行时再来进行解析了,而是直接在编译阶段就生成好了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class MyEventBusIndex implements SubscriberInfoIndex { private static final Map, SubscriberInfo> SUBSCRIBER_INDEX; static { SUBSCRIBER_INDEX = new HashMap , SubscriberInfo>(); putIndex(new SimpleSubscriberInfo (MainActivity.class, true , new SubscriberMethodInfo [] { new SubscriberMethodInfo ("fun1" , String.class), new SubscriberMethodInfo ("fun2" , String.class, ThreadMode.MAIN, 100 , false ), })); } private static void putIndex (SubscriberInfo info) { SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info); } public SubscriberInfo getSubscriberInfo (Class subscriberClass) { SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass); if (info != null ) { return info; } else { return null ; } } }
需要注意的是,在生成了辅助文件后,还需要通过这些类文件来初始化 EventBus
1 EventBus.builder().addIndex(MyEventBusIndex()).installDefaultEventBus()
注入的辅助文件会被保存到 SubscriberMethodFinder 类的成员变量 subscriberInfoIndexes
中, findUsingInfo
方法会先尝试从辅助文件中获取 SubscriberMethod,只有在获取不到的时候才会通过性能较低的反射操作来完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private List<SubscriberMethod> findUsingInfo (Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null ) { findState.subscriberInfo = getSubscriberInfo(findState); if (findState.subscriberInfo != null ) { SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods(); for (SubscriberMethod subscriberMethod : array) { if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); } } } else { findUsingReflectionInSingleClass(findState); } findState.moveToSuperclass(); } return getMethodsAndRelease(findState); } private SubscriberInfo getSubscriberInfo (FindState findState) { if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null ) { SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo(); if (findState.clazz == superclassInfo.getSubscriberClass()) { return superclassInfo; } } if (subscriberInfoIndexes != null ) { for (SubscriberInfoIndex index : subscriberInfoIndexes) { SubscriberInfo info = index.getSubscriberInfo(findState.clazz); if (info != null ) { return info; } } } return null ; }
使用了注解处理器后也有一定的弊端。由于 MyEventBusIndex 是通过静态常量类型的 Map 来保存所有的方法签名信息,当在初始化 EventBus 时该 Map 就同时被初始化了,这就相当于在一开始就进行了全量加载,而某些 subscriber 我们可能不会使用到,这就造成了内存浪费。而如果是通过反射来获取,那就相当于在按需加载,只有 subscriber 进行注册了才会去缓存 subscriber 带有的监听方法
一些坑 奇怪的继承关系 子类可以继承父类的 Subscribe 方法。但有一个比较奇怪的地方是:如果子类重写了父类多个 Subscribe 方法的话,就会抛出 IllegalStateException。例如,在下面的例子中。父类 BaseActivity 声明了两个 Subscribe 方法,子类 MainActivity 重写了这两个方法,此时运行后就会抛出 IllegalStateException。而如果 MainActivity 不重写或者只重写一个方法的话,就可以正常运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 open class BaseActivity : AppCompatActivity() { open fun fun1 (msg: String) { } open fun fun2 (msg: String) { } } class MainActivity : BaseActivity() { override fun onCreate (savedInstanceState: Bundle?) { super .onCreate(savedInstanceState) setContentView(R.layout.activity_main) EventBus.getDefault().register(this ) } override fun onDestroy () { super .onDestroy() EventBus.getDefault().unregister(this ) } override fun fun1 (msg: String) { } override fun fun2 (msg: String) { } }
按道理来说,如果子类重写了父类一个 Subscribe 方法都可以正常使用的话,那么重写两个也应该可以正常使用才对。可是上述例子就表现得 EventBus 好像有 bug 似的。通过定位堆栈信息,可以发现是在 FindState
的 checkAdd
方法抛出了异常
其抛出异常的步骤是这样的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 boolean checkAdd (Method method, Class<?> eventType) { Object existing = anyMethodByEventType.put(eventType, method); if (existing == null ) { return true ; } else { if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { throw new IllegalStateException (); } anyMethodByEventType.put(eventType, this ); } return checkAddWithMethodSignature(method, eventType); } }
EventBus 有一个 issues 也反馈了这个问题:issues ,该问题在 2018 年时就已经存在了,EventBus 的作者也只是回复说: 只在子类进行方法监听
移除黏性消息 removeStickyEvent
方法会有一个比较让人误解的点:对于通过 EventBus.getDefault().postSticky(XXX)
方法发送的黏性消息无法通过 removeStickyEvent
方法来使现有的监听者拦截该事件
例如,假设下面的两个方法都已经处于注册状态了, postSticky
后,即使在 fun1
方法中移除了黏性消息, fun2
方法也可以接收到消息。这是因为 postSticky
方法最终也是要靠调用 post
方法来完成消息发送,而 post
方法并不受 stickyEvents
的影响
1 2 3 4 5 6 7 fun fun1 (msg: String) { EventBus.getDefault().removeStickyEvent(msg) } fun fun2 (msg: String) { }
而如果 EventBus 中已经存储了黏性事件,那么在上述两个方法刚 register 时, fun1
方法就可以拦截住消息使 fun2
方法接收不到消息。这是因为 register
方法是在 for 循环中遍历 method,如果之前的方法已经移除了黏性消息的话,那么后续方法就没有黏性消息需要处理了
1 2 3 4 5 6 7 8 9 10 public void register (Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this ) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
总结 原理
EventBus 包含 register 和 unregister 两个方法用于标记当前 subscriber 是否需要接收消息,内部对应向 CopyOnWriteArrayList 添加和移除元素这两个操作
每当有 event 被 post 出来时,就需要根据 eventClass 对象找到所有所有声明了 @Subscribe 注解且对这种消息类型进行监听的方法,这些方法都是在 subscriber 进行 register 的时候,从 subscriber 中获取到的
从 subscriber 中获取所有监听方法的方式有两种。第一种是在运行阶段通过反射来拿到,对应的是没有配置注解处理器的情况。第二种对应的是有配置注解处理器的情况,通过在编译阶段全局扫描 @Subscribe 注解并生成辅助文件,从而在 register 的时候省去效率低下的反射操作。不管是通过什么方式进行获取,拿到所有方法后都会将 methods 按照消息类型 eventType 进行归类,方便后续遍历
每当有消息被发送出来时,就根据 event 对应的 Class 对象找到相应的监听方法,然后通过反射的方式来回调方法。外部可以在初始化 EventBus 的时候选择是否要考虑 event 的继承关系,即在 event 被 Post 出来时,对 event 的父类型进行监听的方法是否需要被回调
核心 使用 FindState 复用池来复用 FindState 对象,在各处使用了 synchronized 关键字进行代码块同步的一些优化操作。
EventBus最核心的逻辑就是利用了 subscriptionsByEventType 这个重要的列表,将订阅对象,即接收事件的方法存储在这个列表,发布事件的时候在列表中查询出相对应的方法并执行。
设计模式 EventBus的观察者模式和一般的观察者模式不同,它使用了扩展的观察者模式对事件进行订阅和分发,其实这里的扩展就是指的使用了EventBus来作为中介者,抽离了许多职责
1. 每次在register之后,为什么都必须进行一次unregister
因为register是强引用,它会让对象无法得到内存回收,导致内存泄露。所以必须在unregister方法中释放对象所占的内存。
2. EventBus2.x的版本与EventBus3.x的版本有哪些区别
EventBus2.x使用的是运行时注解,它采用了反射的方式对整个注册的类的所有方法进行扫描来完成注册,因而会对性能有一定影响。
EventBus3.x使用的是编译时注解,Java文件会编译成.class文件,再对class文件进行打包等一系列处理。在编译成.class文件时,EventBus会使用EventBusAnnotationProcessor注解处理器读取@Subscribe()注解并解析、处理其中的信息,然后生成Java类来保存所有订阅者的订阅信息。这样就创建出了对文件或类的索引关系,并将其编入到apk中。
从EventBus3.0开始使用了对象池缓存减少了创建对象的开销。
3. 除了EventBus,现在比较流行的事件总线还有RxBus,与EventBus相比
RxJava的Observable有onError、onComplete等状态回调。
Rxjava使用组合而非嵌套的方式,避免了回调地狱。
Rxjava的线程调度设计的更加优秀,更简单易用。
Rxjava可使用多种操作符来进行链式调用来实现复杂的逻辑。
Rxjava的信息效率高于EventBus2.x,低于EventBus3.x。
4. 对待新项目的事件总线选型时,该如何考量
如果项目中使用了RxJava,则使用RxBus,否则使用EventBus3.x。