Spring Event运用和完结原理

铿然架构|作者/铿然一叶 这是铿然架构的第115篇原创文章

1. 介绍

事情驱动是一种很常用且重要的规划形式,它经过发布/订阅办法完结了生产者和顾客之间的解耦。

关于事情驱动形式的重视点有:

● 事情行列

是否有事情行列来排队,行列是有界仍是无界?

● 使命行列

是否有使命行列?支撑多大并发?

● 同步/异步

事情处理是同步仍是异步,会不会堵塞?

● 耐久化

排队中的事情是否支撑耐久化,进程挂掉后是否会丢失?

本章咱们将带着这些问题去剖析Spring Event支撑的能力,以及中心完结逻辑。

2. 中心类结构

Spring Event运用和完结原理

描绘
ApplicationEvent 运用事情类,要发布的事情需求承继此类。
ApplicationListener 事情监听接口,完结此接口才干接纳到发布的事情,此为监听事情的办法1。
ApplicationEventPublisher 事情发布者接口,要经过它的子类发布事情。
ApplicationEventPublisherAware 事情发布者aware,完结此接口能够获得事情发布者实例,经过事情发布者实例发布事情。
SimpleApplicationEventMulticaster 体系默许的事情播送者,一切事情都经过它播送出去,这样事情监听者才干收到事情。
ErrorHandler 事情接纳反常处理类,处理“事情监听者收到事情并处理时”抛出的反常。
Executor 事情监听者异步调用接口,经过它的实例异步调用事情监听者要执行的动作,防止堵塞,此为异步调用完结办法1。
AbstractApplicationContext 笼统运用上下文,完结了事情发布者接口,并调用SimpleApplicationEventMulticaster播送事情。
@EventListener 事情监听注解,作用于类的办法上,完结事情监听,此为事情监听办法2
@Async 异步调用注解,作用在@EventListener注解的事情监听办法上,使得能够异步调用,此为异步调用完结办法2。
@EnableAsync 使能异步调用注解,spring中@Async注解润饰的办法,均需求在启动类上运用@EnableAsync注解才有效。
AsyncAnnotationAdvisor 支撑Async注解的Advisor。
AnnotationAsyncExecutionInterceptor @Async注解阻拦器,获取该注解上的Executor bean称号,决定一步调用时运用哪个Executor bean实例。
AsyncExecutionInterceptor 异步处理阻拦器,完结异步调用的中心逻辑。
ThreadPoolTaskExecutor 体系默许的异步调用执行者,假如@Async注解没有指定,则运用它。

3. 验证场景

3.1 根本场景

经过编码办法完结事情通知。

3.1.1 类结构

Spring Event运用和完结原理

描绘
PayWagesEvent 发薪事情
PayWagesNotifier 发薪事情监听者
PayWagesService 发薪事情发布者

3.1.2 代码

3.1.2.1 PayWagesEvent

发薪事情类,需求承继ApplicationEvent类:

public class PayWagesEvent extends ApplicationEvent {
    private PayWages payWages;
    public PayWagesEvent(PayWages source) {
        super(source);
        this.payWages = source;
    }
    public PayWages getPayWages() {
        return payWages;
    }
}

3.1.2.2 PayWagesNotifier

发薪事情监听者,需求完结ApplicationListener接口:

@Component
public class PayWagesNotifier implements ApplicationListener<PayWagesEvent> {
    @Override
    public void onApplicationEvent(PayWagesEvent event) {
        System.out.println("received PayWagesEvent, payWages: " + event.getPayWages().toString());
    }
}

3.1.2.3 PayWagesService

发薪事情发布者,发布发薪事情,需求完结ApplicationEventPublisherAware接口,以获取ApplicationEventPublisher实例来发布事情:

@Service
public class PayWagesService implements ApplicationEventPublisherAware {
    private ApplicationEventPublisher publisher;
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }
    public void exec(int amount) {
        publisher.publishEvent(new PayWagesEvent(new PayWages(amount)));
    }
}

3.1.2.4 PayWagesController

触发服务调用:

@RestController
@RequestMapping(path = "/payWagesController", method = {RequestMethod.GET})
public class PayWagesController {
    @Autowired
    private PayWagesService service;
    @RequestMapping("/exec")
    public void exec() {
        int amount = 100;
        service.exec(amount);
    }
}

3.1.3 验证

建议恳求,打印日志如下:

received PayWagesEvent, payWages: PayWages(amount=100)

事情发布和监听处理成功。

3.2 同步逻辑验证

验证默许事情处理是同步仍是异步。

3.2.1 代码

3.2.1.1 PayWagesNotifierBlock

模拟收到事情后的耗时处理,休眠3秒:

@Component
public class PayWagesNotifierBlock implements ApplicationListener<PayWagesEventBlock> {
    @Override
    public void onApplicationEvent(PayWagesEventBlock event) {
        System.out.println("received PayWagesEventBlock, payWages: " + event.getPayWages().toString());
        pause(3);
    }
    private void pause(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

3.2.1.2 PayWagesService

循环发布屡次事情,并记录耗时时间,验证是否会堵塞:

@Service
public class PayWagesService implements ApplicationEventPublisherAware {
    public void blockExec(int amount) {
        long start = System.nanoTime();
        for (int i = 0; i < 3; i++) {
            System.out.println("publish Event PayWagesEventBlock: " + i + ", amount: " + amount);
            publisher.publishEvent(new PayWagesEventBlock(new PayWages(amount)));
            long costTime = (System.nanoTime() - start) / 1000 / 1000 / 1000;
            System.out.println("cost time: " + costTime);
        }
    }
}

3.2.2 验证

建议恳求,打印日志如下:

publish Event PayWagesEventBlock: 0, amount: 500
received PayWagesEventBlock, payWages: PayWages(amount=500)
cost time: 3
publish Event PayWagesEventBlock: 1, amount: 500
received PayWagesEventBlock, payWages: PayWages(amount=500)
cost time: 6
publish Event PayWagesEventBlock: 2, amount: 500
received PayWagesEventBlock, payWages: PayWages(amount=500)
cost time: 9

能够看到每次事情发布间隔3秒,即下一次事情发布要等上一次已发布事情处理完,阐明默许多个事情之间会同步等待,而在实际事务中同步堵塞处理办法或许不是咱们想要的,需求改成异步处理办法。

3.2.3 同步处理原因剖析

SimpleApplicationEventMulticaster类中播送事情处理时,判别了是否存在Executor实例,存在就异步处理,否则同步处理:

	@Override
	public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
		ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event));
		Executor executor = getTaskExecutor();
		for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
			// 异步处理
			if (executor != null && listener.supportsAsyncExecution()) {
				executor.execute(() -> invokeListener(listener, event));
			}
			else {
				// 同步处理
				invokeListener(listener, event);
			}
		}
	}

而在AbstractApplicationContext中初始化事情播送处理者时会先判别是否有定制的事情播送处理者,假如没有则会创立SimpleApplicationEventMulticaster实例,但此时不会传入Executor实例,所以默许是同步形式:

	protected void initApplicationEventMulticaster() {
		ConfigurableListableBeanFactory beanFactory = getBeanFactory();
		if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
			this.applicationEventMulticaster =
					beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
			if (logger.isTraceEnabled()) {
				logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
			}
		}
		else {
			// 创立实例,没有传入Executor和ErrorHandler
			this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
			beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
			if (logger.isTraceEnabled()) {
				logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
						"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
			}
		}
	}

3.3 定制事情播送类完结异步处理

上个场景验证了同步处理逻辑,并解说了其原理,这回就依照该原理修改为异步处理办法。

3.3.1 代码

3.3.1.1 MulticasterConfigure

定制事情播送类,bean的name必须设置为spring默许的事情播送bean称号,及“applicationEventMulticaster”,一起注入一个异步使命执行者,以完结异步处理:

@Configuration
public class MulticasterConfigure {
    @Bean(name = "applicationEventMulticaster") // bean称号必须是这个,这是默许称号
    public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster() {
        SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
        // 增加异步使命执行者,即可完结异步调用
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        eventMulticaster.setTaskExecutor(taskExecutor);
        return eventMulticaster;
    }
}

除此之外,不用修改其他任何类。

3.3.2 验证

参阅前面3.2同步堵塞的例子,再次建议恳求,输出日志如下:

publish Event PayWagesEventBlock: 0, amount: 500
cost time: 0
publish Event PayWagesEventBlock: 1, amount: 500
cost time: 0
publish Event PayWagesEventBlock: 2, amount: 500
cost time: 0
received PayWagesEventBlock, payWages: PayWages(amount=500)
received PayWagesEventBlock, payWages: PayWages(amount=500)
received PayWagesEventBlock, payWages: PayWages(amount=500)

能够看到,屡次发布事情之间耗时为0,没有堵塞,验证经过。

3.4 注解+异步

前面都是经过编码办法完结事情监听和异步处理,本场景验证经过注解办法来完结。

3.4.1 代码

3.4.1.1 PayWagesNotifierAsync

在办法上加了Async和EventListener注解,前者表明异步执行,后者表明监听事情,监听的事情为办法输入参数:

@Component
public class PayWagesNotifierAsync {
    @EventListener
    @Async
    public void onEvent(PayWagesEventAsync event) {
        System.out.println("received async PayWagesEventAsync, payWages: " + event.getPayWages().toString());
    }
}

3.4.1.2 启动类

启动类上必须加上EnableAsync注解,前面的Async注解才会起作用:

@EnableAsync
@SpringBootApplication
public class StartupApplication {
    public static void main(String[] args) {
        SpringApplication.run(StartupApplication.class, args);
    }
}

3.4.2 验证

建议恳求,打印日志如下:

publish Event PayWagesEventAsync: 0, amount: 300
cost time: 0
publish Event PayWagesEventAsync: 1, amount: 300
cost time: 0
publish Event PayWagesEventAsync: 2, amount: 300
cost time: 0
received PayWagesEventAsync, payWages: PayWages(amount=300)
received PayWagesEventAsync, payWages: PayWages(amount=300)
received PayWagesEventAsync, payWages: PayWages(amount=300)

能够看到屡次发布事情之间耗时为0,没有堵塞,验证经过。

3.4.3 原理

增加Async注解后会被AsyncExecutionInterceptor阻拦:

Spring Event运用和完结原理
别的能够看到此时Executor是ThreadPoolTaskExecutor。

接着在AsyncExecutionAspectSupport类中异步提交事情处理使命:

Spring Event运用和完结原理
经过上述操作完结了异步处理。

所以,编码办法和注解办法的异步处理是两个不同的处理流程。

4. 中心逻辑

需求重视的中心逻辑包括如下几点:

● 事情播送者的创立和处理逻辑

● 事情监听者注册逻辑

● 事情发布逻辑

● Async注解的办法如何被异步调用

4.1 事情播送者

4.1.1 实例化

AbstractApplicationContext.java

先经过beanFactory获取“applicationEventMulticaster”,获取不到则new一个默许的SimpleApplicationEventMulticaster实例,new的时候没有传入Executor和ErrorHandler。

	protected void initApplicationEventMulticaster() {
		ConfigurableListableBeanFactory beanFactory = getBeanFactory();
		// 先经过beanFactory获取
		if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
			this.applicationEventMulticaster =
					beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
			if (logger.isTraceEnabled()) {
				logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
			}
		}
		else {
			// 创立实例,没有传入Executor和ErrorHandler
			this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
			beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
			if (logger.isTraceEnabled()) {
				logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
						"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
			}
		}
	}

因而,正如前面的示例,能够定制完结事情播送者并传入定制的Executor和ErrorHandler完结类。

4.1.2 事情播送处理

SimpleApplicationEventMulticaster.java

前面例子也说到了,处理时假如存在Executor且listener支撑异步处理则异步调用,否则是同步调用:

	public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
		ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event));
		Executor executor = getTaskExecutor();
		for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
			// 异步处理
			if (executor != null && listener.supportsAsyncExecution()) {
				executor.execute(() -> invokeListener(listener, event));
			}
			else {
				// 同步处理
				invokeListener(listener, event);
			}
		}
	}

ApplicationListener接口中默许支撑异步处理,因而只需完结类不重载接口办法都支撑异步处理:

	default boolean supportsAsyncExecution() {
		return true;
	}

4.2 事情监听者注册逻辑

4.2.1 相关类

中心类结构如下:

Spring Event运用和完结原理

描绘
ApplicationListenerDetector 辨认出完结了ApplicationListener的listener bean,经过AbstractApplicationContext增加到SimpleApplicationEventMulticaster中。
EventListenerMethodProcessor 辨认EventListener注解的办法,经过DefaultEventListenerFactory创立ApplicationListenerMethodAdapter,增加到SimpleApplicationEventMulticaster中。
DefaultEventListenerFactory ApplicationListener创立工厂,将EventListener注解的办法生成ApplicationListenerMethodAdapter类。
ApplicationListenerMethodAdapter 依据EventListener注解的办法生成的类,记录了EventListener注解的类和办法等信息,完结了ApplicationListener接口。
SimpleApplicationEventMulticaster 事情播送者,将事情播送出去,让事情监听者处理。

4.2.2 代码

4.2.2.1 ApplicationListenerDetector

完结了BeanPostProcessor接口,在bean初始化完结的后处理中,辨认出ApplicationListener实例的bean,增加到applicationContext中:

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (bean instanceof ApplicationListener<?> applicationListener) {
			// potentially not detected as a listener by getBeanNamesForType retrieval
			Boolean flag = this.singletonNames.get(beanName);
			if (Boolean.TRUE.equals(flag)) {
				// singleton bean (top-level or inner): register on the fly
				this.applicationContext.addApplicationListener(applicationListener);
			}
			else if (Boolean.FALSE.equals(flag)) {
				if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {
					// inner bean with other scope - can't reliably process events
					logger.warn("Inner bean '" + beanName + "' implements ApplicationListener interface " +
							"but is not reachable for event multicasting by its containing ApplicationContext " +
							"because it does not have singleton scope. Only top-level listener beans are allowed " +
							"to be of non-singleton scope.");
				}
				this.singletonNames.remove(beanName);
			}
		}
		return bean;
	}

4.2.2.2 AbstractApplicationContext

增加listener到SimpleApplicationEventMulticaster:

	public void addApplicationListener(ApplicationListener<?> listener) {
		Assert.notNull(listener, "ApplicationListener must not be null");
		if (this.applicationEventMulticaster != null) {
			this.applicationEventMulticaster.addApplicationListener(listener);
		}
		this.applicationListeners.add(listener);
	}

4.2.2.3 AbstractApplicationEventMulticaster

将listener增加到调集中:

	public void addApplicationListener(ApplicationListener<?> listener) {
		synchronized (this.defaultRetriever) {
			// Explicitly remove target for a proxy, if registered already,
			// in order to avoid double invocations of the same listener.
			Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
			if (singletonTarget instanceof ApplicationListener) {
				this.defaultRetriever.applicationListeners.remove(singletonTarget);
			}
			this.defaultRetriever.applicationListeners.add(listener);
			this.retrieverCache.clear();
		}
	}

4.2.2.4 EventListenerMethodProcessor

完结BeanPostProcessor接口,在初始化后处理时获取ApplicationListener bean工厂:

	@Override
	public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
		this.beanFactory = beanFactory;
		Map<String, EventListenerFactory> beans = beanFactory.getBeansOfType(EventListenerFactory.class, false, false);
		List<EventListenerFactory> factories = new ArrayList<>(beans.values());
		AnnotationAwareOrderComparator.sort(factories);
		this.eventListenerFactories = factories;
	}

满意条件的完结类只要一个,即DefaultEventListenerFactory。

别的还完结了SmartInitializingSingleton接口,在afterSingletonsInstantiated办法中经过前面获取到的DefaultEventListenerFactory创立ApplicationListenerMethodAdapter:

	private void processBean(final String beanName, final Class<?> targetType) {
		if (!this.nonAnnotatedClasses.contains(targetType) &&
				AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
				!isSpringContainerClass(targetType)) {
			Map<Method, EventListener> annotatedMethods = null;
			try {
				annotatedMethods = MethodIntrospector.selectMethods(targetType,
						(MethodIntrospector.MetadataLookup<EventListener>) method ->
								AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
			}
			catch (Throwable ex) {
				// An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
				if (logger.isDebugEnabled()) {
					logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
				}
			}
			if (CollectionUtils.isEmpty(annotatedMethods)) {
				this.nonAnnotatedClasses.add(targetType);
				if (logger.isTraceEnabled()) {
					logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
				}
			}
			else {
				// Non-empty set of methods
				ConfigurableApplicationContext context = this.applicationContext;
				Assert.state(context != null, "No ApplicationContext set");
				List<EventListenerFactory> factories = this.eventListenerFactories;
				Assert.state(factories != null, "EventListenerFactory List not initialized");
				// 有EventListener注解的办法,
				for (Method method : annotatedMethods.keySet()) {
					for (EventListenerFactory factory : factories) {
						if (factory.supportsMethod(method)) {
							Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
							ApplicationListener<?> applicationListener =
									factory.createApplicationListener(beanName, targetType, methodToUse);
							if (applicationListener instanceof ApplicationListenerMethodAdapter alma) {
								alma.init(context, this.evaluator);
							}
							context.addApplicationListener(applicationListener);
							break;
						}
					}
				}
				if (logger.isDebugEnabled()) {
					logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
							beanName + "': " + annotatedMethods);
				}
			}
		}
	}

4.2.2.5 DefaultEventListenerFactory

创立ApplicationListenerMethodAdapter,逻辑比较简单:

	@Override
	public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
		return new ApplicationListenerMethodAdapter(beanName, type, method);
	}

4.2.2.6 ApplicationListenerMethodAdapter

有了beanName,method信息,经过反射来调用EventListener注解的事情监听办法:

	public void onApplicationEvent(ApplicationEvent event) {
		processEvent(event);
	}
	// 先获取事情参数
	public void processEvent(ApplicationEvent event) {
		Object[] args = resolveArguments(event);
		if (shouldHandle(event, args)) {
			Object result = doInvoke(args);
			if (result != null) {
				handleResult(result);
			}
			else {
				logger.trace("No result object given - no result to handle");
			}
		}
	}
    // 接着经过java反射调用
	protected Object doInvoke(Object... args) {
		Object bean = getTargetBean();
		// Detect package-protected NullBean instance through equals(null) check
		if (bean.equals(null)) {
			return null;
		}
		ReflectionUtils.makeAccessible(this.method);
		try {
			if (KotlinDetector.isSuspendingFunction(this.method)) {
				return CoroutinesUtils.invokeSuspendingFunction(this.method, bean, args);
			}
			return this.method.invoke(bean, args);
		}
	// ......
	}

4.3 事情发布

事情发布在最开始的类结构中现已说到,回顾一下:

Spring Event运用和完结原理

事情发布者完结类完结ApplicationEventPublisherAware接口,经过该接口注入AbstractApplicationContext的实例。

然后经过这个实例目标发布事情给SimpleApplicationEventMulticaster,由SimpleApplicationEventMulticaster调用事情监听者完结类,并将发布的事情传递给监听者处理。

这段逻辑比较简单,代码不再赘述。

4.4 Async注解异步调用逻辑

Async注解不只仅用于spring event,体系内的其他异步处理逻辑也运用此注解。

4.4.1 中心类结构

Async注解的异步处理中心类结构如下:

Spring Event运用和完结原理

描绘
AsyncConfigurer 异步配置接口,用于回来提交异步处理使命的Executor和异步处理反常类,能够定制完结类回来需求的处理类。
AbstractAsyncConfiguration 异步配置笼统类,注入AsyncConfigurer,获取它的成员,别的会校验EnableAsync注解,前面也说到了Async注解必须在EnableAsync注解启用前提下才会生效。
ProxyAsyncConfiguration 注册AsyncAnnotationBeanPostProcessor。
AsyncAnnotationBeanPostProcessor 将AsyncAnnotationAdvisor增加到署理工厂,这样Async注解的类和办法都能被署理处理。
AsyncAnnotationAdvisor 支撑Async注解的Advisor。
AnnotationAsyncExecutionInterceptor Async注解阻拦器,会获取Async注解的参数,以得到约束的Executor完结类。
AsyncExecutionInterceptor 异步处理阻拦器。
AsyncExecutionAspectSupport 异步处理支撑类,阻拦器的父类,提供公共办法。

4.4.2 关键代码

4.4.2.1 AbstractAsyncConfiguration

主动注入AsyncConfigurer:

	@Autowired
	void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
		Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
			List<AsyncConfigurer> candidates = configurers.stream().toList();
			if (CollectionUtils.isEmpty(candidates)) {
				return null;
			}
			if (candidates.size() > 1) {
				throw new IllegalStateException("Only one AsyncConfigurer may exist");
			}
			return candidates.get(0);
		});
		this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
		this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
	}

4.4.2.2 AsyncAnnotationAdvisor

初始化,创立advice和pointcut,支撑多个异步处理注解:

public AsyncAnnotationAdvisor(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);
		ClassLoader classLoader = AsyncAnnotationAdvisor.class.getClassLoader();
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("jakarta.ejb.Asynchronous", classLoader));
		}
		catch (ClassNotFoundException ex) {
			// If EJB API not present, simply ignore.
		}
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("jakarta.enterprise.concurrent.Asynchronous", classLoader));
		}
		catch (ClassNotFoundException ex) {
			// If Jakarta Concurrent API not present, simply ignore.
		}
		this.advice = buildAdvice(executor, exceptionHandler);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

4.4.2.3 AsyncExecutionInterceptor

阻拦处理,使命被异步调用:

	public Object invoke(final MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}
		Callable<Object> task = () -> {
			try {
				Object result = invocation.proceed();
				if (result instanceof Future<?> future) {
					return future.get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};
		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}

4.4.2.4 AsyncExecutionAspectSupport

获取Executor:

	protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
		AsyncTaskExecutor executor = this.executors.get(method);
		if (executor == null) {
			Executor targetExecutor;
			String qualifier = getExecutorQualifier(method);
			if (this.embeddedValueResolver != null && StringUtils.hasLength(qualifier)) {
				qualifier = this.embeddedValueResolver.resolveStringValue(qualifier);
			}
			if (StringUtils.hasLength(qualifier)) {
				targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
			}
			else {
				targetExecutor = this.defaultExecutor.get();
			}
			if (targetExecutor == null) {
				return null;
			}
			executor = (targetExecutor instanceof AsyncTaskExecutor asyncTaskExecutor ?
					asyncTaskExecutor : new TaskExecutorAdapter(targetExecutor));
			this.executors.put(method, executor);
		}
		return executor;
	}

提交异步使命:

	protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
		if (CompletableFuture.class.isAssignableFrom(returnType)) {
			return executor.submitCompletable(task);
		}
		else if (org.springframework.util.concurrent.ListenableFuture.class.isAssignableFrom(returnType)) {
			return ((org.springframework.core.task.AsyncListenableTaskExecutor) executor).submitListenable(task);
		}
		else if (Future.class.isAssignableFrom(returnType)) {
			return executor.submit(task);
		}
		else if (void.class == returnType) {
			executor.submit(task);
			return null;
		}
		else {
			throw new IllegalArgumentException(
					"Invalid return type for async method (only Future and void supported): " + returnType);
		}
	}

5. 总结

5.1 问题回顾

开篇说到的几个问题,经过前面的验证和剖析,得到的结论如下:

问题 描绘
事情行列 不支撑,事情直接提交执行。
使命行列 支撑,异步形式下取决于运用的线程池目标。
同步/异步 支撑同步和异步处理。
耐久化 不支撑

5.2 本文小结

本文探讨了Spring Event的具体用法,包括编码和注解两种完结办法。一起深化了解了事情的播送、监听者注册、发布机制以及异步处理的关键原理。这些常识不只加深了咱们对Spring生命周期钩子和阻拦器等底层结构的了解,还为定制化事务完结提供了实用参阅。

综合来看,Spring Event适用于事情驱动的场景,但在运用时需注意以下几点:

● 同步形式或许导致堵塞问题,必要时应考虑切换至异步形式。

● 在自定义SimpleApplicationEventMulticaster进行异步处理时,应确保线程池巨细和使命行列能够满意事务需求。

● 由于事情本身不支撑耐久化,体系宕机或许对事务造成的影响要考虑到。

6. 思考

● Spring Event异步处理下的性能考量

当Spring Event配置为异步形式时,假如一起发布大量事情,或许会对体系性能发生什么样的影响?需求考虑的要素包括线程池的巨细、使命行列的容量以及体系资源的约束。

● Spring Event与开源音讯行列的对比

尽管Spring Event和开源音讯行列(如RabbitMQ, Kafka等)都采用了发布/订阅形式,但它们在完结机制、运用场景和性能方面有何不同?特别是在分布式环境和高可靠性要求下的表现差异。

● 简化进程内事情处理结构的规划

在完结一个仅限于进程内的事情处理结构时,Spring Event涉及的处理类众多。假如是你,怎样简化这个规划,以便更高效地完结事情的发布和处理?


其他阅读:

萌新快速成长之路
如何编写软件规划文档
Spring Cache架构、机制及运用
布隆过滤器适配Spring Cache及问题与处理战略
JAVA编程思维(一)经过依赖注入增加扩展性
JAVA编程思维(二)如何面向接口编程
JAVA编程思维(三)去掉别扭的if,自注册战略形式优雅满意开闭原则
Java编程思维(七)运用组合和承继的场景
JAVA根底(一)简单、透彻了解内部类和静态内部类
JAVA根底(二)内存优化-运用Java引用做缓存
JAVA根底(三)ClassLoader完结热加载
JAVA根底(五)函数式接口-复用,解耦之利刃