Enable the async aspect with @EnableAsync, then add the @Async annotation to each method that should be invoked asynchronously.

@SpringBootApplication
@EnableAsync // enable the async aspect
public class SpringdemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringdemoApplication.class, args);
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @Async // run asynchronously
    @Override
    public void invokeAsyncTest01() {
        System.out.println(Thread.currentThread() + " ran invokeAsyncTest01!");
    }
}
@Async is the annotation the async aspect recognizes by default. A custom annotation can be specified when enabling the aspect, for example @EnableAsync(annotation = AsyncCustom.class).

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncCustom {
}

@SpringBootApplication
@EnableAsync(annotation = AsyncCustom.class)
public class SpringdemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringdemoApplication.class, args);
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @AsyncCustom // run asynchronously
    @Override
    public void invokeAsyncTest01() {
        System.out.println(Thread.currentThread() + " ran invokeAsyncTest01!");
    }
}
When the value of @Async does not name a thread pool, the default thread pool is used. If you do not set a default pool yourself, the framework creates one for you (for the detailed flow, see the thread pool lookup priority).

@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {

    /**
     * Set the default thread pool.
     */
    @Override
    public Executor getAsyncExecutor() {
        // Prefer creating the pool explicitly with new ThreadPoolExecutor;
        // SimpleAsyncTaskExecutor does not reuse threads.
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setThreadNamePrefix("CustomAsync-Test-");
        return taskExecutor;
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @Async
    @Override
    public void invokeAsyncTest01() {
        System.out.println(Thread.currentThread() + " ran invokeAsyncTest01!");
    }
}
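The comment in the configuration above recommends an explicitly constructed ThreadPoolExecutor because it reuses its worker threads. The following is a minimal plain-JDK sketch of that idea, not Spring configuration; the class name PoolReuseDemo, the pool size of 2, the queue capacity and the thread name prefix are all illustrative assumptions.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolReuseDemo {

    // Runs `tasks` small jobs on a fresh 2-thread pool and returns the
    // distinct names of the worker threads that executed them.
    public static Set<String> runTasks(int tasks) {
        AtomicInteger counter = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2,                               // core and max size (assumed values)
                60L, TimeUnit.SECONDS,              // keep-alive for idle threads
                new ArrayBlockingQueue<>(100),      // bounded work queue
                r -> new Thread(r, "custom-async-" + counter.incrementAndGet()),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        // Ten tasks, but only the two pooled threads ever run them.
        System.out.println(runTasks(10));
    }
}
```

An executor built this way could be returned from getAsyncExecutor() in place of SimpleAsyncTaskExecutor; the sizing depends on the workload.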
When the value of @Async names a thread pool, the Executor bean whose bean name equals that value is used.

@Configuration
public class TaskExecutorConfig {

    @Bean
    public Executor deleteFileExecutor() {
        // Prefer creating the pool explicitly with new ThreadPoolExecutor;
        // SimpleAsyncTaskExecutor does not reuse threads.
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setThreadNamePrefix("delete-file-");
        return taskExecutor;
    }

    @Bean
    public Executor sendEmailExecutor() {
        // Prefer creating the pool explicitly with new ThreadPoolExecutor;
        // SimpleAsyncTaskExecutor does not reuse threads.
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setThreadNamePrefix("send-email-");
        return taskExecutor;
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {

    @Async("deleteFileExecutor")
    @Override
    public void deleteFile() {
        System.out.println(Thread.currentThread() + " ran deleteFile!");
    }

    @Async("sendEmailExecutor")
    @Override
    public void sendEmail() {
        System.out.println(Thread.currentThread() + " ran sendEmail!");
    }
}
Asynchronous work often needs to return a result. @Async supports this as well; the supported return types are currently CompletableFuture, ListenableFuture and Future.

@RestController
@RequestMapping("/testasync")
public class TestAsyncController {

    @Autowired
    private AsyncTestService asyncTestService;

    @GetMapping("/test02")
    public void test02() {
        CompletableFuture<String> completableFuture = asyncTestService.invokeAsyncTest02();
        completableFuture.thenAccept(System.out::println);
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @Async
    @Override
    public CompletableFuture<String> invokeAsyncTest02() {
        System.out.println(Thread.currentThread() + " ran invokeAsyncTest02!");
        return CompletableFuture.completedFuture("Hello world!");
    }
}

@RestController
@RequestMapping("/testasync")
public class TestAsyncController {

    @Autowired
    private AsyncTestService asyncTestService;

    @GetMapping("/test03")
    public void test03() {
        ListenableFuture<String> stringListenableFuture = asyncTestService.invokeAsyncTest03();
        stringListenableFuture.addCallback(System.out::println, System.out::println);
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @Async
    @Override
    public ListenableFuture<String> invokeAsyncTest03() {
        System.out.println(Thread.currentThread() + " ran invokeAsyncTest03!");
        return new AsyncResult<>("Hello World!");
    }
}

@RestController
@RequestMapping("/testasync")
public class TestAsyncController {

    @Autowired
    private AsyncTestService asyncTestService;

    @GetMapping("/test04")
    public void test04() throws ExecutionException, InterruptedException {
        Future<String> future = asyncTestService.invokeAsyncTest04();
        String str = future.get();
        System.out.println(str);
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @Async
    @Override
    public Future<String> invokeAsyncTest04() {
        System.out.println(Thread.currentThread() + " ran invokeAsyncTest04!");
        return new AsyncResult<>("Hello World!");
    }
}
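Future represents the result of an asynchronous task invocation.
ListenableFuture extends Future, so it also represents the result of an asynchronous task, and in addition it accepts two callbacks: one for success and one for failure.
CompletableFuture implements Future as well, so it also represents the result of an asynchronous task, and in addition it lets you compose and chain asynchronous tasks.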
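To make "composing asynchronous tasks" concrete, here is a small plain-JDK sketch that does not use @Async. The class name ComposeDemo and the two fake lookups (a user name and a score) are assumptions made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ComposeDemo {

    // Starts two independent async tasks, combines their results,
    // then transforms the combined value.
    public static String loadAndCombine() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> "joker", pool);
            CompletableFuture<Integer> score = CompletableFuture.supplyAsync(() -> 42, pool);
            return user
                    .thenCombine(score, (u, s) -> u + ":" + s) // wait for both
                    .thenApply(String::toUpperCase)            // post-process
                    .join();                                   // block for the result
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadAndCombine()); // prints JOKER:42
    }
}
```

A plain Future offers only get(), so the same flow would need manual blocking on each task before combining.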
In this case, if the asynchronous task throws while running, the exception is first stored in the Future together with its state. It is only rethrown, wrapped in an ExecutionException, when a result-retrieving method such as future.get() is actually called.

@RestController
@RequestMapping("/testasync")
public class TestAsyncController {

    @Autowired
    private AsyncTestService asyncTestService;

    @GetMapping("/test04")
    public void test04() throws ExecutionException, InterruptedException {
        Future<String> future = asyncTestService.invokeAsyncTest04();
        // The exception is only thrown here, when the current thread fetches the result
        String str = future.get();
        System.out.println(str);
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @Async
    @Override
    public Future<String> invokeAsyncTest04() {
        System.out.println(Thread.currentThread() + " ran invokeAsyncTest04!");
        if (true) {
            throw new IllegalArgumentException("Hello sendEmailExecutor Exception!");
        }
        return new AsyncResult<>("Hello World!");
    }
}
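The deferred-exception behaviour comes from java.util.concurrent.Future itself and can be seen without Spring. This is a minimal sketch using a plain ExecutorService; the class name DeferredExceptionDemo is an assumption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DeferredExceptionDemo {

    // Submits a task that always fails and reports what get() surfaces.
    public static String describeFailure() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> {
                throw new IllegalArgumentException("Hello Exception!");
            };
            // submit() itself does not throw; the failure is stored in the Future.
            Future<String> future = pool.submit(task);
            try {
                return future.get();
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                Throwable cause = e.getCause();
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```

If get() is never called, the failure goes unnoticed, which is why callers of Future-returning async methods should always consume the result or attach a callback.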
When the return type is not a Future, an exception thrown by the task is passed to the exception handler. The handler can be set through the getAsyncUncaughtExceptionHandler method of an AsyncConfigurer implementation. If no handler is set, the framework creates a default SimpleAsyncUncaughtExceptionHandler, which only logs the exception.

@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setThreadNamePrefix("CustomAsync-Test-");
        return taskExecutor;
    }

    /**
     * Called when an async task invocation throws an exception.
     * Logging, compensation logic and the like can go here.
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> {
            System.err.println("Unexpected exception occurred invoking async method: " + method + ":" + ex.getMessage());
        };
    }
}

@RestController
@RequestMapping("/testasync")
public class TestAsyncController {

    @Autowired
    private AsyncTestService asyncTestService;

    @GetMapping("/test06")
    public void test06() {
        asyncTestService.invokeAsyncTest06();
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @Async
    @Override
    public void invokeAsyncTest06() {
        System.out.println(Thread.currentThread() + " ran invokeAsyncTest06!");
        throw new IllegalArgumentException("Hello Exception!");
    }
}
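Reading the source, I found that only one exception handler can be configured, and every thread pool used by @Async goes through that single handler. If thread pools are split by business area and each pool's exceptions need different handling, the only option is to branch inside the one handler we configured, which is not elegant.
To fix this, extend @Async: add @JokerAsync, meta-annotated with @Async, with an additional exceptionHandler attribute.

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Async
public @interface JokerAsync {

    @AliasFor(annotation = Async.class)
    String value() default "";

    String exceptionHandler() default "";
}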
Have getAsyncUncaughtExceptionHandler in the AsyncConfigurer implementation return a custom handler. This handler reads the exceptionHandler attribute of @JokerAsync on the failing method and delegates to the handler bean in the container whose name equals that attribute value, falling back to a default handler when none matches.

@Slf4j
@Component
public class AsyncConfig implements AsyncConfigurer {

    // All AsyncUncaughtExceptionHandler beans in the container, keyed by bean name
    @Autowired(required = false)
    private Map<String, AsyncUncaughtExceptionHandler> exceptionHandlerMap = new HashMap<>();

    private final AsyncUncaughtExceptionHandler defaultExceptionHandler = new SimpleAsyncUncaughtExceptionHandler();

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> {
            String qualifier = getExceptionHandlerQualifier(method);
            AsyncUncaughtExceptionHandler exceptionHandler = null;
            if (Objects.nonNull(qualifier) && qualifier.length() > 0) {
                exceptionHandler = exceptionHandlerMap.get(qualifier);
            }
            if (Objects.isNull(exceptionHandler)) {
                exceptionHandler = defaultExceptionHandler;
            }
            exceptionHandler.handleUncaughtException(ex, method, params);
        };
    }

    protected String getExceptionHandlerQualifier(Method method) {
        JokerAsync async = AnnotatedElementUtils.findMergedAnnotation(method, JokerAsync.class);
        if (async == null) {
            async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), JokerAsync.class);
        }
        return (async != null ? async.exceptionHandler() : null);
    }
}
Example test code:

@Slf4j
@Component
public class DeleteFileAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.error("DeleteFileAsyncUncaughtExceptionHandler Unexpected exception occurred invoking async method: " + method, ex);
    }
}

@Slf4j
@Component
public class SendFileAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.error("SendFileAsyncUncaughtExceptionHandler Unexpected exception occurred invoking async method: " + method, ex);
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {

    @JokerAsync(exceptionHandler = "deleteFileAsyncUncaughtExceptionHandler")
    @Override
    public void deleteFile() {
        System.out.println(Thread.currentThread() + " ran deleteFile!");
        throw new IllegalArgumentException("Hello deleteFileExecutor Exception!");
    }

    @JokerAsync(exceptionHandler = "sendFileAsyncUncaughtExceptionHandler")
    @Override
    public void sendEmail() {
        System.out.println(Thread.currentThread() + " ran sendEmail!");
        throw new IllegalArgumentException("Hello sendEmailExecutor Exception!");
    }
}

@RestController
@RequestMapping("/testasync")
public class TestAsyncController {

    @Autowired
    private AsyncTestService asyncTestService;

    @GetMapping("/sendEmail")
    public void sendEmail() {
        asyncTestService.sendEmail();
    }

    @GetMapping("/deleteFile")
    public void deleteFile() {
        asyncTestService.deleteFile();
    }
}

Result: different business operations are routed to different exception handlers.
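The routing idea is independent of Spring: look up a handler by name and fall back to a default. The sketch below isolates just that dispatch step in plain Java. The Handler interface is a hypothetical stand-in for AsyncUncaughtExceptionHandler, and the class name HandlerRoutingDemo and the returned strings are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class HandlerRoutingDemo {

    /** Hypothetical stand-in for Spring's AsyncUncaughtExceptionHandler. */
    public interface Handler {
        String handle(Throwable ex, String methodName);
    }

    // Named handlers, playing the role of the injected bean map.
    private final Map<String, Handler> handlers = new HashMap<>();

    // Used when no qualifier is given or no handler matches it.
    private final Handler defaultHandler = (ex, m) -> "default:" + m;

    public void register(String name, Handler handler) {
        handlers.put(name, handler);
    }

    // Picks the handler named by `qualifier`, or the default handler.
    public String dispatch(String qualifier, Throwable ex, String methodName) {
        Handler handler = null;
        if (qualifier != null && !qualifier.isEmpty()) {
            handler = handlers.get(qualifier);
        }
        if (handler == null) {
            handler = defaultHandler;
        }
        return handler.handle(ex, methodName);
    }

    public static void main(String[] args) {
        HandlerRoutingDemo demo = new HandlerRoutingDemo();
        demo.register("deleteFileHandler", (ex, m) -> "deleteFile handler:" + m);
        Exception ex = new IllegalArgumentException("boom");
        System.out.println(demo.dispatch("deleteFileHandler", ex, "deleteFile"));
        System.out.println(demo.dispatch("", ex, "sendEmail"));
    }
}
```

Falling back to a default rather than failing on an unknown name matches the behaviour of the AsyncConfig shown earlier, where a missing bean silently routes to SimpleAsyncUncaughtExceptionHandler.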
Let's start from the entry point, @EnableAsync.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// @Import registers AsyncConfigurationSelector in the container
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    // custom async annotation
    Class<? extends Annotation> annotation() default Annotation.class;

    // JDK proxy or CGLIB proxy
    boolean proxyTargetClass() default false;

    AdviceMode mode() default AdviceMode.PROXY;

    int order() default Ordered.LOWEST_PRECEDENCE;
}
Note that a class imported with @Import usually implements ImportSelector. In that case, the classes whose fully qualified names are returned by selectImports are registered in the container. If the imported class implements ImportBeanDefinitionRegistrar instead, its registerBeanDefinitions method is called.

public interface ImportSelector {

    String[] selectImports(AnnotationMetadata importingClassMetadata);

    @Nullable
    default Predicate<String> getExclusionFilter() {
        return null;
    }
}

public interface ImportBeanDefinitionRegistrar {

    default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
            BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {
        registerBeanDefinitions(importingClassMetadata, registry);
    }

    default void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
            BeanDefinitionRegistry registry) {
    }
}
Next, look at AsyncConfigurationSelector, the class that @EnableAsync imports through @Import.

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    /**
     * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
     * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
     * respectively.
     */
    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            // the mode attribute of @EnableAsync defaults to AdviceMode.PROXY
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }
}
Now look at AdviceModeImportSelector, the parent class of AsyncConfigurationSelector.

/**
 * This class implements ImportSelector, so its selectImports method is called.
 */
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {

    public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";

    protected String getAdviceModeAttributeName() {
        return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
    }

    // importingClassMetadata is the metadata of the class that carries the @Import
    @Override
    public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
        Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
        Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");

        AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
        if (attributes == null) {
            throw new IllegalArgumentException(String.format(
                    "@%s is not present on importing class '%s' as expected",
                    annType.getSimpleName(), importingClassMetadata.getClassName()));
        }

        // read the mode attribute from the annotation on the importing class
        AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
        // template method: the subclass's selectImports returns the classes to register in the Spring container
        String[] imports = selectImports(adviceMode);
        if (imports == null) {
            throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
        }
        return imports;
    }

    @Nullable
    protected abstract String[] selectImports(AdviceMode adviceMode);
}
Because the mode attribute of @EnableAsync defaults to AdviceMode.PROXY, ProxyAsyncConfiguration is the class imported into the container. Let's look inside it.

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    // register the async bean post-processor in the container
    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // the async bean post-processor
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        // hand the thread pool and exception handler to the post-processor
        bpp.configure(this.executor, this.exceptionHandler);
        // read the annotation attribute of @EnableAsync
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        // if it differs from the default, register the custom async annotation with the post-processor
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        // choose the proxying strategy
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }
}
Now look at AbstractAsyncConfiguration, the parent class of ProxyAsyncConfiguration.

@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {

    @Nullable
    protected AnnotationAttributes enableAsync;

    @Nullable
    protected Supplier<Executor> executor;

    @Nullable
    protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

    // importMetadata is the metadata of the class that carries the @Import
    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        // store the attributes of @EnableAsync in the enableAsync field
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Configure the default thread pool and the default exception handler.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer::getAsyncExecutor;
        this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
    }
}

public interface AsyncConfigurer {

    // configure the async thread pool
    @Nullable
    default Executor getAsyncExecutor() {
        return null;
    }

    // configure the async exception handler
    @Nullable
    default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}
The code above shows that the attributes of @EnableAsync are parsed and set on the AsyncAnnotationBeanPostProcessor, and that the thread pool and exception handler configured through AsyncConfigurer are set on it as well. Next, look at the code of AsyncAnnotationBeanPostProcessor.

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

    public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
            AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;

    // default thread pool
    @Nullable
    private Supplier<Executor> executor;

    // exception handler
    @Nullable
    private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

    // async annotation
    @Nullable
    private Class<? extends Annotation> asyncAnnotationType;

    public AsyncAnnotationBeanPostProcessor() {
        setBeforeExistingAdvisors(true);
    }

    public void configure(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        this.executor = executor;
        this.exceptionHandler = exceptionHandler;
    }

    public void setExecutor(Executor executor) {
        this.executor = SingletonSupplier.of(executor);
    }

    public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
        this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
    }

    public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
        Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
        this.asyncAnnotationType = asyncAnnotationType;
    }

    /**
     * The parent class implements BeanFactoryAware, so this is called while the instance is initialized.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        /*
         * Advice: the enhancement logic that is woven in (where the extra code runs).
         * Pointcut: decides which methods are proxied (which methods get enhanced).
         * Advisor: combines the two (usually holds one Advice and one Pointcut and represents one aspect).
         */
        // an advisor is created here and receives the thread pool and the exception handler
        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        // pass the bean factory to the advisor
        advisor.setBeanFactory(beanFactory);
        // store the advisor in this class's field
        this.advisor = advisor;
    }
}
The code above mainly creates the advisor that carries the enhancement and assigns it to a field of this class.
Next, look at its ancestor AbstractAdvisingBeanPostProcessor. Because it implements BeanPostProcessor, its postProcessAfterInitialization method is called after each bean has been initialized.

public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {

    @Nullable
    protected Advisor advisor;

    protected boolean beforeExistingAdvisors = false;

    private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);

    public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
        this.beforeExistingAdvisors = beforeExistingAdvisors;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (this.advisor == null || bean instanceof AopInfrastructureBean) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        // already proxied: add the advisor directly to the existing proxy's advisor list
        if (bean instanceof Advised) {
            Advised advised = (Advised) bean;
            if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
                // Add our local Advisor to the existing proxy's Advisor chain...
                if (this.beforeExistingAdvisors) {
                    advised.addAdvisor(0, this.advisor);
                }
                else {
                    advised.addAdvisor(this.advisor);
                }
                return bean;
            }
        }

        // not proxied yet but eligible: create a proxy, add the advisor to it, and return the proxy
        if (isEligible(bean, beanName)) {
            ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
            if (!proxyFactory.isProxyTargetClass()) {
                evaluateProxyInterfaces(bean.getClass(), proxyFactory);
            }
            proxyFactory.addAdvisor(this.advisor);
            customizeProxyFactory(proxyFactory);

            // Use original ClassLoader if bean class not locally loaded in overriding class loader
            ClassLoader classLoader = getProxyClassLoader();
            if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
                classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
            }
            return proxyFactory.getProxy(classLoader);
        }

        // No proxy needed.
        return bean;
    }

    protected boolean isEligible(Object bean, String beanName) {
        return isEligible(bean.getClass());
    }

    // decide whether this class needs a proxy
    protected boolean isEligible(Class<?> targetClass) {
        Boolean eligible = this.eligibleBeans.get(targetClass);
        if (eligible != null) {
            return eligible;
        }
        if (this.advisor == null) {
            return false;
        }
        eligible = AopUtils.canApply(this.advisor, targetClass);
        this.eligibleBeans.put(targetClass, eligible);
        return eligible;
    }

    protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
        ProxyFactory proxyFactory = new ProxyFactory();
        proxyFactory.copyFrom(this);
        proxyFactory.setTarget(bean);
        return proxyFactory;
    }

    protected void customizeProxyFactory(ProxyFactory proxyFactory) {
    }
}
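The code above only puts the advisor into the proxy, so to understand the enhancement logic we just need to look at what the advisor does. The advisor field is an AsyncAnnotationAdvisor, an implementation of Advisor. An Advisor implementation usually holds one Advice and one Pointcut, and the invoke method of MethodInterceptor, a sub-interface of Advice, is where the proxy's main enhancement code lives.
* Advice: the enhancement logic that is woven in (where the extra code runs).
* Pointcut: decides which methods are proxied (which methods get enhanced).
* Advisor: combines the two (usually holds one Advice and one Pointcut and represents one aspect).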
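To make the three roles tangible, here is a deliberately tiny imitation built on a JDK dynamic proxy rather than Spring AOP. Everything in it is an assumption for illustration: the @RunAsync annotation, the Greeter interface, and the class name MiniAsyncProxyDemo. The annotation check plays the pointcut, submitting to the executor plays the advice, and the invocation handler that combines them plays the advisor.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MiniAsyncProxyDemo {

    /** Hypothetical marker annotation, standing in for @Async. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface RunAsync { }

    public interface Greeter {
        void greetAsync();
        String currentThreadName();
    }

    public static class GreeterImpl implements Greeter {
        // Records which thread executed greetAsync().
        public final BlockingQueue<String> seenThreads = new LinkedBlockingQueue<>();

        @RunAsync
        @Override
        public void greetAsync() {
            seenThreads.add(Thread.currentThread().getName());
        }

        @Override
        public String currentThreadName() {
            return Thread.currentThread().getName();
        }
    }

    // Wraps the target in a proxy that runs @RunAsync methods on the executor.
    public static Greeter proxy(GreeterImpl target, ExecutorService executor) {
        InvocationHandler handler = (p, method, args) -> {
            Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
            if (impl.isAnnotationPresent(RunAsync.class)) {       // "pointcut"
                Callable<Object> task = () -> impl.invoke(target, args);
                executor.submit(task);                            // "advice"
                return null;                                      // caller does not wait
            }
            return impl.invoke(target, args);                     // plain synchronous call
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    // Returns {thread that ran greetAsync, thread that ran currentThreadName}.
    public static String[] runDemo() {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-async-1"));
        try {
            GreeterImpl target = new GreeterImpl();
            Greeter greeter = proxy(target, executor);
            greeter.greetAsync();
            String asyncThread = target.seenThreads.poll(2, TimeUnit.SECONDS);
            return new String[] {asyncThread, greeter.currentThreadName()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new String[] {null, null};
        } finally {
            executor.shutdown();
        }
    }
}
```

The sketch also hints at a well-known limitation of proxy-based @Async: only calls that go through the proxy are intercepted, so a method calling another annotated method on `this` runs synchronously.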
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

    private Advice advice;

    private Pointcut pointcut;

    public AsyncAnnotationAdvisor() {
        this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
    }

    public AsyncAnnotationAdvisor(
            @Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {
        this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
    }

    @SuppressWarnings("unchecked")
    public AsyncAnnotationAdvisor(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        asyncAnnotationTypes.add(Async.class);
        try {
            asyncAnnotationTypes.add((Class<? extends Annotation>)
                    ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
        }
        catch (ClassNotFoundException ex) {
            // If EJB 3.1 API not present, simply ignore.
        }
        // the advice implementation
        this.advice = buildAdvice(executor, exceptionHandler);
        // the pointcut implementation
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }

    public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
        Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
        Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
        asyncAnnotationTypes.add(asyncAnnotationType);
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        if (this.advice instanceof BeanFactoryAware) {
            ((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
        }
    }

    @Override
    public Advice getAdvice() {
        return this.advice;
    }

    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }

    /**
     * Builds the advice implementation.
     */
    protected Advice buildAdvice(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        // the core advice class
        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
        interceptor.configure(executor, exceptionHandler);
        return interceptor;
    }

    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
        ComposablePointcut result = null;
        for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
            if (result == null) {
                result = new ComposablePointcut(cpc);
            }
            else {
                result.union(cpc);
            }
            result = result.union(mpc);
        }
        return (result != null ? result : Pointcut.TRUE);
    }
}
The code above shows that the core advice implementation is AnnotationAsyncExecutionInterceptor, so let's continue with its code.

public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {

    public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
        super(defaultExecutor);
    }

    public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        super(defaultExecutor, exceptionHandler);
    }

    @Override
    @Nullable
    protected String getExecutorQualifier(Method method) {
        // Maintainer's note: changes made here should also be made in
        // AnnotationAsyncExecutionAspect#getExecutorQualifier
        Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
        if (async == null) {
            async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
        }
        return (async != null ? async.value() : null);
    }
}
The invoke method we are looking for is not here, so continue to the parent class AsyncExecutionInterceptor.

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

    public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
        super(defaultExecutor);
    }

    public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        super(defaultExecutor, exceptionHandler);
    }

    @Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        // Key step: resolve the executor from the value of @Async on the method;
        // the value is the bean name of the Executor in the Spring container
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        // Key step: wrap the method invocation in a Callable
        Callable
Both the method that resolves the thread pool and doSubmit, the method that actually submits the call, live in the parent class AsyncExecutionAspectSupport, so continue with its code.
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {

    public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";

    protected final Log logger = LogFactory.getLog(getClass());

    private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);

    private SingletonSupplier<Executor> defaultExecutor;

    private SingletonSupplier<AsyncUncaughtExceptionHandler> exceptionHandler;

    @Nullable
    private BeanFactory beanFactory;

    public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
        // default thread pool
        this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
        // default exception handler
        this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
    }

    public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
        // default thread pool
        this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
        // default exception handler
        this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
    }

    public void configure(@Nullable Supplier<Executor> defaultExecutor,
            @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        // default thread pool
        this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
        // default exception handler
        this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
    }

    public void setExecutor(Executor defaultExecutor) {
        this.defaultExecutor = SingletonSupplier.of(defaultExecutor);
    }

    public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
        this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    /**
     * Resolve the thread pool used by a method annotated with @Async.
     */
    @Nullable
    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        // look in the cache first
        AsyncTaskExecutor executor = this.executors.get(method);
        // not cached: look it up in the container
        if (executor == null) {
            Executor targetExecutor;
            // the value of @Async on this method, i.e. the bean name of the thread pool in the container
            String qualifier = getExecutorQualifier(method);
            // a value was set: fetch that bean from the container
            if (StringUtils.hasLength(qualifier)) {
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            // no value was set: use the default pool configured through AsyncConfigurer; if there is none,
            // look up a TaskExecutor bean in the container; if there are several, take the bean named "taskExecutor"
            else {
                targetExecutor = this.defaultExecutor.get();
            }
            if (targetExecutor == null) {
                return null;
            }
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                    (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
            // put it in the cache
            this.executors.put(method, executor);
        }
        return executor;
    }

    @Nullable
    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        if (beanFactory != null) {
            try {
                // first look for a TaskExecutor bean in the container
                return beanFactory.getBean(TaskExecutor.class);
            }
            catch (NoUniqueBeanDefinitionException ex) {
                logger.debug("Could not find unique TaskExecutor bean. " +
                        "Continuing search for an Executor bean named 'taskExecutor'", ex);
                try {
                    // several candidates: take the Executor bean named DEFAULT_TASK_EXECUTOR_BEAN_NAME
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskExecutor bean found within the context, and none is named " +
                                "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
                                "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
                    }
                }
            }
            // no TaskExecutor bean in the container: take the Executor bean named DEFAULT_TASK_EXECUTOR_BEAN_NAME
            catch (NoSuchBeanDefinitionException ex) {
                logger.debug("Could not find default TaskExecutor bean. " +
                        "Continuing search for an Executor bean named 'taskExecutor'", ex);
                try {
                    return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    logger.info("No task executor bean found for async processing: " +
                            "no bean of type TaskExecutor and no bean named 'taskExecutor' either");
                }
                // Giving up -> either using local default executor or none at all...
            }
        }
        // nothing found: return null; the subclass checks for null and creates a default thread pool itself
        return null;
    }

    @Nullable
    protected Object doSubmit(Callable
That covers most of the source. A few key points follow from it:
// Prefer creating the thread pool explicitly with new ThreadPoolExecutor; SimpleAsyncTaskExecutor does not reuse threads
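The lookup order implemented by determineAsyncExecutor and getDefaultExecutor above can be condensed into a small plain-Java sketch. It is a simplification under stated assumptions: the container is modelled as a set of executor bean names, no distinction is made between TaskExecutor and Executor types or primary beans, and the class name ExecutorLookupDemo and the returned strings are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class ExecutorLookupDemo {

    public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";

    /**
     * Returns the name of the executor that would be chosen.
     *
     * @param qualifier         value of @Async on the method (may be null or empty)
     * @param configurerDefault executor supplied by an AsyncConfigurer, or null
     * @param executorBeans     names of the executor beans in the (simulated) container
     */
    public static String resolve(String qualifier, String configurerDefault, Set<String> executorBeans) {
        // 1. An explicit qualifier wins and must match a bean name.
        if (qualifier != null && !qualifier.isEmpty()) {
            if (!executorBeans.contains(qualifier)) {
                throw new IllegalStateException("No executor bean named '" + qualifier + "'");
            }
            return qualifier;
        }
        // 2. Otherwise the default configured through AsyncConfigurer.
        if (configurerDefault != null) {
            return configurerDefault;
        }
        // 3. Otherwise a unique executor bean in the container.
        if (executorBeans.size() == 1) {
            return executorBeans.iterator().next();
        }
        // 4. Otherwise the bean named "taskExecutor".
        if (executorBeans.contains(DEFAULT_TASK_EXECUTOR_BEAN_NAME)) {
            return DEFAULT_TASK_EXECUTOR_BEAN_NAME;
        }
        // 5. Nothing found: the interceptor creates a fallback executor itself.
        return "<new fallback executor>";
    }

    public static void main(String[] args) {
        Set<String> beans = new LinkedHashSet<>(Arrays.asList("deleteFileExecutor", "sendEmailExecutor"));
        System.out.println(resolve("sendEmailExecutor", null, beans));
        System.out.println(resolve("", "configuredPool", beans));
        System.out.println(resolve("", null, beans));
    }
}
```

Step 5 is the case the thread-reuse warning is about: with no default configured and no suitable bean, the framework falls back to an executor it creates on its own, so configuring a pool explicitly is the safer choice.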
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke Callable
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke Callable
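Note on ListenableFuture.addCallback(): if the asynchronous task has not finished when the callback is added, the callback is run by the task's thread; if the task has already finished, the callback is run by the thread that calls addCallback.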
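The JDK's CompletableFuture behaves analogously for its non-async callbacks such as thenAccept, which makes the rule easy to observe without Spring. The sketch below uses CompletableFuture as a stand-in, not ListenableFuture itself; the class name CallbackThreadDemo and the thread name "async-worker" are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackThreadDemo {

    // Callback added after completion: it runs on the thread that registers it.
    public static String callbackThreadWhenAlreadyDone() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        done.thenAccept(v -> seen.set(Thread.currentThread().getName()));
        return seen.get();
    }

    // Callback added before completion: it runs on the thread that completes the future.
    public static String callbackThreadWhenPending() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.thenAccept(v -> seen.set(Thread.currentThread().getName()));
        Thread worker = new Thread(() -> pending.complete("ok"), "async-worker");
        worker.start();
        try {
            worker.join(); // wait until the worker has completed the future
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("already done -> " + callbackThreadWhenAlreadyDone());
        System.out.println("pending      -> " + callbackThreadWhenPending());
    }
}
```

The practical consequence is the same in both APIs: a slow callback may block either the worker thread or the caller, so heavy work in callbacks is best handed to an explicit executor.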
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke Callable
//org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke Callable
@RestController
@RequestMapping("/testasync")
public class TestAsyncController {

    @Autowired
    private AsyncTestService asyncTestService;

    @GetMapping("/test05")
    public void test05() {
        asyncTestService.invokeAsyncTest05();
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @Async
    @Override
    public void invokeAsyncTest05() {
        System.out.println(Thread.currentThread() + " ran invokeAsyncTest05!");
    }
}

@RestController
@RequestMapping("/testasync")
public class TestAsyncController {

    @Autowired
    private AsyncTestService asyncTestService;

    @GetMapping("/test07")
    public void test07() {
        // Always null. To get an asynchronous result, wrap the return value in a Future.
        List<String> result = asyncTestService.invokeAsyncTest07();
        System.out.println(result);
    }
}

@Service
public class AsyncTestServiceImpl implements AsyncTestService {
    @Async
    @Override
    public List<String> invokeAsyncTest07() {
        System.out.println(Thread.currentThread() + "invokeAsyncTest07!");
        List<String> result = Arrays.asList("Hello World1", "Hello World2");
        return result;
    }
}
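Why the caller always sees null for a non-Future return type becomes clear once the submission step is modelled as a dispatch on the declared return type. The following is a plain-JDK sketch of that idea, not the framework's code: the class name ReturnTypeDispatchDemo and its helper methods are assumptions, and the ListenableFuture branch is left out because it needs Spring types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ReturnTypeDispatchDemo {

    // Submits the task and decides what the caller gets back,
    // based only on the method's declared return type.
    public static Object submit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        } else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        } else {
            // Any other type: the task still runs, but its result is dropped
            // and the caller immediately receives null.
            executor.submit(task);
            return null;
        }
    }

    // Runs a fixed task under the given declared return type.
    public static Object callWith(Class<?> returnType) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Object> task = () -> Arrays.asList("Hello World1", "Hello World2");
            return submit(task, executor, returnType);
        } finally {
            executor.shutdown(); // already submitted tasks still complete
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callWith(List.class));                      // null
        System.out.println(((Future<?>) callWith(Future.class)).get()); // the list
    }
}
```

Declaring the return type as Future, or one of its subtypes, is therefore the only way for the caller to obtain the value computed on the other thread.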