private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder(); private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; private final Map<Object, List<Class<?>>> typesBySubscriber; private final Map<Class<?>, Object> stickyEvents;
没看懂
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() { @Override protected PostingThreadState initialValue() { return new PostingThreadState(); } };
/** For ThreadLocal, much faster to set (and get multiple values). */ final static class PostingThreadState { final List<Object> eventQueue = new ArrayList<>(); boolean isPosting; boolean isMainThread; Subscription subscription; Object event; boolean canceled; }
五种线程模型,主线程,后台线程,异步线程,和当前线程一样,有顺序的
1 2 3 4 5 6
// @Nullable private final MainThreadSupport mainThreadSupport; // @Nullable private final Poster mainThreadPoster; private final BackgroundPoster backgroundPoster; private final AsyncPoster asyncPoster;
一个是订阅发现者,主要是为了获取订阅的数据,另外就是线程池
1 2
private final SubscriberMethodFinder subscriberMethodFinder; private final ExecutorService executorService;
一些final变量
1 2 3 4 5 6 7 8 9
private final boolean throwSubscriberException; private final boolean logSubscriberExceptions; private final boolean logNoSubscriberMessages; private final boolean sendSubscriberExceptionEvent; private final boolean sendNoSubscriberEvent; private final boolean eventInheritance;
private final int indexCount; private final Logger logger;
/** * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a * central bus, consider {@link #getDefault()}. */ public EventBus() { this(DEFAULT_BUILDER); }
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) { super(looper); this.eventBus = eventBus; this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); }
public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } }
@Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } } }
interface Poster {
/** * Enqueue an event to be posted for a particular subscription. * * @param subscription Subscription which will receive the event. * @param event Event that will be posted to subscribers. */ void enqueue(Subscription subscription, Object event); }
/** * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they * are no longer interested in receiving events. * <p/> * Subscribers have event handling methods that must be annotated by {@link Subscribe}. * The {@link Subscribe} annotation also allows configuration like {@link * ThreadMode} and priority. */ public void register(Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { findUsingReflectionInSingleClass(findState); findState.moveToSuperclass(); } return getMethodsAndRelease(findState); } // private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; try { // This is faster than getMethods, especially when subscribers are fat classes like Activities methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149 methods = findState.clazz.getMethods(); findState.skipSuperClasses = true; } // 对于每一个方法,获取信息,首先 // 校验修饰符,过滤掉非 Public 的方法 // 以及 abstract,static,bridge or synthetic methods, for (Method method : methods) { int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { // 参数类型只能为一个 Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) { // 获取注解 Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { // 参数类型 Class<?> eventType = parameterTypes[0]; if (findState.checkAdd(method, eventType)) { // 注解的值 ThreadMode threadMode = subscribeAnnotation.threadMode(); // 添加到findState findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } } }
// 初始化 FindState private FindState prepareFindState() { synchronized (FIND_STATE_POOL) { for (int i = 0; i < POOL_SIZE; i++) { FindState state = FIND_STATE_POOL[i]; if (state != null) { FIND_STATE_POOL[i] = null; return state; } } } return new FindState(); }
static class FindState { final List<SubscriberMethod> subscriberMethods = new ArrayList<>(); final Map<Class, Object> anyMethodByEventType = new HashMap<>(); final Map<String, Class> subscriberClassByMethodKey = new HashMap<>(); final StringBuilder methodKeyBuilder = new StringBuilder(128);
boolean checkAdd(Method method, Class<?> eventType) { // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required. // Usually a subscriber doesn't have methods listening to the same event type. Object existing = anyMethodByEventType.put(eventType, method); if (existing == null) { return true; } else { if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { // Paranoia check throw new IllegalStateException(); } // Put any non-Method object to "consume" the existing Method anyMethodByEventType.put(eventType, this); } return checkAddWithMethodSignature(method, eventType); } }
String methodKey = methodKeyBuilder.toString(); Class<?> methodClass = method.getDeclaringClass(); Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass); if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) { // Only add if not already found in a sub class return true; } else { // Revert the put, old class is further down the class hierarchy subscriberClassByMethodKey.put(methodKey, methodClassOld); return false; } }
void moveToSuperclass() { if (skipSuperClasses) { clazz = null; } else { clazz = clazz.getSuperclass(); String clazzName = clazz.getName(); /** Skip system classes, this just degrades performance. */ if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) { clazz = null; } } } }
/** * If true, delivers the most recent sticky event (posted with * {@link EventBus#postSticky(Object)}) to this subscriber (if event available). */ boolean sticky() default false;
/** Subscriber priority to influence the order of event delivery. * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of * delivery among subscribers with different {@link ThreadMode}s! */ int priority() default 0; }
// Must be called in synchronized block private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber, subscriberMethod); CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); // 首次为空 if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } // 会根据优先级加入到订阅的列表中 int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } }
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } // eventType 就是我们自定义的MeesageEvent,用于传递消息的 subscribedEvents.add(eventType);
if (subscriberMethod.sticky) { if (eventInheritance) { // Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
/** Posts the given event to the event bus. */ public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event);
if (!postingState.isPosting) { postingState.isMainThread = isMainThread(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } }
/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */ private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); if (eventTypes == null) { eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } }
/** Recurses through super interfaces. */ static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } }