11.29 Spring異步處理@Async的使用以及原理、源碼分析(@EnableAsync)

原文:https://cloud.tencent.com/developer/article/1497604

在開發過程中,我們會遇到很多使用線程池的業務場景,例如異步短信通知、異步記錄操作日誌。大多數使用線程池的場景,就是會將一些可以進行異步操作的業務放在線程池中去完成。

例如在生成訂單的時候給用戶發送短信,生成訂單的結果不應該被髮送短信的成功與否所左右,也就是說生成訂單這個主操作是不依賴於發送短信這個操作,所以我們就可以把發送短信這個操作置為異步操作。

那麼本文就是來看看Spring中提供的優雅的異步處理方案:在Spring3中,Spring中引入了一個新的註解@Async,這個註解讓我們在使用Spring完成異步操作變得非常方便

需要注意的是這些功能都是Spring Framework提供的,而非SpringBoot。因此下文的講解都是基於Spring Framework的工程

Spring中用@Async註解標記的方法,稱為異步方法,它會在調用方的當前線程之外的獨立的線程中執行,其實就相當於我們自己new Thread(()-> System.out.println("hello world !"))這樣在另一個線程中去執行相應的業務邏輯。

Demo

// @Async 若把註解放在類上或者接口上,那麼他所有的方法都會異步執行了~~~~(包括私有方法)
public interface HelloService {
Object hello();
}

@Service
public class HelloServiceImpl implements HelloService {

@Async // 注意此處加上了此註解
@Override
public Object hello() {
System.out.println("當前線程:" + Thread.currentThread().getName());
return "service hello";
}
}

然後只需要在配置裡,開啟對異步的支持即可:

@Configuration
@EnableAsync // 開啟異步註解的支持
public class RootConfig {

}

輸出如下(當前線程名):

當前線程:SimpleAsyncTaskExecutor-1

可以很明顯的發現,它使用的是線程池SimpleAsyncTaskExecutor,這也是Spring默認給我們提供的線程池(其實它不是一個真正的線程池,後面會有講述)。下面原理部分講解後,你就能知道怎麼讓它使用我們自定義的線程池了~~~

@Async註解使用細節

  1. @Async註解一般用在方法上,如果用在類上,那麼這個類所有的方法都是異步執行的;
  2. @Async可以放在任何方法上,哪怕你是private的(
    若是同類調用,請務必注意註解失效的情況~~~)
  3. 所使用的@Async註解方法的類對象應該是Spring容器管理的bean對象
  4. @Async可以放在接口處(或者接口方法上)。但是隻有使用的是JDK的動態代理時才有效,CGLIB會失效。因此建議:統一寫在實現類的方法上
  5. 需要註解@EnableAsync來開啟異步註解的支持
  6. 若你希望得到異步調用的返回值,請你的返回值用Futrue變量包裝起來

需要額外導入哪些Jar包?

它的依賴包非常簡單,只依賴一些Spring的核心包外加spring-aop,但是如果你已經導入了spring-webmvc這個jar,那就什麼不需要額外導入了,因為都有了:

Spring異步處理@Async的使用以及原理、源碼分析(@EnableAsync)

備註:它雖然依賴於Spring AOP,但是它並不需要導入aspectjweaver,因為它和AspectJ沒有半毛錢關係

原理、源碼解析

@EnableAsync

它位於的包名為org.springframework.scheduling.annotation,jar名為:spring-context

@EnableXXX這種設計模式之前有分析過多次,這個註解就是它的入口,因此本文也一樣,從入口處一層一層的剖析:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

\t //默認情況下,要開啟異步操作,要在相應的方法或者類上加上@Async註解或者EJB3.1規範下的@Asynchronous註解。
\t //這個屬性使得開發人員可以自己設置開啟異步操作的註解(可謂非常的人性化了,但是大多情況下用Spring的就足夠了)
\tClass extends Annotation> annotation() default Annotation.class;
\t// true表示啟用CGLIB代理
\tboolean proxyTargetClass() default false;
\t
\t// 代理方式:默認是PROXY 採用Spring的動態代理(含JDK動態代理和CGLIB)

\t// 若改為:AdviceMode.ASPECTJ表示使用AspectJ靜態代理方式。
\t// 它能夠解決同類內方法調用不走代理對象的問題,但是一般情況下都不建議這麼去做,不要修改這個參數值
\tAdviceMode mode() default AdviceMode.PROXY;
\t// 直接定義:它的執行順序(因為可能有多個@EnableXXX)
\tint order() default Ordered.LOWEST_PRECEDENCE;
}

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

\t// May be used to determine the target executor to be used when executing this method
\t// 意思是這個value值是用來指定執行器的(寫入執行器BeanName即可採用特定的執行器去執行此方法)
\tString value() default "";

}

最重要的,還是上面的@Import註解導入的類:AsyncConfigurationSelector

AsyncConfigurationSelector

public class AsyncConfigurationSelector extends AdviceModeImportSelector<enableasync> {

\t// 這類 我也不知道在哪?是用於支持AspectJ這種靜態代理Mode的,忽略吧~~~~
\tprivate static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
\t\t\t"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

\t@Override
\t@Nullable
\tpublic String[] selectImports(AdviceMode adviceMode) {
\t\t// 這裡AdviceMode 進行不同的處理,從而向Spring容器注入了不同的Bean~~~

\t\tswitch (adviceMode) {
\t\t\t// 大多數情況下都走這裡,ProxyAsyncConfiguration會被注入到Bean容器裡面~~~
\t\t\tcase PROXY:
\t\t\t\treturn new String[] { ProxyAsyncConfiguration.class.getName() };
\t\t\tcase ASPECTJ:
\t\t\t\treturn new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
\t\t\tdefault:
\t\t\t\treturn null;
\t\t}
\t}

}/<enableasync>

AdviceModeImportSelector父類的這個抽象更加的重要。它的實現類至少有如下兩個:

Spring異步處理@Async的使用以及原理、源碼分析(@EnableAsync)

備註:TransactionManagementConfigurationSelector需要額外導入jar包:spring-tx

這個父類抽象得非常的好,它的作用:抽象實現支持了AdviceMode,並且支持通用的@EnableXXX模式。

//@since 3.1 它是一個`ImportSelector`
public abstract class AdviceModeImportSelector

改抽象提供了支持AdviceMode的較為通用的實現,若我們自己想自定義,可以考慮實現此類。

由此可議看出,@EnableAsync最終是向容器內注入了ProxyAsyncConfiguration這個Bean。由名字可議看出,它是一個配置類。

ProxyAsyncConfiguration

// 它是一個配置類,角色為ROLE_INFRASTRUCTURE 框架自用的Bean類型
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

\t// 它的作用就是諸如了一個AsyncAnnotationBeanPostProcessor,它是個BeanPostProcessor
\t@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
\t@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
\tpublic AsyncAnnotationBeanPostProcessor asyncAdvisor() {
\t\tAssert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
\t\tAsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
\t\t
\t\t// customAsyncAnnotation:自定義的註解類型
\t\t// AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation") 為拿到該註解該字段的默認值
\t\tClass extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
\t

\t\t// 相當於如果你指定了AsyncAnnotationType,那就set進去吧
\t\tif (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
\t\t\tbpp.setAsyncAnnotationType(customAsyncAnnotation);
\t\t}
\t\t
\t\t// 只有自定義了AsyncConfigurer的實現類,自定義了一個線程執行器,這裡才會有值
\t\tif (this.executor != null) {
\t\t\tbpp.setExecutor(this.executor);
\t\t}
\t\t// 同上,異步線程異常的處理器~~~~~
\t\tif (this.exceptionHandler != null) {
\t\t\tbpp.setExceptionHandler(this.exceptionHandler);
\t\t}
\t\t
\t\t// 這兩個參數,就不多說了。
\t\t// 可以看到,order屬性值,最終決定的是BeanProcessor的執行順序的
\t\tbpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
\t\tbpp.setOrder(this.enableAsync.<integer>getNumber("order"));
\t\treturn bpp;
\t}

}

// 它的父類:
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

\t// 此註解@EnableAsync的元信息
\tprotected AnnotationAttributes enableAsync;
\t// 異步線程池
\tprotected Executor executor;
\t// 異步異常的處理器
\tprotected AsyncUncaughtExceptionHandler exceptionHandler;

\t@Override
\tpublic void setImportMetadata(AnnotationMetadata importMetadata) {
\t\t// 拿到@EnableAsync註解的元數據信息~~~
\t\tthis.enableAsync = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));

\t\tif (this.enableAsync == null) {
\t\t\tthrow new IllegalArgumentException("@EnableAsync is not present on importing class " + importMetadata.getClassName());
\t\t}
\t}

\t/**
\t * Collect any {@link AsyncConfigurer} beans through autowiring.
\t */
\t // doc說得很明白。它會把所有的`AsyncConfigurer`的實現類都蒐集進來,然後進行類似屬性的合併
\t // 備註 雖然這裡用的是Collection 但是AsyncConfigurer的實現類只允許有一個
\t@Autowired(required = false)
\tvoid setConfigurers(Collection<asyncconfigurer> configurers) {

\t\tif (CollectionUtils.isEmpty(configurers)) {
\t\t\treturn;
\t\t}
\t//AsyncConfigurer用來配置線程池配置以及異常處理器,而且在Spring環境中最多隻能有一個
\t//在這裡我們知道了,如果想要自己去配置線程池,只需要實現AsyncConfigurer接口,並且不可以在Spring環境中有多個實現AsyncConfigurer的類。
\t\tif (configurers.size() > 1) {
\t\t\tthrow new IllegalStateException("Only one AsyncConfigurer may exist");
\t\t}
\t\t// 拿到唯一的AsyncConfigurer ,然後賦值~~~~ 默認的請參照這個類:AsyncConfigurerSupport(它並不會被加入進Spring容器裡)
\t\tAsyncConfigurer configurer = configurers.iterator().next();
\t\tthis.executor = configurer.getAsyncExecutor();
\t\tthis.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
\t}

}/<asyncconfigurer>/<integer>

從上可知,真正做文章的最終還是AsyncAnnotationBeanPostProcessor這個後置處理器,下面我們來重點看看它

AsyncAnnotationBeanPostProcessor

AsyncAnnotationBeanPostProcessor這個BeanPostBeanPostProcessor很顯然會對帶有能夠引發異步操作的註解(比如@Async)的Bean進行處理

Spring異步處理@Async的使用以及原理、源碼分析(@EnableAsync)

從該類的繼承體系可以看出,大部分功能都是在抽象類裡完成的,它不關乎於@Async,而是這一類技術都是這樣子處理的。

首先,ProxyProcessorSupport這裡就不用多說了,在講解自動代理創建器的時候有說過。 小家Spring】Spring AOP的核心類:AbstractAdvisorAutoProxy自動代理創建器深度剖析(AnnotationAwareAspectJAutoProxyCreator)

按照我一貫的下關,我還是喜歡從底部往上進行分析,這樣能夠更無阻礙些。

AbstractAdvisingBeanPostProcessor

從這個名字也能看出來。它主要處理AdvisingBean,也就是處理Advisor和Bean的關係的

// 它繼承自,ProxyProcessorSupport,說明它也擁有AOp的通用配置
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {

\t@Nullable
\tprotected Advisor advisor;
\tprotected boolean beforeExistingAdvisors = false;
\t
\t// 緩存合格的Bean們
\tprivate final Map<class>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);

\t// 當遇到一個pre-object的時候,是否把該processor所持有得advisor放在現有的增強器們之前執行
\t// 默認是false,會放在最後一個位置上的
\tpublic void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
\t\tthis.beforeExistingAdvisors = beforeExistingAdvisors;
\t}
\t
\t// 不處理
\t@Override
\tpublic Object postProcessBeforeInitialization(Object bean, String beanName) {
\t\treturn bean;
\t}

\t// Bean已經實例化、初始化完成之後執行。
\t@Override
\tpublic Object postProcessAfterInitialization(Object bean, String beanName) {
\t\t// 忽略AopInfrastructureBean的Bean,並且如果沒有advisor也會忽略不處理~~~~~

\t\tif (bean instanceof AopInfrastructureBean || this.advisor == null) {
\t\t\t// Ignore AOP infrastructure such as scoped proxies.
\t\t\treturn bean;
\t\t}
\t\t
\t\t// 如果這個Bean已經被代理過了(比如已經被AOP切中了),那本處就無需再重複創建代理了嘛
\t\t// 直接向裡面添加advisor就成了
\t\tif (bean instanceof Advised) {
\t\t\tAdvised advised = (Advised) bean;
\t\t\t// 注意此advised不能是已經被凍結了的。且源對象必須是Eligible合格的
\t\t\tif (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
\t\t\t\t// Add our local Advisor to the existing proxy's Advisor chain...
\t\t\t\t// 把自己持有的這個advisor放在首位(如果beforeExistingAdvisors=true)
\t\t\t\tif (this.beforeExistingAdvisors) {
\t\t\t\t\tadvised.addAdvisor(0, this.advisor);
\t\t\t\t}
\t\t\t\t// 否則就是尾部位置
\t\t\t\telse {
\t\t\t\t\tadvised.addAdvisor(this.advisor);
\t\t\t\t}
\t\t\t\t// 最終直接返回即可,因為已經沒有必要再創建一次代理對象了
\t\t\t\treturn bean;
\t\t\t}
\t\t}

\t\t// 如果這個Bean事合格的(此方法下面有解釋) 這個時候是沒有被代理過的
\t\tif (isEligible(bean, beanName)) {
\t\t\t// 以當前的配置,創建一個ProxyFactory
\t\t\tProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
\t\t\t// 如果不是使用CGLIB常見代理,那就去分析出它所實現的接口們 然後放進ProxyFactory 裡去
\t\t\tif (!proxyFactory.isProxyTargetClass()) {

\t\t\t\tevaluateProxyInterfaces(bean.getClass(), proxyFactory);
\t\t\t}
\t\t\t
\t\t\t// 切面就是當前持有得advisor
\t\t\tproxyFactory.addAdvisor(this.advisor);
\t\t\t// 留給子類,自己還可以對proxyFactory進行自定義~~~~~
\t\t\tcustomizeProxyFactory(proxyFactory);
\t\t\t// 最終返回這個代理對象~~~~~
\t\t\treturn proxyFactory.getProxy(getProxyClassLoader());
\t\t}

\t\t// No async proxy needed.(相當於沒有做任何的代理處理,返回原對象)
\t\treturn bean;
\t}
\t
\t// 檢查這個Bean是否是合格的
\tprotected boolean isEligible(Object bean, String beanName) {
\t\treturn isEligible(bean.getClass());
\t}
\tprotected boolean isEligible(Class> targetClass) {
\t\t// 如果已經被緩存著了,那肯定靠譜啊
\t\tBoolean eligible = this.eligibleBeans.get(targetClass);
\t\tif (eligible != null) {
\t\t\treturn eligible;
\t\t}
\t\t// 如果沒有切面(就相當於沒有給配置增強器,那鐵定是不合格的)
\t\tif (this.advisor == null) {
\t\t\treturn false;
\t\t}
\t
\t\t// 這個重要了:看看這個advisor是否能夠切入進targetClass這個類,能夠切入進取的也是合格的
\t\teligible = AopUtils.canApply(this.advisor, targetClass);
\t\tthis.eligibleBeans.put(targetClass, eligible);
\t\treturn eligible;
\t}

\t// 子類可以複寫。比如`AbstractBeanFactoryAwareAdvisingPostProcessor`就複寫了這個方法~~~
\tprotected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
\t\tProxyFactory proxyFactory = new ProxyFactory();
\t\tproxyFactory.copyFrom(this);
\t\tproxyFactory.setTarget(bean);
\t\treturn proxyFactory;
\t}

\t// 子類複寫~
\tprotected void customizeProxyFactory(ProxyFactory proxyFactory) {
\t}

}/<class>

看看它的繼承圖:

Spring異步處理@Async的使用以及原理、源碼分析(@EnableAsync)

MethodValidationPostProcessor屬於JSR-303校驗方面的範疇,不是本文的內容,因此不會講述

AbstractBeanFactoryAwareAdvisingPostProcessor

從名字可以看出,它相較於父類,就和BeanFactory有關了,也就是和Bean容器相關了~~~

public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor
\t\timplements BeanFactoryAware {

\t// Bean工廠
\t@Nullable
\tprivate ConfigurableListableBeanFactory beanFactory;

\t// 如果這個Bean工廠不是ConfigurableListableBeanFactory ,那就set一個null
\t// 我們的`DefaultListableBeanFactory`顯然就是它的子類~~~~~
\t@Override
\tpublic void setBeanFactory(BeanFactory beanFactory) {
\t\tthis.beanFactory = (beanFactory instanceof ConfigurableListableBeanFactory ?
\t\t\t\t(ConfigurableListableBeanFactory) beanFactory : null);
\t}

\t
\t@Override
\tprotected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
\t\t// 如果Bean工廠是正常的,那就把這個Bean expose一個特殊的Bean,記錄下它的類型
\t\tif (this.beanFactory != null) {
\t\t\tAutoProxyUtils.exposeTargetClass(this.beanFactory, beanName, bean.getClass());
\t\t}

\t\tProxyFactory proxyFactory = super.prepareProxyFactory(bean, beanName);
\t\t
\t\t// 這裡創建代理也是和`AbstractAutoProxyCreator`差不多的邏輯。
\t\t// 如果沒有顯示的設置為CGLIB,並且toProxyUtils.shouldProxyTargetClass還被暴露過時一個特殊的Bean,那就強制使用CGLIB代理吧 這裡一般和Scope無關的話,都返回false了
\t\tif (!proxyFactory.isProxyTargetClass() && this.beanFactory != null &&
\t\t\t\tAutoProxyUtils.shouldProxyTargetClass(this.beanFactory, beanName)) {

\t\t\tproxyFactory.setProxyTargetClass(true);
\t\t}
\t\treturn proxyFactory;
\t}

}

下面就可以看看具體的兩個實現類了:

AsyncAnnotationBeanPostProcessor

該實現類就是具體和@Async相關的一個類了~~~

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

\t// 建議換成AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME 這樣語意更加的清晰些
\tpublic static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;

\t// 註解類型
\t@Nullable
\tprivate Class extends Annotation> asyncAnnotationType;
\t// 異步的執行器
\t@Nullable
\tprivate Executor executor;
\t// 異步異常處理器
\t@Nullable
\tprivate AsyncUncaughtExceptionHandler exceptionHandler;

\t// 此處特別注意:這裡設置為true,也就是說@Async的Advisor會放在首位
\tpublic AsyncAnnotationBeanPostProcessor() {
\t\tsetBeforeExistingAdvisors(true);
\t}

\t// 可以設定需要掃描哪些註解類型。默認只掃描@Async以及`javax.ejb.Asynchronous`這個註解
\tpublic void setAsyncAnnotationType(Class extends Annotation> asyncAnnotationType) {
\t\tAssert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
\t\tthis.asyncAnnotationType = asyncAnnotationType;

\t}

\t// 如果沒有指定。那就將執行全局得默認查找。在上下文中查找唯一的`TaskExecutor`類型的Bean,或者一個名稱為`taskExecutor`的Executor
\t// 當然,如果上面途徑都沒找到。那就會採用一個默認的任務池
\tpublic void setExecutor(Executor executor) {
\t\tthis.executor = executor;
\t}
\tpublic void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
\t\tthis.exceptionHandler = exceptionHandler;
\t}


\t// 重寫了父類的方法。然後下面:自己new了一個AsyncAnnotationAdvisor ,傳入executor和exceptionHandler
\t// 並且最終this.advisor = advisor
\t// 因此可議看出:AsyncAnnotationAdvisor 才是重點了。它定義了它的匹配情況~~~~
\t@Override
\tpublic void setBeanFactory(BeanFactory beanFactory) {
\t\tsuper.setBeanFactory(beanFactory);

\t\tAsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
\t\tif (this.asyncAnnotationType != null) {
\t\t\tadvisor.setAsyncAnnotationType(this.asyncAnnotationType);
\t\t}
\t\tadvisor.setBeanFactory(beanFactory);
\t\tthis.advisor = advisor;
\t}

}

AsyncAnnotationAdvisor

可以看出,它是一個PointcutAdvisor,並且Pointcut是一個AnnotationMatchingPointcut,因此是為註解來匹配的

public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware { 


\tprivate AsyncUncaughtExceptionHandler exceptionHandler;
\t// 增強器
\tprivate Advice advice;
\t// 切點
\tprivate Pointcut pointcut;

\t// 兩個都為null,那就是都會採用默認的方案
\tpublic AsyncAnnotationAdvisor() {
\t\tthis(null, null);
\t}

\t// 創建一個AsyncAnnotationAdvisor實例,可以自己指定Executor 和 AsyncUncaughtExceptionHandler
\t@SuppressWarnings("unchecked")
\tpublic AsyncAnnotationAdvisor(@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {
\t\t// 這裡List長度選擇2,應為絕大部分情況下只會支持這兩種@Async和@Asynchronous
\t\tSet<class>> asyncAnnotationTypes = new LinkedHashSet<>(2);
\t\tasyncAnnotationTypes.add(Async.class);
\t\ttry {
\t\t\tasyncAnnotationTypes.add((Class extends Annotation>)
\t\t\t\t\tClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
\t\t}
\t\tcatch (ClassNotFoundException ex) {
\t\t\t// If EJB 3.1 API not present, simply ignore.
\t\t}

\t\tif (exceptionHandler != null) {
\t\t\tthis.exceptionHandler = exceptionHandler;
\t\t}
\t\t// 若沒指定,那就使用默認的SimpleAsyncUncaughtExceptionHandler(它僅僅是輸出了一句日誌而已)
\t\telse {
\t\t\tthis.exceptionHandler = new SimpleAsyncUncaughtExceptionHandler();
\t\t}
\t\t
\t\t// 這兩個方法是重點,下面會重點介紹
\t\tthis.advice = buildAdvice(executor, this.exceptionHandler);
\t\tthis.pointcut = buildPointcut(asyncAnnotationTypes);
\t}


\t// 如果set了Executor,advice會重新構建。
\tpublic void setTaskExecutor(Executor executor) {
\t\tthis.advice = buildAdvice(executor, this.exceptionHandler);
\t}

\t// 這裡注意:如果你自己指定了註解類型。那麼將不再掃描其餘兩個默認的註解,因此pointcut也就需要重新構建了
\tpublic void setAsyncAnnotationType(Class extends Annotation> asyncAnnotationType) {
\t\tAssert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
\t\tSet<class>> asyncAnnotationTypes = new HashSet<>();
\t\tasyncAnnotationTypes.add(asyncAnnotationType);
\t\tthis.pointcut = buildPointcut(asyncAnnotationTypes);
\t}
\t
\t// 如果這個advice也實現了BeanFactoryAware,那就也把BeanFactory放進去
\t@Override
\tpublic void setBeanFactory(BeanFactory beanFactory) {
\t\tif (this.advice instanceof BeanFactoryAware) {
\t\t\t((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
\t\t}
\t}

\t@Override
\tpublic Advice getAdvice() {
\t\treturn this.advice;
\t}
\t@Override
\tpublic Pointcut getPointcut() {
\t\treturn this.pointcut;
\t}

\t// 這個最終又是委託給`AnnotationAsyncExecutionInterceptor`,它是一個具體的增強器,有著核心內容
\tprotected Advice buildAdvice(@Nullable Executor executor, AsyncUncaughtExceptionHandler exceptionHandler) {
\t\treturn new AnnotationAsyncExecutionInterceptor(executor, exceptionHandler);
\t}

\t// Calculate a pointcut for the given async annotation types, if any
\tprotected Pointcut buildPointcut(Set<class>> asyncAnnotationTypes) {
\t\t// 採用一個組合切面:ComposablePointcut (因為可能需要支持多個註解嘛)

\t\tComposablePointcut result = null;
\t\tfor (Class extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
\t\t\t// 這裡為何new出來兩個AnnotationMatchingPointcut??????
\t\t\t// 第一個:類匹配(只需要類上面有這個註解,所有的方法都匹配)this.methodMatcher = MethodMatcher.TRUE;
\t\t\t// 第二個:方法匹配。所有的類都可議。但是隻有方法上有這個註解才會匹配上
\t\t\tPointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
\t\t\tPointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
\t\t\tif (result == null) {
\t\t\t\tresult = new ComposablePointcut(cpc);
\t\t\t}
\t\t\telse {
\t\t\t\tresult.union(cpc);
\t\t\t}
\t\t\t// 最終的結果都是取值為並集的~~~~~~~
\t\t\tresult = result.union(mpc);
\t\t}
\t\t// 最後一個處理厲害了:也就是說你啥類型都木有的情況下,是匹配所有類的所有方法~~~
\t\treturn (result != null ? result : Pointcut.TRUE);
\t}

}/<class>/<class>/<class>

從上的源碼可議看出,默認是支持@Asycn以及EJB的那個異步註解的。但是最終的增強行為,委託給了AnnotationAsyncExecutionInterceptor

AnnotationAsyncExecutionInterceptor:@Async攔截器

Spring異步處理@Async的使用以及原理、源碼分析(@EnableAsync)

可知,它是一個MethodInterceptor,並且繼承自AsyncExecutionAspectSupport

AsyncExecutionAspectSupport

從類名就可以看出,它是用來支持處理異步線程執行器的,若沒有指定,靠它提供一個默認的異步執行器。

public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {

\t// 這是備選的。如果找到多個類型為TaskExecutor的Bean,才會備選的再用這個名稱去找的~~~
\tpublic static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";
\t// 緩存~~~AsyncTaskExecutor是TaskExecutor的子接口
\t// 從這可以看出:不同的方法,對應的異步執行器還不一樣咯~~~~~~
\tprivate final Map<method> executors = new ConcurrentHashMap<>(16);
\t// 默認的線程執行器
\t@Nullable
\tprivate volatile Executor defaultExecutor;
\t// 異步異常處理器
\tprivate AsyncUncaughtExceptionHandler exceptionHandler;
\t// Bean工廠
\t@Nullable
\tprivate BeanFactory beanFactory;

\tpublic AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
\t\tthis(defaultExecutor, new SimpleAsyncUncaughtExceptionHandler());
\t}
\tpublic AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
\t\tthis.defaultExecutor = defaultExecutor;
\t\tthis.exceptionHandler = exceptionHandler;
\t}
\tpublic void setExecutor(Executor defaultExecutor) {
\t\tthis.defaultExecutor = defaultExecutor;
\t}

\tpublic void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
\t\tthis.exceptionHandler = exceptionHandler;
\t}
\t@Override
\tpublic void setBeanFactory(BeanFactory beanFactory) {
\t\tthis.beanFactory = beanFactory;
\t}
\t
\t// 該方法是找到一個異步執行器,去執行這個方法~~~~~~
\t@Nullable
\tprotected AsyncTaskExecutor determineAsyncExecutor(Method method) {
\t\t// 如果緩存中能夠找到該方法對應的執行器,就立馬返回了
\t\tAsyncTaskExecutor executor = this.executors.get(method);
\t\tif (executor == null) {
\t\t\tExecutor targetExecutor;
\t\t\t
\t\t\t// 抽象方法:`AnnotationAsyncExecutionInterceptor`有實現。就是@Async註解的value值
\t\t\tString qualifier = getExecutorQualifier(method);
\t\t\t// 現在知道@Async直接的value值的作用了吧。就是制定執行此方法的執行器的(容器內執行器的Bean的名稱)
\t\t\t// 當然有可能為null。注意此處是支持@Qualified註解標註在類上來區分Bean的
\t\t\t// 注意:此處targetExecutor仍然可能為null
\t\t\tif (StringUtils.hasLength(qualifier)) {
\t\t\t\ttargetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
\t\t\t}
\t\t\t
\t\t\t// 註解沒有指定value值,那就去找默認的執行器
\t\t\telse {
\t\t\t\ttargetExecutor = this.defaultExecutor;
\t\t\t\tif (targetExecutor == null) {
\t\t\t\t\t// 去找getDefaultExecutor()~~~
\t\t\t\t\tsynchronized (this.executors) {
\t\t\t\t\t\tif (this.defaultExecutor == null) {
\t\t\t\t\t\t\tthis.defaultExecutor = getDefaultExecutor(this.beanFactory);
\t\t\t\t\t\t}
\t\t\t\t\t\ttargetExecutor = this.defaultExecutor;

\t\t\t\t\t}
\t\t\t\t}
\t\t\t}
\t\t\t
\t\t\t// 若還未null,那就返回null吧
\t\t\tif (targetExecutor == null) {
\t\t\t\treturn null;
\t\t\t}
\t\t\t
\t\t\t// 把targetExecutor 包裝成一個AsyncTaskExecutor返回,並且緩存起來。
\t\t\t// TaskExecutorAdapter就是AsyncListenableTaskExecutor的一個實現類
\t\t\texecutor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
\t\t\t\t\t(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
\t\t\tthis.executors.put(method, executor);
\t\t}
\t\treturn executor;
\t}

\t// 子類去複寫此方法。也就是拿到對應的key,從而方便找bean吧(執行器)
\t@Nullable
\tprotected abstract String getExecutorQualifier(Method method);

\t// @since 4.2.6 也就是根據
\t@Nullable
\tprotected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
\t\tif (beanFactory == null) {
\t\t\tthrow new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
\t\t\t\t\t" to access qualified executor '" + qualifier + "'");
\t\t}
\t\treturn BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
\t}

\t// @since 4.2.6
\t// Retrieve or build a default executor for this advice instance 檢索或者創建一個默認的executor
\t@Nullable
\tprotected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
\t\tif (beanFactory != null) {
\t\t\t// 這個處理很有意思,它是用用的try catch的技巧去處理的
\t\t\ttry {
\t\t\t\t// 如果容器內存在唯一的TaskExecutor(子類),就直接返回了

\t\t\t\treturn beanFactory.getBean(TaskExecutor.class);
\t\t\t}
\t\t\tcatch (NoUniqueBeanDefinitionException ex) {
\t\t\t\t// 這是出現了多個TaskExecutor類型的話,那就按照名字去拿 `taskExecutor`且是Executor類型
\t\t\t\ttry {
\t\t\t\t\treturn beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
\t\t\t\t}
\t\t\t\t// 如果再沒有找到,也不要報錯,而是接下來創建一個默認的處理器
\t\t\t\t// 這裡輸出一個info信息
\t\t\t\tcatch (NoSuchBeanDefinitionException ex2) {
\t\t\t\t}
\t\t\t}
\t\t\tcatch (NoSuchBeanDefinitionException ex) {
\t\t\t\ttry {
\t\t\t\t\treturn beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
\t\t\t\t}
\t\t\t\tcatch (NoSuchBeanDefinitionException ex2) {
\t\t\t\t}
\t\t\t\t// 這裡還沒有獲取到,就放棄。 用本地默認的executor吧~~~
\t\t\t\t// 子類可以去複寫此方法,發現為null的話可議給一個默認值~~~~比如`AsyncExecutionInterceptor`默認給的就是`SimpleAsyncTaskExecutor`作為執行器的
\t\t\t\t// Giving up -> either using local default executor or none at all...
\t\t\t}
\t\t}
\t\treturn null;
\t}

\t//Delegate for actually executing the given task with the chosen executor
\t// 用選定的執行者實際執行給定任務的委託
\t@Nullable
\tprotected Object doSubmit(Callable<object> task, AsyncTaskExecutor executor, Class> returnType) {
\t\t// 根據不同的返回值類型,來採用不同的方案去異步執行,但是執行器都是executor
\t\tif (CompletableFuture.class.isAssignableFrom(returnType)) {
\t\t\treturn CompletableFuture.supplyAsync(() -> {
\t\t\t\ttry {

\t\t\t\t\treturn task.call();
\t\t\t\t}
\t\t\t\tcatch (Throwable ex) {
\t\t\t\t\tthrow new CompletionException(ex);
\t\t\t\t}
\t\t\t}, executor);
\t\t}
\t\t// ListenableFuture接口繼承自Future 是Spring自己擴展的一個接口。
\t\t// 同樣的AsyncListenableTaskExecutor也是Spring擴展自AsyncTaskExecutor的
\t\telse if (ListenableFuture.class.isAssignableFrom(returnType)) {
\t\t\treturn ((AsyncListenableTaskExecutor) executor).submitListenable(task);
\t\t}
\t\t
\t\t// 普通的submit
\t\telse if (Future.class.isAssignableFrom(returnType)) {
\t\t\treturn executor.submit(task);
\t\t}
\t\t// 沒有返回值的情況下 也用sumitt提交,按時返回null
\t\telse {
\t\t\texecutor.submit(task);
\t\t\treturn null;
\t\t}
\t}

\t// 處理錯誤
\tprotected void handleError(Throwable ex, Method method, Object... params) throws Exception {
\t \t// 如果方法的返回值類型是Future,就rethrowException,表示直接throw出去
\t\tif (Future.class.isAssignableFrom(method.getReturnType())) {
\t\t\tReflectionUtils.rethrowException(ex);
\t\t} else {
\t\t\t// Could not transmit the exception to the caller with default executor
\t\t\ttry {
\t\t\t\tthis.exceptionHandler.handleUncaughtException(ex, method, params);
\t\t\t} catch (Throwable ex2) {
\t\t\t\tlogger.error("Exception handler for async method '" + method.toGenericString() +
\t\t\t\t\t\t"' threw unexpected exception itself", ex2);
\t\t\t}
\t\t}
\t}

}/<object>/<method>

這個類相當於已經做了大部分的工作了,接下來繼續看子類:

AsyncExecutionInterceptor

終於,從此處開始。可議看出它是一個MethodInterceptor,是一個增強器了。但是從命名中也可以看出,它已經能夠處理異步的執行了(比如基於XML方式的),但是還和註解無關

// 他繼承自AsyncExecutionAspectSupport 來處理異步方法的處理,同時是個MethodInterceptor,來增強複合條件的方法
public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {
\t...
\t// 顯然這個方法它直接返回null,因為XML配置嘛~~~~
\t@Override
\t@Nullable
\tprotected String getExecutorQualifier(Method method) {
\t\treturn null;
\t}

\t// 這個厲害了。如果父類返回的defaultExecutor 為null,那就new一個SimpleAsyncTaskExecutor作為默認的執行器
\t@Override
\t@Nullable
\tprotected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
\t\tExecutor defaultExecutor = super.getDefaultExecutor(beanFactory);
\t\treturn (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
\t}
\t// 最高優先級 希望的是最優先執行(XML配置就是這種優先級)
\t@Override
\tpublic int getOrder() {
\t\treturn Ordered.HIGHEST_PRECEDENCE;
\t}

// 最重要的當然是這個invoke方法
\t@Override
\t@Nullable

\tpublic Object invoke(final MethodInvocation invocation) throws Throwable {
\t\tClass> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
\t\t// 注意:此處是getMostSpecificMethod 拿到最終要執行的那個方法
\t\tMethod specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
\t\t// 橋接方法~~~~~~~~~~~~~~
\t\tfinal Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

\t\t// determine一個用於執行此方法的異步執行器
\t\tAsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
\t\tif (executor == null) {
\t\t\tthrow new IllegalStateException(
\t\t\t\t\t"No executor specified and no default executor set on AsyncExecutionInterceptor either");
\t\t}

\t\t// 構造一個任務,Callable(此處不採用Runable,因為需要返回值)
\t\tCallable<object> task = () -> {
\t\t\ttry {
\t\t\t\t// result就是返回值
\t\t\t\tObject result = invocation.proceed();
\t\t\t\t// 注意此處的處理~~~~ 相當於如果不是Future類型,就返回null了
\t\t\t\tif (result instanceof Future) {
\t\t\t\t\treturn ((Future>) result).get();
\t\t\t\t}
\t\t\t}
\t\t\t// 處理執行時可能產生的異常~~~~~~
\t\t\tcatch (ExecutionException ex) {
\t\t\t\thandleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
\t\t\t}
\t\t\tcatch (Throwable ex) {
\t\t\t\thandleError(ex, userDeclaredMethod, invocation.getArguments());
\t\t\t}
\t\t\treturn null;
\t\t};
\t\t
\t\t// 提交任務~~~~invocation.getMethod().getReturnType()為返回值類型
\t\treturn doSubmit(task, executor, invocation.getMethod().getReturnType());
\t}
\t
}/<object>

SimpleAsyncTaskExecutor:異步執行用戶任務的SimpleAsyncTaskExecutor。每次執行客戶提交給它的任務時,它會啟動新的線程,並允許開發者控制併發線程的上限(concurrencyLimit),從而起到一定的資源節流作用。默認時,concurrencyLimit取值為-1,即**不啟用**資源節流 所以它不是真的線程池,這個類不重用線程,每次調用都會創建一個新的線程(因此建議我們在使用@Aysnc的時候,自己配置一個線程池,節約資源)

AnnotationAsyncExecutionInterceptor

很顯然,他是在AsyncExecutionInterceptor的基礎上,提供了對@Async註解的支持。所以它是繼承它的。

// 它繼承自AsyncExecutionInterceptor ,只複寫了一個方法
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
\t...
\t// 由此可以見它就是去拿到@Async的value值。以方法的為準,其次是類上面的
\t// 備註:發現這裡是不支持EJB的@Asynchronous註解的,它是不能指定執行器的
\t@Override
\t@Nullable
\tprotected String getExecutorQualifier(Method method) {
\t\t// Maintainer's note: changes made here should also be made in
\t\t// AnnotationAsyncExecutionAspect#getExecutorQualifier
\t\tAsync async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
\t\tif (async == null) {
\t\t\tasync = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
\t\t}
\t\treturn (async != null ? async.value() : null);
\t}

}

@Async註解失效得原因

如下:若我是這麼寫的

@Service
public class HelloServiceImpl implements HelloService {

@Override
public Object hello() {
System.out.println("當前線程:" + Thread.currentThread().getName());
helloPrivate(); // 同類內部方法調用
return "service hello";
}

@Async
@Override
public Object hello2() {
System.out.println("當前線程:" + Thread.currentThread().getName());
return null;
}
}

最終輸出為;

當前線程:http-nio-8080-exec-3
當前線程:http-nio-8080-exec-3

發現都是tomcat的線程。what?@Async異步線程木有生效???? 這個也不是什麼新問題了。在之前一篇博文:【小家java】Spring事務不生效的原因大解讀 原因也是一樣的,都是所謂的:代理失效問題。

如何解決?這裡就說一種方案吧:

@Service 

public class HelloServiceImpl implements HelloService {

@Autowired
private ApplicationContext applicationContext;

@Override
public Object hello() {
System.out.println("當前線程:" + Thread.currentThread().getName());
// 從容器裡拿到該類型的實例~~~~(注意:若是JDK代理,這裡的類型只能傳接口,而不能是實現類,否則NoSuchBean...)
HelloService helloService = applicationContext.getBean(HelloService.class);
// 然後用本實例去調用 而不是用默認的this去調用
helloService.hello2();
return "service hello";
}

@Async
@Override
public Object hello2() {
System.out.println("當前線程:" + Thread.currentThread().getName());
return null;
}
}

輸出:

當前線程:http-nio-8080-exec-4
當前線程:SimpleAsyncTaskExecutor-2

顯然這才是我們想要的結果。因此在同類調用的時候,一定要特別的小心,很有可能是不生效的。

各位可以看看你們同事寫的代碼,據我推測,肯定有同事寫了這種不生效的代碼~~

使用推薦配置

@EnableAsync //對應的@Enable註解,最好寫在屬於自己的配置文件上,保持內聚性
@Configuration
public class AsyncConfig implements AsyncConfigurer {

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); //核心線程數
executor.setMaxPoolSize(20); //最大線程數
executor.setQueueCapacity(1000); //隊列大小
executor.setKeepAliveSeconds(300); //線程最大空閒時間
executor.setThreadNamePrefix("fsx-Executor-"); ////指定用於新創建的線程名稱的前綴。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒絕策略(一共四種,此處省略)

\t // 這一步千萬不能忘了,否則報錯: java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized
executor.initialize();
return executor;
}

// 異常處理器:當然你也可以自定義的,這裡我就這麼簡單寫了~~~
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}

使用Spring的異步,我個人十分建議配置上自己自定義的配置器(如上配置僅供參考),這樣能夠更好的掌握(比如異常處理AsyncUncaughtExceptionHandler~~~)

總結

有了Spring AOP整體運行原理作為基礎,再看這些基於AOP的應用,簡直行雲流水。因此還是應正了那句話:你能走多遠,就看你的根基有多穩。

最後,使用的有兩個建議:

  1. 異步方法建議儘量處理耗時的任務,或者是處理不希望阻斷主流程的任務(比如異步記錄操作日誌)
  2. 儘量為@Async準備一個專門的線程池來提高效率減少開銷,而不要用Spring默認的SimpleAsyncTaskExecutor,它不是一個真正的線程池~


分享到:


相關文章: