每日一博丨深度解析 EventBus 事件總線原理

每日一博丨深度解析 EventBus 事件總線原理

關注開源中國OSC頭條號,獲取最新技術資訊

一、問題描述

在工作中,經常會遇見使用異步的方式來發送事件,或者觸發另外一個動作:經常用到的框架是MQ(分佈式方式通知)。如果是同一個jvm裡面通知的話,就可以使用EventBus。由於EventBus使用起來簡單、便捷,因此,工作中會經常用到。深入理解該框架的原理就很有必要。

二、框架解析

2.1、組織結構

eventbus的組織結構如下:

每日一博丨深度解析 EventBus 事件總線原理

eventbus主要有以下幾部分組成:

1、eventbus、asyncEventBus:事件發送器。

2、event:事件承載單元。

3、SubscriberRegistry:訂閱者註冊器,將訂閱者註冊到event上,即將有註解Subscribe的方法和event綁定起來。

4、Dispatcher:事件分發器,將事件的訂閱者調用來執行。

5、Subscriber、SynchronizedSubscriber:訂閱者,併發訂閱還是同步訂閱。

2.2、運行原理

1、eventbus是基於註冊監聽的方式來運行的,因此,首先需要將eventbus,然後才會有事件及監聽者。新建eventbus或者AsyncEventBus的方式如下:

 EventBus eventBus = new EventBus();

或者

 BlockingQueue workQueue = new LinkedBlockingQueue<>(20);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20,
30, TimeUnit.SECONDS, workQueue);
AsyncEventBus asyncEventBus = new AsyncEventBus(executor);

2、註冊監聽者。

eventBus.register(eventListener); 

底層就是將類eventListener中所有註解有Subscribe的方法與其Event對放在一個map中(一個event可以對應多個Subscribe的方法)。實現如下:

 void register(Object listener) {
Multimap, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry, Collection> entry : listenerMethods.asMap().entrySet()) {
Class> eventType = entry.getKey();
Collection eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}

3、事件發送:執行指定事件類型的訂閱者(包含了method),從訂閱者中獲取指定事件的訂閱者,然後按照規則(同步、異步)執行指定的方法。

 public void post(Object event) {
Iterator eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {

// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}

上述代碼說明,如果事件沒有監聽者,就當作死亡事件來對待。

 /** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}

這裡就說明,最後就是被訂閱的方法被調用。

4、EventBus與AsyncEventBus的區別

從字面上看,AsyncEventBus是異步的EventBus,那麼EventBus應該就是同步的了。EventBus的executor為MoreExecutors.directExecutor(),其實現如下:

 public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
}
/** See {@link #directExecutor} for behavioral notes. */
private enum DirectExecutor implements Executor {
INSTANCE;
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}

其execute方法直接執行線程的run方法,即同步調用run方法執行。EventBus的dispatcher為PerThreadQueuedDispatcher。其dispatch方法如下:

 @Override
void dispatch(Object event, Iterator subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}

dispatchEvent的實現如下:

 final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}

因此,整個執行過程如下:

每日一博丨深度解析 EventBus 事件總線原理

整個過程都是同步方式執行,因此,EventBus是同步的。

AsyncEventBus的dispatcher為LegacyAsyncDispatcher,executor為自己指定的線程池。運行流程如下:

每日一博丨深度解析 EventBus 事件總線原理

虛線為線程池異步調度,因此,AsyncEventBus為異步方式。

5、AllowConcurrentEvents的作用

它所在的代碼為:

 static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)

? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
private static boolean isDeclaredThreadSafe(Method method) {
return method.getAnnotation(AllowConcurrentEvents.class) != null;
}

即如果訂閱者方法上有註解AllowConcurrentEvents,則返回Subscriber,否則,返回SynchronizedSubscriber。SynchronizedSubscriber的字面意思為同步訂閱者,它的實現代碼為:

 @Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}

即沒有使用註解AllowConcurrentEvents的訂閱者,在併發環境中,都是串行執行。這在高併發環境中,會嚴重影響性能。

三、使用案例

3.1、eventbus定義

@Configuration
public class ConfigBean {
@Bean
public EventBus executorService() {
BlockingQueue workQueue = new LinkedBlockingQueue<>(20);
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20,
30, TimeUnit.SECONDS, workQueue);
return new AsyncEventBus(executor);
}
}

3.2、註冊與事件發送

@Service
public class TestService implements InitializingBean {
@Autowired
private EventListener eventListener ;
@Autowired
private EventBus eventBus ;
public void postEvent(){
eventBus.post(new LoginEvent("iwill","123456"));
}
@Override
public void afterPropertiesSet() throws Exception {
eventBus.register(eventListener);
}
}

3.3、訂閱者定義

package com.iwill.eventBus.listener;
import com.google.common.eventbus.Subscribe;
import com.iwill.eventBus.event.LoginEvent;
import com.iwill.eventBus.event.RegisterEvent;
import org.springframework.stereotype.Component;
@Component
public class EventListener {
@Subscribe
public void subscribeLoginEvent1(LoginEvent event){
System.out.println("method 1 : receive login event ");
}
@Subscribe
public void subscribeLoginEvent2(LoginEvent event){
System.out.println("method 2 : receive login event ");
}
@Subscribe
public void subscribeRegisterEvent(RegisterEvent event){
try{
Thread.sleep(10000L);
}catch (Exception exp){
exp.printStackTrace();
}
System.out.println("method : receive register event ");
}
}

四、注意事項

1、在高併發的環境下使用AsyncEventBus時,發送事件可能會出現異常,因為它使用的線程池,當線程池的線程不夠用時,會拒絕接收任務,就會執行線程池的拒絕策略,如果需要關注是否提交事件成功,就需要將線程池的拒絕策略設為拋出異常,並且try-catch來捕獲異常。如下:

 try {
eventBus.post(new LoginEvent("iwill", "123456"));
}catch (Exception exp){
//TODO 落表或者其他處理
}

2、本文用到的guava版本如下:

 
com.google.guava
guava
26.0-jre

每日一博欄目,每日為你推薦優秀博主的優質技術文章。同時歡迎用戶投稿,文章一旦被官方賬號收錄,我們會在網站首頁等位置進行推薦哦。關注開源中國OSC每日獲取優質推送,點擊“瞭解更多”閱讀原文章。


分享到:


相關文章: