铿然架构|作者/铿然一叶 这是铿然架构的第115篇原创文章
1. 介绍
事情驱动是一种很常用且重要的规划形式,它经过发布/订阅办法完结了生产者和顾客之间的解耦。
关于事情驱动形式的重视点有:
● 事情行列
是否有事情行列来排队,行列是有界仍是无界?
● 使命行列
是否有使命行列?支撑多大并发?
● 同步/异步
事情处理是同步仍是异步,会不会堵塞?
● 耐久化
排队中的事情是否支撑耐久化,进程挂掉后是否会丢失?
本章咱们将带着这些问题去剖析Spring Event支撑的能力,以及中心完结逻辑。
2. 中心类结构
类 | 描绘 |
---|---|
ApplicationEvent | 运用事情类,要发布的事情需求承继此类。 |
ApplicationListener | 事情监听接口,完结此接口才干接纳到发布的事情,此为监听事情的办法1。 |
ApplicationEventPublisher | 事情发布者接口,要经过它的子类发布事情。 |
ApplicationEventPublisherAware | 事情发布者aware,完结此接口能够获得事情发布者实例,经过事情发布者实例发布事情。 |
SimpleApplicationEventMulticaster | 体系默许的事情播送者,一切事情都经过它播送出去,这样事情监听者才干收到事情。 |
ErrorHandler | 事情接纳反常处理类,处理“事情监听者收到事情并处理时”抛出的反常。 |
Executor | 事情监听者异步调用接口,经过它的实例异步调用事情监听者要执行的动作,防止堵塞,此为异步调用完结办法1。 |
AbstractApplicationContext | 笼统运用上下文,完结了事情发布者接口,并调用SimpleApplicationEventMulticaster播送事情。 |
@EventListener | 事情监听注解,作用于类的办法上,完结事情监听,此为事情监听办法2 |
@Async | 异步调用注解,作用在@EventListener注解的事情监听办法上,使得能够异步调用,此为异步调用完结办法2。 |
@EnableAsync | 使能异步调用注解,spring中@Async注解润饰的办法,均需求在启动类上运用@EnableAsync注解才有效。 |
AsyncAnnotationAdvisor | 支撑Async注解的Advisor。 |
AnnotationAsyncExecutionInterceptor | @Async注解阻拦器,获取该注解上的Executor bean称号,决定一步调用时运用哪个Executor bean实例。 |
AsyncExecutionInterceptor | 异步处理阻拦器,完结异步调用的中心逻辑。 |
ThreadPoolTaskExecutor | 体系默许的异步调用执行者,假如@Async注解没有指定,则运用它。 |
3. 验证场景
3.1 根本场景
经过编码办法完结事情通知。
3.1.1 类结构
类 | 描绘 |
---|---|
PayWagesEvent | 发薪事情 |
PayWagesNotifier | 发薪事情监听者 |
PayWagesService | 发薪事情发布者 |
3.1.2 代码
3.1.2.1 PayWagesEvent
发薪事情类,需求承继ApplicationEvent类:
public class PayWagesEvent extends ApplicationEvent {
private PayWages payWages;
public PayWagesEvent(PayWages source) {
super(source);
this.payWages = source;
}
public PayWages getPayWages() {
return payWages;
}
}
3.1.2.2 PayWagesNotifier
发薪事情监听者,需求完结ApplicationListener接口:
@Component
public class PayWagesNotifier implements ApplicationListener<PayWagesEvent> {
@Override
public void onApplicationEvent(PayWagesEvent event) {
System.out.println("received PayWagesEvent, payWages: " + event.getPayWages().toString());
}
}
3.1.2.3 PayWagesService
发薪事情发布者,发布发薪事情,需求完结ApplicationEventPublisherAware接口,以获取ApplicationEventPublisher实例来发布事情:
@Service
public class PayWagesService implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
public void exec(int amount) {
publisher.publishEvent(new PayWagesEvent(new PayWages(amount)));
}
}
3.1.2.4 PayWagesController
触发服务调用:
@RestController
@RequestMapping(path = "/payWagesController", method = {RequestMethod.GET})
public class PayWagesController {
@Autowired
private PayWagesService service;
@RequestMapping("/exec")
public void exec() {
int amount = 100;
service.exec(amount);
}
}
3.1.3 验证
建议恳求,打印日志如下:
received PayWagesEvent, payWages: PayWages(amount=100)
事情发布和监听处理成功。
3.2 同步逻辑验证
验证默许事情处理是同步仍是异步。
3.2.1 代码
3.2.1.1 PayWagesNotifierBlock
模拟收到事情后的耗时处理,休眠3秒:
@Component
public class PayWagesNotifierBlock implements ApplicationListener<PayWagesEventBlock> {
@Override
public void onApplicationEvent(PayWagesEventBlock event) {
System.out.println("received PayWagesEventBlock, payWages: " + event.getPayWages().toString());
pause(3);
}
private void pause(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
3.2.1.2 PayWagesService
循环发布屡次事情,并记录耗时时间,验证是否会堵塞:
@Service
public class PayWagesService implements ApplicationEventPublisherAware {
public void blockExec(int amount) {
long start = System.nanoTime();
for (int i = 0; i < 3; i++) {
System.out.println("publish Event PayWagesEventBlock: " + i + ", amount: " + amount);
publisher.publishEvent(new PayWagesEventBlock(new PayWages(amount)));
long costTime = (System.nanoTime() - start) / 1000 / 1000 / 1000;
System.out.println("cost time: " + costTime);
}
}
}
3.2.2 验证
建议恳求,打印日志如下:
publish Event PayWagesEventBlock: 0, amount: 500
received PayWagesEventBlock, payWages: PayWages(amount=500)
cost time: 3
publish Event PayWagesEventBlock: 1, amount: 500
received PayWagesEventBlock, payWages: PayWages(amount=500)
cost time: 6
publish Event PayWagesEventBlock: 2, amount: 500
received PayWagesEventBlock, payWages: PayWages(amount=500)
cost time: 9
能够看到每次事情发布间隔3秒,即下一次事情发布要等上一次已发布事情处理完,阐明默许多个事情之间会同步等待,而在实际事务中同步堵塞处理办法或许不是咱们想要的,需求改成异步处理办法。
3.2.3 同步处理原因剖析
SimpleApplicationEventMulticaster类中播送事情处理时,判别了是否存在Executor实例,存在就异步处理,否则同步处理:
@Override
public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 异步处理
if (executor != null && listener.supportsAsyncExecution()) {
executor.execute(() -> invokeListener(listener, event));
}
else {
// 同步处理
invokeListener(listener, event);
}
}
}
而在AbstractApplicationContext中初始化事情播送处理者时会先判别是否有定制的事情播送处理者,假如没有则会创立SimpleApplicationEventMulticaster实例,但此时不会传入Executor实例,所以默许是同步形式:
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
// 创立实例,没有传入Executor和ErrorHandler
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
3.3 定制事情播送类完结异步处理
上个场景验证了同步处理逻辑,并解说了其原理,这回就依照该原理修改为异步处理办法。
3.3.1 代码
3.3.1.1 MulticasterConfigure
定制事情播送类,bean的name必须设置为spring默许的事情播送bean称号,及“applicationEventMulticaster”,一起注入一个异步使命执行者,以完结异步处理:
@Configuration
public class MulticasterConfigure {
@Bean(name = "applicationEventMulticaster") // bean称号必须是这个,这是默许称号
public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
// 增加异步使命执行者,即可完结异步调用
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
eventMulticaster.setTaskExecutor(taskExecutor);
return eventMulticaster;
}
}
除此之外,不用修改其他任何类。
3.3.2 验证
参阅前面3.2同步堵塞的例子,再次建议恳求,输出日志如下:
publish Event PayWagesEventBlock: 0, amount: 500
cost time: 0
publish Event PayWagesEventBlock: 1, amount: 500
cost time: 0
publish Event PayWagesEventBlock: 2, amount: 500
cost time: 0
received PayWagesEventBlock, payWages: PayWages(amount=500)
received PayWagesEventBlock, payWages: PayWages(amount=500)
received PayWagesEventBlock, payWages: PayWages(amount=500)
能够看到,屡次发布事情之间耗时为0,没有堵塞,验证经过。
3.4 注解+异步
前面都是经过编码办法完结事情监听和异步处理,本场景验证经过注解办法来完结。
3.4.1 代码
3.4.1.1 PayWagesNotifierAsync
在办法上加了Async和EventListener注解,前者表明异步执行,后者表明监听事情,监听的事情为办法输入参数:
@Component
public class PayWagesNotifierAsync {
@EventListener
@Async
public void onEvent(PayWagesEventAsync event) {
System.out.println("received async PayWagesEventAsync, payWages: " + event.getPayWages().toString());
}
}
3.4.1.2 启动类
启动类上必须加上EnableAsync注解,前面的Async注解才会起作用:
@EnableAsync
@SpringBootApplication
public class StartupApplication {
public static void main(String[] args) {
SpringApplication.run(StartupApplication.class, args);
}
}
3.4.2 验证
建议恳求,打印日志如下:
publish Event PayWagesEventAsync: 0, amount: 300
cost time: 0
publish Event PayWagesEventAsync: 1, amount: 300
cost time: 0
publish Event PayWagesEventAsync: 2, amount: 300
cost time: 0
received PayWagesEventAsync, payWages: PayWages(amount=300)
received PayWagesEventAsync, payWages: PayWages(amount=300)
received PayWagesEventAsync, payWages: PayWages(amount=300)
能够看到屡次发布事情之间耗时为0,没有堵塞,验证经过。
3.4.3 原理
增加Async注解后会被AsyncExecutionInterceptor阻拦: 别的能够看到此时Executor是ThreadPoolTaskExecutor。
接着在AsyncExecutionAspectSupport类中异步提交事情处理使命: 经过上述操作完结了异步处理。
所以,编码办法和注解办法的异步处理是两个不同的处理流程。
4. 中心逻辑
需求重视的中心逻辑包括如下几点:
● 事情播送者的创立和处理逻辑
● 事情监听者注册逻辑
● 事情发布逻辑
● Async注解的办法如何被异步调用
4.1 事情播送者
4.1.1 实例化
AbstractApplicationContext.java
先经过beanFactory获取“applicationEventMulticaster”,获取不到则new一个默许的SimpleApplicationEventMulticaster实例,new的时候没有传入Executor和ErrorHandler。
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
// 先经过beanFactory获取
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
// 创立实例,没有传入Executor和ErrorHandler
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
因而,正如前面的示例,能够定制完结事情播送者并传入定制的Executor和ErrorHandler完结类。
4.1.2 事情播送处理
SimpleApplicationEventMulticaster.java
前面例子也说到了,处理时假如存在Executor且listener支撑异步处理则异步调用,否则是同步调用:
public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 异步处理
if (executor != null && listener.supportsAsyncExecution()) {
executor.execute(() -> invokeListener(listener, event));
}
else {
// 同步处理
invokeListener(listener, event);
}
}
}
ApplicationListener接口中默许支撑异步处理,因而只需完结类不重载接口办法都支撑异步处理:
default boolean supportsAsyncExecution() {
return true;
}
4.2 事情监听者注册逻辑
4.2.1 相关类
中心类结构如下:
类 | 描绘 |
---|---|
ApplicationListenerDetector | 辨认出完结了ApplicationListener的listener bean,经过AbstractApplicationContext增加到SimpleApplicationEventMulticaster中。 |
EventListenerMethodProcessor | 辨认EventListener注解的办法,经过DefaultEventListenerFactory创立ApplicationListenerMethodAdapter,增加到SimpleApplicationEventMulticaster中。 |
DefaultEventListenerFactory | ApplicationListener创立工厂,将EventListener注解的办法生成ApplicationListenerMethodAdapter类。 |
ApplicationListenerMethodAdapter | 依据EventListener注解的办法生成的类,记录了EventListener注解的类和办法等信息,完结了ApplicationListener接口。 |
SimpleApplicationEventMulticaster | 事情播送者,将事情播送出去,让事情监听者处理。 |
4.2.2 代码
4.2.2.1 ApplicationListenerDetector
完结了BeanPostProcessor接口,在bean初始化完结的后处理中,辨认出ApplicationListener实例的bean,增加到applicationContext中:
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof ApplicationListener<?> applicationListener) {
// potentially not detected as a listener by getBeanNamesForType retrieval
Boolean flag = this.singletonNames.get(beanName);
if (Boolean.TRUE.equals(flag)) {
// singleton bean (top-level or inner): register on the fly
this.applicationContext.addApplicationListener(applicationListener);
}
else if (Boolean.FALSE.equals(flag)) {
if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {
// inner bean with other scope - can't reliably process events
logger.warn("Inner bean '" + beanName + "' implements ApplicationListener interface " +
"but is not reachable for event multicasting by its containing ApplicationContext " +
"because it does not have singleton scope. Only top-level listener beans are allowed " +
"to be of non-singleton scope.");
}
this.singletonNames.remove(beanName);
}
}
return bean;
}
4.2.2.2 AbstractApplicationContext
增加listener到SimpleApplicationEventMulticaster:
public void addApplicationListener(ApplicationListener<?> listener) {
Assert.notNull(listener, "ApplicationListener must not be null");
if (this.applicationEventMulticaster != null) {
this.applicationEventMulticaster.addApplicationListener(listener);
}
this.applicationListeners.add(listener);
}
4.2.2.3 AbstractApplicationEventMulticaster
将listener增加到调集中:
public void addApplicationListener(ApplicationListener<?> listener) {
synchronized (this.defaultRetriever) {
// Explicitly remove target for a proxy, if registered already,
// in order to avoid double invocations of the same listener.
Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
if (singletonTarget instanceof ApplicationListener) {
this.defaultRetriever.applicationListeners.remove(singletonTarget);
}
this.defaultRetriever.applicationListeners.add(listener);
this.retrieverCache.clear();
}
}
4.2.2.4 EventListenerMethodProcessor
完结BeanPostProcessor接口,在初始化后处理时获取ApplicationListener bean工厂:
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
this.beanFactory = beanFactory;
Map<String, EventListenerFactory> beans = beanFactory.getBeansOfType(EventListenerFactory.class, false, false);
List<EventListenerFactory> factories = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(factories);
this.eventListenerFactories = factories;
}
满意条件的完结类只要一个,即DefaultEventListenerFactory。
别的还完结了SmartInitializingSingleton接口,在afterSingletonsInstantiated办法中经过前面获取到的DefaultEventListenerFactory创立ApplicationListenerMethodAdapter:
private void processBean(final String beanName, final Class<?> targetType) {
if (!this.nonAnnotatedClasses.contains(targetType) &&
AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
!isSpringContainerClass(targetType)) {
Map<Method, EventListener> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<EventListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
}
catch (Throwable ex) {
// An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
if (logger.isDebugEnabled()) {
logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
}
}
if (CollectionUtils.isEmpty(annotatedMethods)) {
this.nonAnnotatedClasses.add(targetType);
if (logger.isTraceEnabled()) {
logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
}
}
else {
// Non-empty set of methods
ConfigurableApplicationContext context = this.applicationContext;
Assert.state(context != null, "No ApplicationContext set");
List<EventListenerFactory> factories = this.eventListenerFactories;
Assert.state(factories != null, "EventListenerFactory List not initialized");
// 有EventListener注解的办法,
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof ApplicationListenerMethodAdapter alma) {
alma.init(context, this.evaluator);
}
context.addApplicationListener(applicationListener);
break;
}
}
}
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
beanName + "': " + annotatedMethods);
}
}
}
}
4.2.2.5 DefaultEventListenerFactory
创立ApplicationListenerMethodAdapter,逻辑比较简单:
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodAdapter(beanName, type, method);
}
4.2.2.6 ApplicationListenerMethodAdapter
有了beanName,method信息,经过反射来调用EventListener注解的事情监听办法:
public void onApplicationEvent(ApplicationEvent event) {
processEvent(event);
}
// 先获取事情参数
public void processEvent(ApplicationEvent event) {
Object[] args = resolveArguments(event);
if (shouldHandle(event, args)) {
Object result = doInvoke(args);
if (result != null) {
handleResult(result);
}
else {
logger.trace("No result object given - no result to handle");
}
}
}
// 接着经过java反射调用
protected Object doInvoke(Object... args) {
Object bean = getTargetBean();
// Detect package-protected NullBean instance through equals(null) check
if (bean.equals(null)) {
return null;
}
ReflectionUtils.makeAccessible(this.method);
try {
if (KotlinDetector.isSuspendingFunction(this.method)) {
return CoroutinesUtils.invokeSuspendingFunction(this.method, bean, args);
}
return this.method.invoke(bean, args);
}
// ......
}
4.3 事情发布
事情发布在最开始的类结构中现已说到,回顾一下:
事情发布者完结类完结ApplicationEventPublisherAware接口,经过该接口注入AbstractApplicationContext的实例。
然后经过这个实例目标发布事情给SimpleApplicationEventMulticaster,由SimpleApplicationEventMulticaster调用事情监听者完结类,并将发布的事情传递给监听者处理。
这段逻辑比较简单,代码不再赘述。
4.4 Async注解异步调用逻辑
Async注解不只仅用于spring event,体系内的其他异步处理逻辑也运用此注解。
4.4.1 中心类结构
Async注解的异步处理中心类结构如下:
类 | 描绘 |
---|---|
AsyncConfigurer | 异步配置接口,用于回来提交异步处理使命的Executor和异步处理反常类,能够定制完结类回来需求的处理类。 |
AbstractAsyncConfiguration | 异步配置笼统类,注入AsyncConfigurer,获取它的成员,别的会校验EnableAsync注解,前面也说到了Async注解必须在EnableAsync注解启用前提下才会生效。 |
ProxyAsyncConfiguration | 注册AsyncAnnotationBeanPostProcessor。 |
AsyncAnnotationBeanPostProcessor | 将AsyncAnnotationAdvisor增加到署理工厂,这样Async注解的类和办法都能被署理处理。 |
AsyncAnnotationAdvisor | 支撑Async注解的Advisor。 |
AnnotationAsyncExecutionInterceptor | Async注解阻拦器,会获取Async注解的参数,以得到约束的Executor完结类。 |
AsyncExecutionInterceptor | 异步处理阻拦器。 |
AsyncExecutionAspectSupport | 异步处理支撑类,阻拦器的父类,提供公共办法。 |
4.4.2 关键代码
4.4.2.1 AbstractAsyncConfiguration
主动注入AsyncConfigurer:
@Autowired
void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
List<AsyncConfigurer> candidates = configurers.stream().toList();
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
if (candidates.size() > 1) {
throw new IllegalStateException("Only one AsyncConfigurer may exist");
}
return candidates.get(0);
});
this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
}
4.4.2.2 AsyncAnnotationAdvisor
初始化,创立advice和pointcut,支撑多个异步处理注解:
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
asyncAnnotationTypes.add(Async.class);
ClassLoader classLoader = AsyncAnnotationAdvisor.class.getClassLoader();
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("jakarta.ejb.Asynchronous", classLoader));
}
catch (ClassNotFoundException ex) {
// If EJB API not present, simply ignore.
}
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("jakarta.enterprise.concurrent.Asynchronous", classLoader));
}
catch (ClassNotFoundException ex) {
// If Jakarta Concurrent API not present, simply ignore.
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
4.4.2.3 AsyncExecutionInterceptor
阻拦处理,使命被异步调用:
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future<?> future) {
return future.get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
4.4.2.4 AsyncExecutionAspectSupport
获取Executor:
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
String qualifier = getExecutorQualifier(method);
if (this.embeddedValueResolver != null && StringUtils.hasLength(qualifier)) {
qualifier = this.embeddedValueResolver.resolveStringValue(qualifier);
}
if (StringUtils.hasLength(qualifier)) {
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
executor = (targetExecutor instanceof AsyncTaskExecutor asyncTaskExecutor ?
asyncTaskExecutor : new TaskExecutorAdapter(targetExecutor));
this.executors.put(method, executor);
}
return executor;
}
提交异步使命:
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return executor.submitCompletable(task);
}
else if (org.springframework.util.concurrent.ListenableFuture.class.isAssignableFrom(returnType)) {
return ((org.springframework.core.task.AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else if (void.class == returnType) {
executor.submit(task);
return null;
}
else {
throw new IllegalArgumentException(
"Invalid return type for async method (only Future and void supported): " + returnType);
}
}
5. 总结
5.1 问题回顾
开篇说到的几个问题,经过前面的验证和剖析,得到的结论如下:
问题 | 描绘 |
---|---|
事情行列 | 不支撑,事情直接提交执行。 |
使命行列 | 支撑,异步形式下取决于运用的线程池目标。 |
同步/异步 | 支撑同步和异步处理。 |
耐久化 | 不支撑 |
5.2 本文小结
本文探讨了Spring Event的具体用法,包括编码和注解两种完结办法。一起深化了解了事情的播送、监听者注册、发布机制以及异步处理的关键原理。这些常识不只加深了咱们对Spring生命周期钩子和阻拦器等底层结构的了解,还为定制化事务完结提供了实用参阅。
综合来看,Spring Event适用于事情驱动的场景,但在运用时需注意以下几点:
● 同步形式或许导致堵塞问题,必要时应考虑切换至异步形式。
● 在自定义SimpleApplicationEventMulticaster进行异步处理时,应确保线程池巨细和使命行列能够满意事务需求。
● 由于事情本身不支撑耐久化,体系宕机或许对事务造成的影响要考虑到。
6. 思考
● Spring Event异步处理下的性能考量
当Spring Event配置为异步形式时,假如一起发布大量事情,或许会对体系性能发生什么样的影响?需求考虑的要素包括线程池的巨细、使命行列的容量以及体系资源的约束。
● Spring Event与开源音讯行列的对比
尽管Spring Event和开源音讯行列(如RabbitMQ, Kafka等)都采用了发布/订阅形式,但它们在完结机制、运用场景和性能方面有何不同?特别是在分布式环境和高可靠性要求下的表现差异。
● 简化进程内事情处理结构的规划
在完结一个仅限于进程内的事情处理结构时,Spring Event涉及的处理类众多。假如是你,怎样简化这个规划,以便更高效地完结事情的发布和处理?
其他阅读:
萌新快速成长之路
如何编写软件规划文档
Spring Cache架构、机制及运用
布隆过滤器适配Spring Cache及问题与处理战略
JAVA编程思维(一)经过依赖注入增加扩展性
JAVA编程思维(二)如何面向接口编程
JAVA编程思维(三)去掉别扭的if,自注册战略形式优雅满意开闭原则
Java编程思维(七)运用组合和承继的场景
JAVA根底(一)简单、透彻了解内部类和静态内部类
JAVA根底(二)内存优化-运用Java引用做缓存
JAVA根底(三)ClassLoader完结热加载
JAVA根底(五)函数式接口-复用,解耦之利刃