浅谈Spring 事件驱动机制

目录:

1、聊这个问题之前,先说一下观察者设计模式?

2、Spring 事件通知机制

3、Spring 事件怎样使用?

4、怎样实现异步到的Spring事件?

5、Spring 事件在项目中的应用

6、Spring事件的源码分析

7、总结

聊这个问题之前,先说一下观察者设计模式?

为什么我们聊Spring事件机制需要了解 观察者模式呢?

因为Spring 中的事件驱动也叫 发布订阅模式,是观察者模式的一个典型的应用。观察者模式是事件驱动模型在设计层面上的体现。

1、定义

定义对象的一种一对多的依赖关系。当一个对象的状态发生变化时,所有依赖于它的对象得到通知并被自动更新

观察者模式主要用于关联行为之间建立一套 触发机制的场景。

观察者模式在生活中应用的也比较广泛。比如 微信朋友圈通知、邮件通知、订阅报纸等。

观察者模式把多个订阅者称为观察者:Observer,多个观察者观察的 对象被称为目标:Subject

2、观察者模式的结构和说明


浅谈Spring 事件驱动机制

  • Subject:目标对象,具有以下功能

(1) 一个目标可以被多个观察者观察

(2) 目标提供观察者的注册和退订的维护

(3) 当目标发生变化时,目标负责通知所有注册的观察者

  • Observver: 定义观察者的接口,提供目标通知时对应的变更方法,这个变更方法进行相关的业务处理,可以在这个方法里回调目标对象,以获取目标对象的数据。
  • ConcreteSubject: 具体的目标实现对象,用来维护目标状态,当目标对象的状态发生改变时,通过所有注册的、有效的观察者,让观察者执行相应的处理。
  • ConcreteObserver: 观察者的具体实现对象,用来接口目标的通知,并进行相应的后续处理

3、观察者模式代码的实现

(1) 定义目标对象

<code>/**
* 目标对象,它知道观察它的观察者,并提供注册和删除观察者的接口

*/
public class Subject {
/**
* 用来保存注册的观察者对象
*/
private List<observer> observers = new ArrayList<>();
/**
* 注册观察者对象
* @param observer
*/
public void add(Observer observer){
observers.add(observer);
}
/**
* 删除观察者对象
* @param observer
*/
public void remove(Observer observer){
observers.remove(observer);
}
/**
* 通知所有注册的观察者
*/
protected void notifyObservers(){
observers.forEach(observer -> {
observer.update(this);
});
}
}/<observer>/<code>

(2) 具体目标对象的,代码如下

<code>/**
* 具体的目标对象
*/
public class ConcreteSubject extends Subject {
private String state;

public String getState( ) {
return state;
}

public void setState( String state ) {
this.state = state;
//状态发生变更,通知各个观察者对象

this.notifyObservers();
}
}/<code>

(3) 观察者的接口定义,代码如下

<code>/**
* 观察者接口,定义一个更新的接口
*/
public interface Observer {
/**
* 更新的接口
* @param subject 传入目标对象,方便获取相应的目标对象的状态
*/
public void update(Subject subject);
}/<code>

(4) 观察者的实现类

<code>/**
* 具体观察者对象,实现更新的方法,使自身的状态和目标的状态一致
*/
public class ConcreteObserver implements Observer{
private String state;
/**
* 更新的接口
* @param subject 传入目标对象,方便获取相应的目标对象的状态
*/
@Override
public void update( Subject subject ) {
state = ((ConcreteSubject)subject).getState();
}
}/<code>

Spring 事件通知机制

(1) 事件驱动的核心主要有以下几个元素:事件源、事件监听器、事件


浅谈Spring 事件驱动机制

  • 事件源:负责产生事件 的对象
  • 事件监听器:负责处理事件的对象
  • 事件:处理事件的对象,是事件源和监听器之间的桥梁,是整个事件模型驱动的核心。

(2) Spring 事件涉及了三个部分

  • ApplicationEvent :表示事件本身,自定义事件需要继承该类
  • ApplicationListener: 事件监听器,需要实现ApplicationListener接口
  • ApplicationContext :实现了ApplicationEventPublisher 接口,具备发布事件的功能

(3) ApplicationEventMulticaster 事件广播器

广播器是事件模型中的事件收集器,用于维护发送者和接收者之间的关系。

具备以下功能:

① 事件多播器,负责把发布的事件广播给所有的监听器。

② 维护监听器的注册、删除。

③ 模式实现为 SimpleApplicationEventMulticaster

④ 子类实现了对事件监听器的注册、删除和广播事件,可以实现同步和异步的调用

(4) Spring 提供了一些标准的事件如下

① ContextRefreshedEvent:当容器其被实例化或者refresh的时候发布

② ContextStartedEvent:当容器启动时发布,即调用start方法


③ ContextStoppedEvent: 当容器 停止时发布,即调用stop方法

④ ContextClosedEvent:当容器关闭时发布,即调用close方法。

Spring 事件怎样使用?

我们模拟一个场景:用户注册成功后发送短信和邮件:

1、定义事件

<code>/**
* 用户注册事件
*/
public class RegistryEvent extends ApplicationEvent {
private String userName;
public RegistryEvent( Object source,String userName){
super(source);
this.userName = userName;
}
public String getUserName( ) {
return userName;
}
}/<code>

2、定义注册监听器

<code>/**
* 注册的事件监听器
*/
@Component
public class RegistryListener implements ApplicationListener<registryevent> {
@Override
public void onApplicationEvent( RegistryEvent event ) {
System.out.println("恭喜你["+event.getUserName()+"],注册成功!");
System.out.println("发送短信验证码!");
System.out.println("发送邮件.......");
}
}/<registryevent>/<code>

3、注册成功后,发布事件

<code>   public static void main(String[] args) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(MainConfig.class);
//注册逻辑,注册成功后,发布注册事件
ctx.publishEvent(new RegistryEvent("注册用户","张三"));
}/<code>

怎样实现异步到的Spring事件?

我们先从源码上Spring 事件同步和异步的支持。

事件的发布是依靠 ApplicationEventPublisher 接口的publishEvent方法进行发布事件的,而publicEvent中又是通过 调用 ApplicationEventMulticaster 的 multicastEvent 进行事件广播的。

ApplicationEventMulticaster 中保存了所有实现了ApplicaitonListener 接口的监听器,ApplicationEventMulticaster的实现类 SimpleApplicationEventMulticaster 的源码如下:

<code>public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
\tprivate Executor taskExecutor;
\tprivate ErrorHandler errorHandler;

\tpublic SimpleApplicationEventMulticaster() {
\t}

\tpublic SimpleApplicationEventMulticaster(BeanFactory beanFactory) {
\t\tsetBeanFactory(beanFactory);
\t}

\tpublic void setTaskExecutor(Executor taskExecutor) {
\t\tthis.taskExecutor = taskExecutor;
\t}

\tprotected Executor getTaskExecutor() {
\t\treturn this.taskExecutor;
\t}
\tpublic void setErrorHandler(ErrorHandler errorHandler) {
\t\tthis.errorHandler = errorHandler;
\t}

\t@Override
\tpublic void multicastEvent(ApplicationEvent event) {
\t\tmulticastEvent(event, resolveDefaultEventType(event));
\t}

\t@Override
\tpublic void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
\t\tResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
\t\tfor (final ApplicationListener> listener : getApplicationListeners(event, type)) {
\t\t\tExecutor executor = getTaskExecutor();
\t\t\tif (executor != null) {
\t\t\t\texecutor.execute(new Runnable() {
\t\t\t\t\t@Override
\t\t\t\t\tpublic void run() {
\t\t\t\t\t\tinvokeListener(listener, event);
\t\t\t\t\t}
\t\t\t\t});
\t\t\t}
\t\t\telse {
\t\t\t\tinvokeListener(listener, event);
\t\t\t}
\t\t}
\t}


}
/<code>

从上面可以看出,异步通知的时间主要依靠 Executor去实现了,如果这个变量不配置,则默认使用同步,否则就是异步;


浅谈Spring 事件驱动机制

从上面调试的代码看,executor为null,所以会执行同步。

那么怎么让广播事件的时候异步执行呢?有以下两种方法:

① 直接继承 SimpleApplicationEventMulticaster ,然后添加 taskExecutor异步执行器

<code>@Component(value = "applicationEventMulticaster")
public class YjMulticaster extends SimpleApplicationEventMulticaster {
public YjMulticaster() {
System.out.println("执行了...............");

setTaskExecutor(Executors.newSingleThreadExecutor());
}
}/<code>

有人可能要问,为什么按照上面的写法就可以异步了呢?我们还得从源码说起?

下面我们看下怎样获取 ApplicationEventMulticaster ?

浅谈Spring 事件驱动机制

有的人要问,在1处,为什么有实例化好的bean.因为我们继承了 SimpleApplicationEventMulticaster,所以会提前实例化。

还有一些Spring 容器提前进行实例化的

浅谈Spring 事件驱动机制

② 实现BeanPostProcessor 接口,对 SimpleApplicationEventMulticaster bean进行拦截修改

<code>@Component
public class MultiCasterBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof SimpleApplicationEventMulticaster) {
System.out.println("我拦截到了");
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster= (SimpleApplicationEventMulticaster) bean;
simpleApplicationEventMulticaster.setTaskExecutor(Executors.newSingleThreadExecutor());
return bean;
}
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}

}
/<code>

BeanPostProcessor 执行时机实在Bean的实例化之后执行的,可以了解一下Spring的生命周期。

浅谈Spring 事件驱动机制

③ 如果既要同步也要异步呢?

步骤如下:

  • 定义枚举
<code>public enum  EventTypeEnum {
ASYNC, //异步
SYNC; //同步

}   /<code>
  • 定义注解类
<code>EventType    /<code>
  • 在监听器上添加注解EventType
<code>/**
* 注册的事件监听器
*/
@Component
public class RegistryListener implements ApplicationListener<registryevent> {
@EventType(EventTypeEnum.ASYNC)
@Override
public void onApplicationEvent( RegistryEvent event ) {
System.out.println("恭喜你["+event.getUserName()+"],注册成功!");
System.out.println("发送短信验证码!");
System.out.println("发送邮件.......");
}
}  /<registryevent>/<code>
  • 实现自定义多播器,继承 SimpleApplicationEventMulticaster 类
  • 添加注解 @Component(value = "applicationEventMulticaster")
<code>@Component(value = "applicationEventMulticaster")
public class YjApplicationEventMulticaster extends SimpleApplicationEventMulticaster {
@Override
public void multicastEvent( final ApplicationEvent event) {
EventTypeEnum defaultEventType = EventTypeEnum.ASYNC;
for (final ApplicationListener> listener : getApplicationListeners()) {
Class> cls = listener.getClass();
try {
Method onApplicationEventMethod = cls.getMethod("onApplicationEvent",ApplicationEvent.class);
if(onApplicationEventMethod.isAnnotationPresent(EventType.class)){
EventType eventMethodAnnotation = onApplicationEventMethod.getAnnotation(EventType.class);
defaultEventType = eventMethodAnnotation.value();
}

if(defaultEventType == EventTypeEnum.ASYNC){
setTaskExecutor(Executors.newCachedThreadPool());
}
//找到
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeListener(listener, event);
}
});
}
else {
invokeListener(listener, event);
}
}catch (Exception e){
}

}
}
}/<code>

Spring 事件在项目中的应用

场景:在管理员修改权限的时候,要求将已经被修改的用户的会话剔除

1、定义事件对象

<code>public class RoleChangeEvent extends ApplicationEvent {
private final Long userId;
public RoleChangeEvent( Object source, Long userId ) {
super(source);
this.userId = userId;
}
public Long getUserId( ) {
return userId;
}
}/<code>

2、定义监听器

<code>public class RoleChangeListener {
@Autowired
private SessionRegistry sessionRegistry;

@Autowired
private ISysUserRoleService sysUserRoleService;

@EventListener(RoleChangeEvent.class)
@Async
public void permissionListenerPostProcess(RoleChangeEvent event){
Long userId = event.getUserId();
log.debug("receive permission changed event , eventClassName : [{}] ,userId : [{}]", event.getClass().getName(), userId);
//获取在线用户列表
List<activeuserdetails> onLineUserList = getOnlineUserList();
onLineUserList.stream().filter(userDetails -> userDetails.getUserId().equals(userId))
.forEach(userDetails -> {
//session剔除
sessionRegistry.getAllSessions(userDetails, false).forEach(SessionInformation::expireNow);
});

}
/**
* 获取在线的userDetails列表
* @return
*/
private List<activeuserdetails> getOnlineUserList(){
return sessionRegistry.getAllPrincipals().stream().filter(p->p instanceof ActiveUserDetails)
.map(p-> ((ActiveUserDetails) p))
.collect(Collectors.toList());
}
}/<activeuserdetails>/<activeuserdetails>/<code>

Spring事件的源码分析

1.org.springframework.context.support.AbstractApplicationContext#refresh

<code>\t\t// 初始化多播器
\t\t\t\tinitApplicationEventMulticaster();

\t\t\t\t// Initialize other special beans in specific context subclasses.
\t\t\t\tonRefresh();

\t\t\t\t// 注册监听器
\t\t\t\tregisterListeners();/<code>

① 多播器的初始化

org.springframework.context.support.AbstractApplicationContext#initApplicationEventMulticaster(初始化事件多播器)

<code>protected void initApplicationEventMulticaster() {
\t\tConfigurableListableBeanFactory beanFactory = getBeanFactory();
\t\t/判断IOC容器中包含applicationEventMulticaster 事件多播器的Bean的name
\t\tif (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
\t\t /创建一个applicationEventMulticaster的bean放在IOC 容器中,bean的name 为applicationEventMulticaster
\t\t\tthis.applicationEventMulticaster =beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
\t\t\tif (logger.isDebugEnabled()) {
\t\t\t\tlogger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
\t\t\t}
\t\t}
\t\t/容器中不包含一个beanName 为applicationEventMulticaster的多播器组件
\t\telse {
\t\t //创建一个SimpleApplicationEventMulticaster 多播器
\t\t\tthis.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
\t\t\t//注册到容器中
\t\t\tbeanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
\t\t\tif (logger.isDebugEnabled()) {
\t\t\t\tlogger.debug("Unable to locate ApplicationEventMulticaster with name '" +
\t\t\t\t\t\tAPPLICATION_EVENT_MULTICASTER_BEAN_NAME +
\t\t\t\t\t\t"': using default [" + this.applicationEventMulticaster + "]");
\t\t\t}
\t\t}
\t}​/<code>

把容器中的监听器注册到多播器上去 源码解析

<code>protected void registerListeners() {
\t\t//把系统自带的将挺起
\t\tfor (ApplicationListener> listener : getApplicationListeners()) {
\t\t\tgetApplicationEventMulticaster().addApplicationListener(listener);
\t\t}

//我们自己实现了ApplicationListener 的组件
\t\tString[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
\t\tfor (String listenerBeanName : listenerBeanNames) {

\t\t\tgetApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
\t\t}

\t //在这里之前,我们早期想发布的事件 由于没有多播器没有发布,在这里我们总算有了自己的多播器,可以在这里发布早期堆积的事件了.
\t\tSet<applicationevent> earlyEventsToProcess = this.earlyApplicationEvents;
\t\tthis.earlyApplicationEvents = null;
\t\tif (earlyEventsToProcess != null) {
\t\t\tfor (ApplicationEvent earlyEvent : earlyEventsToProcess) {
\t\t\t\tgetApplicationEventMulticaster().multicastEvent(earlyEvent);
\t\t\t}
\t\t}
\t}/<applicationevent>/<code>

③ 如何发布

<code>public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
\t\tResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
\t\t//获取到所有的监听器
\t\tfor (final ApplicationListener> listener : getApplicationListeners(event, type)) {
\t\t //看spring 容器中是否支持线程池 异步发送事件
\t\t\tExecutor executor = getTaskExecutor();
\t\t\tif (executor != null) {
\t\t\t\texecutor.execute(new Runnable() {
\t\t\t\t\t@Override
\t\t\t\t\tpublic void run() {
\t\t\t\t\t
\t\t\t\t\t\tinvokeListener(listener, event);
\t\t\t\t\t}
\t\t\t\t});
\t\t\t}
\t\t\telse { //同步发送事件
\t\t\t\tinvokeListener(listener, event);
\t\t\t}
\t\t}
\t}\t
\t
\t
\tprivate void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
\t\ttry {
\t\t //调用对于listener的onApplicationEvent事件

\t\t\tlistener.onApplicationEvent(event);
\t\t}
\t\tcatch (ClassCastException ex) {
\t\t\tString msg = ex.getMessage();
\t\t\tif (msg == null || matchesClassCastMessage(msg, event.getClass())) {
\t\t\t\t// Possibly a lambda-defined listener which we could not resolve the generic event type for
\t\t\t\t// -> let's suppress the exception and just log a debug message.
\t\t\t\tLog logger = LogFactory.getLog(getClass());
\t\t\t\tif (logger.isDebugEnabled()) {
\t\t\t\t\tlogger.debug("Non-matching event type for listener: " + listener, ex);
\t\t\t\t}
\t\t\t}
\t\t\telse {
\t\t\t\tthrow ex;
\t\t\t}
\t\t}
\t}/<code>

总结

  1. Spring 事件 驱动模型的底层是通过 观察者设计模式实现的。
  2. 通过ApplicationEvent 抽象类 和 ApplicationListener接口,可以实现Application 事件 处理
  3. 监听器在处理Event的时候,需要使用instanceof关键字判断是是否是自己需要的
  4. 广播的时候可以同步也可以异步执行

本文由码农的一天撰写,如果你认同我的观点的话,可以点赞加关注一下;如果你对本篇文章有其他见解的话,也欢迎在下方的评论区浏览讨论!


分享到:


相關文章: