淺談Spring 事件驅動機制

目錄:

1、聊這個問題之前,先說一下觀察者設計模式?

2、Spring 事件通知機制

3、Spring 事件怎樣使用?

4、怎樣實現異步到的Spring事件?

5、Spring 事件在項目中的應用

6、Spring事件的源碼分析

7、總結

聊這個問題之前,先說一下觀察者設計模式?

為什麼我們聊Spring事件機制需要了解 觀察者模式呢?

因為Spring 中的事件驅動也叫 發佈訂閱模式,是觀察者模式的一個典型的應用。觀察者模式是事件驅動模型在設計層面上的體現。

1、定義

定義對象的一種一對多的依賴關係。當一個對象的狀態發生變化時,所有依賴於它的對象得到通知並被自動更新

觀察者模式主要用於關聯行為之間建立一套 觸發機制的場景。

觀察者模式在生活中應用的也比較廣泛。比如 微信朋友圈通知、郵件通知、訂閱報紙等。

觀察者模式把多個訂閱者稱為觀察者:Observer,多個觀察者觀察的 對象被稱為目標:Subject

2、觀察者模式的結構和說明


淺談Spring 事件驅動機制

  • Subject:目標對象,具有以下功能

(1) 一個目標可以被多個觀察者觀察

(2) 目標提供觀察者的註冊和退訂的維護

(3) 當目標發生變化時,目標負責通知所有註冊的觀察者

  • Observver: 定義觀察者的接口,提供目標通知時對應的變更方法,這個變更方法進行相關的業務處理,可以在這個方法裡回調目標對象,以獲取目標對象的數據。
  • ConcreteSubject: 具體的目標實現對象,用來維護目標狀態,當目標對象的狀態發生改變時,通過所有註冊的、有效的觀察者,讓觀察者執行相應的處理。
  • ConcreteObserver: 觀察者的具體實現對象,用來接口目標的通知,並進行相應的後續處理

3、觀察者模式代碼的實現

(1) 定義目標對象

<code>/**
* 目標對象,它知道觀察它的觀察者,並提供註冊和刪除觀察者的接口

*/
public class Subject {
/**
* 用來保存註冊的觀察者對象
*/
private List<observer> observers = new ArrayList<>();
/**
* 註冊觀察者對象
* @param observer
*/
public void add(Observer observer){
observers.add(observer);
}
/**
* 刪除觀察者對象
* @param observer
*/
public void remove(Observer observer){
observers.remove(observer);
}
/**
* 通知所有註冊的觀察者
*/
protected void notifyObservers(){
observers.forEach(observer -> {
observer.update(this);
});
}
}/<observer>/<code>

(2) 具體目標對象的,代碼如下

<code>/**
* 具體的目標對象
*/
public class ConcreteSubject extends Subject {
private String state;

public String getState( ) {
return state;
}

public void setState( String state ) {
this.state = state;
//狀態發生變更,通知各個觀察者對象

this.notifyObservers();
}
}/<code>

(3) 觀察者的接口定義,代碼如下

<code>/**
* 觀察者接口,定義一個更新的接口
*/
public interface Observer {
/**
* 更新的接口
* @param subject 傳入目標對象,方便獲取相應的目標對象的狀態
*/
public void update(Subject subject);
}/<code>

(4) 觀察者的實現類

<code>/**
* 具體觀察者對象,實現更新的方法,使自身的狀態和目標的狀態一致
*/
public class ConcreteObserver implements Observer{
private String state;
/**
* 更新的接口
* @param subject 傳入目標對象,方便獲取相應的目標對象的狀態
*/
@Override
public void update( Subject subject ) {
state = ((ConcreteSubject)subject).getState();
}
}/<code>

Spring 事件通知機制

(1) 事件驅動的核心主要有以下幾個元素:事件源、事件監聽器、事件


淺談Spring 事件驅動機制

  • 事件源:負責產生事件 的對象
  • 事件監聽器:負責處理事件的對象
  • 事件:處理事件的對象,是事件源和監聽器之間的橋樑,是整個事件模型驅動的核心。

(2) Spring 事件涉及了三個部分

  • ApplicationEvent :表示事件本身,自定義事件需要繼承該類
  • ApplicationListener: 事件監聽器,需要實現ApplicationListener接口
  • ApplicationContext :實現了ApplicationEventPublisher 接口,具備發佈事件的功能

(3) ApplicationEventMulticaster 事件廣播器

廣播器是事件模型中的事件收集器,用於維護髮送者和接收者之間的關係。

具備以下功能:

① 事件多播器,負責把發佈的事件廣播給所有的監聽器。

② 維護監聽器的註冊、刪除。

③ 模式實現為 SimpleApplicationEventMulticaster

④ 子類實現了對事件監聽器的註冊、刪除和廣播事件,可以實現同步和異步的調用

(4) Spring 提供了一些標準的事件如下

① ContextRefreshedEvent:當容器其被實例化或者refresh的時候發佈

② ContextStartedEvent:當容器啟動時發佈,即調用start方法


③ ContextStoppedEvent: 當容器 停止時發佈,即調用stop方法

④ ContextClosedEvent:當容器關閉時發佈,即調用close方法。

Spring 事件怎樣使用?

我們模擬一個場景:用戶註冊成功後發送短信和郵件:

1、定義事件

<code>/**
* 用戶註冊事件
*/
public class RegistryEvent extends ApplicationEvent {
private String userName;
public RegistryEvent( Object source,String userName){
super(source);
this.userName = userName;
}
public String getUserName( ) {
return userName;
}
}/<code>

2、定義註冊監聽器

<code>/**
* 註冊的事件監聽器
*/
@Component
public class RegistryListener implements ApplicationListener<registryevent> {
@Override
public void onApplicationEvent( RegistryEvent event ) {
System.out.println("恭喜你["+event.getUserName()+"],註冊成功!");
System.out.println("發送短信驗證碼!");
System.out.println("發送郵件.......");
}
}/<registryevent>/<code>

3、註冊成功後,發佈事件

<code>   public static void main(String[] args) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(MainConfig.class);
//註冊邏輯,註冊成功後,發佈註冊事件
ctx.publishEvent(new RegistryEvent("註冊用戶","張三"));
}/<code>

怎樣實現異步到的Spring事件?

我們先從源碼上Spring 事件同步和異步的支持。

事件的發佈是依靠 ApplicationEventPublisher 接口的publishEvent方法進行發佈事件的,而publicEvent中又是通過 調用 ApplicationEventMulticaster 的 multicastEvent 進行事件廣播的。

ApplicationEventMulticaster 中保存了所有實現了ApplicaitonListener 接口的監聽器,ApplicationEventMulticaster的實現類 SimpleApplicationEventMulticaster 的源碼如下:

<code>public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
\tprivate Executor taskExecutor;
\tprivate ErrorHandler errorHandler;

\tpublic SimpleApplicationEventMulticaster() {
\t}

\tpublic SimpleApplicationEventMulticaster(BeanFactory beanFactory) {
\t\tsetBeanFactory(beanFactory);
\t}

\tpublic void setTaskExecutor(Executor taskExecutor) {
\t\tthis.taskExecutor = taskExecutor;
\t}

\tprotected Executor getTaskExecutor() {
\t\treturn this.taskExecutor;
\t}
\tpublic void setErrorHandler(ErrorHandler errorHandler) {
\t\tthis.errorHandler = errorHandler;
\t}

\t@Override
\tpublic void multicastEvent(ApplicationEvent event) {
\t\tmulticastEvent(event, resolveDefaultEventType(event));
\t}

\t@Override
\tpublic void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
\t\tResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
\t\tfor (final ApplicationListener> listener : getApplicationListeners(event, type)) {
\t\t\tExecutor executor = getTaskExecutor();
\t\t\tif (executor != null) {
\t\t\t\texecutor.execute(new Runnable() {
\t\t\t\t\t@Override
\t\t\t\t\tpublic void run() {
\t\t\t\t\t\tinvokeListener(listener, event);
\t\t\t\t\t}
\t\t\t\t});
\t\t\t}
\t\t\telse {
\t\t\t\tinvokeListener(listener, event);
\t\t\t}
\t\t}
\t}


}
/<code>

從上面可以看出,異步通知的時間主要依靠 Executor去實現了,如果這個變量不配置,則默認使用同步,否則就是異步;


淺談Spring 事件驅動機制

從上面調試的代碼看,executor為null,所以會執行同步。

那麼怎麼讓廣播事件的時候異步執行呢?有以下兩種方法:

① 直接繼承 SimpleApplicationEventMulticaster ,然後添加 taskExecutor異步執行器

<code>@Component(value = "applicationEventMulticaster")
public class YjMulticaster extends SimpleApplicationEventMulticaster {
public YjMulticaster() {
System.out.println("執行了...............");

setTaskExecutor(Executors.newSingleThreadExecutor());
}
}/<code>

有人可能要問,為什麼按照上面的寫法就可以異步了呢?我們還得從源碼說起?

下面我們看下怎樣獲取 ApplicationEventMulticaster ?

淺談Spring 事件驅動機制

有的人要問,在1處,為什麼有實例化好的bean.因為我們繼承了 SimpleApplicationEventMulticaster,所以會提前實例化。

還有一些Spring 容器提前進行實例化的

淺談Spring 事件驅動機制

② 實現BeanPostProcessor 接口,對 SimpleApplicationEventMulticaster bean進行攔截修改

<code>@Component
public class MultiCasterBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof SimpleApplicationEventMulticaster) {
System.out.println("我攔截到了");
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster= (SimpleApplicationEventMulticaster) bean;
simpleApplicationEventMulticaster.setTaskExecutor(Executors.newSingleThreadExecutor());
return bean;
}
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

}
/<code>

BeanPostProcessor 執行時機實在Bean的實例化之後執行的,可以瞭解一下Spring的生命週期。

淺談Spring 事件驅動機制

③ 如果既要同步也要異步呢?

步驟如下:

  • 定義枚舉
<code>public enum  EventTypeEnum {
ASYNC, //異步
SYNC; //同步

}   /<code>
  • 定義註解類
<code>EventType    /<code>
  • 在監聽器上添加註解EventType
<code>/**
* 註冊的事件監聽器
*/
@Component
public class RegistryListener implements ApplicationListener<registryevent> {
@EventType(EventTypeEnum.ASYNC)
@Override
public void onApplicationEvent( RegistryEvent event ) {
System.out.println("恭喜你["+event.getUserName()+"],註冊成功!");
System.out.println("發送短信驗證碼!");
System.out.println("發送郵件.......");
}
}  /<registryevent>/<code>
  • 實現自定義多播器,繼承 SimpleApplicationEventMulticaster 類
  • 添加註解 @Component(value = "applicationEventMulticaster")
<code>@Component(value = "applicationEventMulticaster")
public class YjApplicationEventMulticaster extends SimpleApplicationEventMulticaster {
@Override
public void multicastEvent( final ApplicationEvent event) {
EventTypeEnum defaultEventType = EventTypeEnum.ASYNC;
for (final ApplicationListener> listener : getApplicationListeners()) {
Class> cls = listener.getClass();
try {
Method onApplicationEventMethod = cls.getMethod("onApplicationEvent",ApplicationEvent.class);
if(onApplicationEventMethod.isAnnotationPresent(EventType.class)){
EventType eventMethodAnnotation = onApplicationEventMethod.getAnnotation(EventType.class);
defaultEventType = eventMethodAnnotation.value();
}

if(defaultEventType == EventTypeEnum.ASYNC){
setTaskExecutor(Executors.newCachedThreadPool());
}
//找到
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeListener(listener, event);
}
});
}
else {
invokeListener(listener, event);
}
}catch (Exception e){
}

}
}
}/<code>

Spring 事件在項目中的應用

場景:在管理員修改權限的時候,要求將已經被修改的用戶的會話剔除

1、定義事件對象

<code>public class RoleChangeEvent extends ApplicationEvent {
private final Long userId;
public RoleChangeEvent( Object source, Long userId ) {
super(source);
this.userId = userId;
}
public Long getUserId( ) {
return userId;
}
}/<code>

2、定義監聽器

<code>public class RoleChangeListener {
@Autowired
private SessionRegistry sessionRegistry;

@Autowired
private ISysUserRoleService sysUserRoleService;

@EventListener(RoleChangeEvent.class)
@Async
public void permissionListenerPostProcess(RoleChangeEvent event){
Long userId = event.getUserId();
log.debug("receive permission changed event , eventClassName : [{}] ,userId : [{}]", event.getClass().getName(), userId);
//獲取在線用戶列表
List<activeuserdetails> onLineUserList = getOnlineUserList();
onLineUserList.stream().filter(userDetails -> userDetails.getUserId().equals(userId))
.forEach(userDetails -> {
//session剔除
sessionRegistry.getAllSessions(userDetails, false).forEach(SessionInformation::expireNow);
});

}
/**
* 獲取在線的userDetails列表
* @return
*/
private List<activeuserdetails> getOnlineUserList(){
return sessionRegistry.getAllPrincipals().stream().filter(p->p instanceof ActiveUserDetails)
.map(p-> ((ActiveUserDetails) p))
.collect(Collectors.toList());
}
}/<activeuserdetails>/<activeuserdetails>/<code>

Spring事件的源碼分析

1.org.springframework.context.support.AbstractApplicationContext#refresh

<code>\t\t// 初始化多播器
\t\t\t\tinitApplicationEventMulticaster();

\t\t\t\t// Initialize other special beans in specific context subclasses.
\t\t\t\tonRefresh();

\t\t\t\t// 註冊監聽器
\t\t\t\tregisterListeners();/<code>

① 多播器的初始化

org.springframework.context.support.AbstractApplicationContext#initApplicationEventMulticaster(初始化事件多播器)

<code>protected void initApplicationEventMulticaster() {
\t\tConfigurableListableBeanFactory beanFactory = getBeanFactory();
\t\t/判斷IOC容器中包含applicationEventMulticaster 事件多播器的Bean的name
\t\tif (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
\t\t /創建一個applicationEventMulticaster的bean放在IOC 容器中,bean的name 為applicationEventMulticaster
\t\t\tthis.applicationEventMulticaster =beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
\t\t\tif (logger.isDebugEnabled()) {
\t\t\t\tlogger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
\t\t\t}
\t\t}
\t\t/容器中不包含一個beanName 為applicationEventMulticaster的多播器組件
\t\telse {
\t\t //創建一個SimpleApplicationEventMulticaster 多播器
\t\t\tthis.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
\t\t\t//註冊到容器中
\t\t\tbeanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
\t\t\tif (logger.isDebugEnabled()) {
\t\t\t\tlogger.debug("Unable to locate ApplicationEventMulticaster with name '" +
\t\t\t\t\t\tAPPLICATION_EVENT_MULTICASTER_BEAN_NAME +
\t\t\t\t\t\t"': using default [" + this.applicationEventMulticaster + "]");
\t\t\t}
\t\t}
\t}​/<code>

把容器中的監聽器註冊到多播器上去 源碼解析

<code>protected void registerListeners() {
\t\t//把系統自帶的將挺起
\t\tfor (ApplicationListener> listener : getApplicationListeners()) {
\t\t\tgetApplicationEventMulticaster().addApplicationListener(listener);
\t\t}

//我們自己實現了ApplicationListener 的組件
\t\tString[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
\t\tfor (String listenerBeanName : listenerBeanNames) {

\t\t\tgetApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
\t\t}

\t //在這裡之前,我們早期想發佈的事件 由於沒有多播器沒有發佈,在這裡我們總算有了自己的多播器,可以在這裡發佈早期堆積的事件了.
\t\tSet<applicationevent> earlyEventsToProcess = this.earlyApplicationEvents;
\t\tthis.earlyApplicationEvents = null;
\t\tif (earlyEventsToProcess != null) {
\t\t\tfor (ApplicationEvent earlyEvent : earlyEventsToProcess) {
\t\t\t\tgetApplicationEventMulticaster().multicastEvent(earlyEvent);
\t\t\t}
\t\t}
\t}/<applicationevent>/<code>

③ 如何發佈

<code>public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
\t\tResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
\t\t//獲取到所有的監聽器
\t\tfor (final ApplicationListener> listener : getApplicationListeners(event, type)) {
\t\t //看spring 容器中是否支持線程池 異步發送事件
\t\t\tExecutor executor = getTaskExecutor();
\t\t\tif (executor != null) {
\t\t\t\texecutor.execute(new Runnable() {
\t\t\t\t\t@Override
\t\t\t\t\tpublic void run() {
\t\t\t\t\t
\t\t\t\t\t\tinvokeListener(listener, event);
\t\t\t\t\t}
\t\t\t\t});
\t\t\t}
\t\t\telse { //同步發送事件
\t\t\t\tinvokeListener(listener, event);
\t\t\t}
\t\t}
\t}\t
\t
\t
\tprivate void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
\t\ttry {
\t\t //調用對於listener的onApplicationEvent事件

\t\t\tlistener.onApplicationEvent(event);
\t\t}
\t\tcatch (ClassCastException ex) {
\t\t\tString msg = ex.getMessage();
\t\t\tif (msg == null || matchesClassCastMessage(msg, event.getClass())) {
\t\t\t\t// Possibly a lambda-defined listener which we could not resolve the generic event type for
\t\t\t\t// -> let's suppress the exception and just log a debug message.
\t\t\t\tLog logger = LogFactory.getLog(getClass());
\t\t\t\tif (logger.isDebugEnabled()) {
\t\t\t\t\tlogger.debug("Non-matching event type for listener: " + listener, ex);
\t\t\t\t}
\t\t\t}
\t\t\telse {
\t\t\t\tthrow ex;
\t\t\t}
\t\t}
\t}/<code>

總結

  1. Spring 事件 驅動模型的底層是通過 觀察者設計模式實現的。
  2. 通過ApplicationEvent 抽象類 和 ApplicationListener接口,可以實現Application 事件 處理
  3. 監聽器在處理Event的時候,需要使用instanceof關鍵字判斷是是否是自己需要的
  4. 廣播的時候可以同步也可以異步執行

本文由碼農的一天撰寫,如果你認同我的觀點的話,可以點贊加關注一下;如果你對本篇文章有其他見解的話,也歡迎在下方的評論區瀏覽討論!


分享到:


相關文章: