我们好,我是老三,在项目里,常常会有一些主线业务之外的其它业务,比如,下单之后,发送告诉、监控埋点、记载日志……
这些非中心业务,如果悉数一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。
所以,一般在开发的时分,都会把这些操作抽象成观察者形式,也就是发布/订阅形式(这儿就不讨论观察者形式和发布/订阅形式的不同),并且一般会选用多线程的办法来异步履行这些观察者办法。
一开始,咱们都是自己去写观察者形式。
自己完成观察者形式
观察者
- 观察者界说接口
/**
* @Author: fighter3
* @Description: 观察者接口
* @Date: 2022/11/7 11:40 下午
*/
public interface OrderObserver {
void afterPlaceOrder(PlaceOrderMessage placeOrderMessage);
}
-
详细观察者
- 监控埋点观察者
@Slf4j public class OrderMetricsObserver implements OrderObserver { @Override public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) { log.info("[afterPlaceOrder] metrics"); } }
- 日志记载观察者
@Slf4j public class OrderLogObserver implements OrderObserver{ @Override public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) { log.info("[afterPlaceOrder] log."); } }
- 业务告诉观察者
@Slf4j public class OrderNotifyObserver implements OrderObserver{ @Override public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) { log.info("[afterPlaceOrder] notify."); } }
被观察者
- 音讯实体界说
@Data
public class PlaceOrderMessage implements Serializable {
/**
* 订单号
*/
private String orderId;
/**
* 订单状态
*/
private Integer orderStatus;
/**
* 下单用户ID
*/
private String userId;
//……
}
- 被观察者抽象类
public abstract class OrderSubject {
//界说一个观察者列表
private List<OrderObserver> orderObserverList = new ArrayList<>();
//界说一个线程池,这儿参数随意写的
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
//添加一个观察者
public void addObserver(OrderObserver o) {
this.orderObserverList.add(o);
}
//删去一个观察者
public void delObserver(OrderObserver o) {
this.orderObserverList.remove(o);
}
//告诉一切观察者
public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
for (OrderObserver orderObserver : orderObserverList) {
//运用多线程异步履行
threadPoolExecutor.execute(() -> {
orderObserver.afterPlaceOrder(placeOrderMessage);
});
}
}
}
这儿运用了多线程,来异步履行观察者。
- 被观察者完成类
/**
* @Author: fighter3
* @Description: 订单完成类-被观察者完成类
* @Date: 2022/11/7 11:52 下午
*/
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {
/**
* 下单
*/
@Override
public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
PlaceOrderResVO resVO = new PlaceOrderResVO();
//添加观察者
this.addObserver(new OrderMetricsObserver());
this.addObserver(new OrderLogObserver());
this.addObserver(new OrderNotifyObserver());
//告诉观察者
this.notifyObservers(new PlaceOrderMessage());
log.info("[placeOrder] end.");
return resVO;
}
}
测验
@Test
@DisplayName("下单")
void placeOrder() {
PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
orderService.placeOrder(placeOrderReqVO);
}
- 测验履行成果
2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver : [afterPlaceOrder] metrics
2022-11-08 00:11:13.618 INFO 20235 --- [ main] cn.fighter3.obverser.OrderServiceImpl : [placeOrder] end.
2022-11-08 00:11:13.618 INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver : [afterPlaceOrder] notify.
2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver : [afterPlaceOrder] log.
能够看到,观察者是异步履行的。
运用Spring精简
能够看到,观察者形式写起来仍是比较简略的,可是既然都用到了Spring来管理Bean的生命周期,代码还能够更精简一些。
观察者完成类:界说成Bean
-
OrderLogObserver
@Slf4j @Service public class OrderLogObserver implements OrderObserver { @Override public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) { log.info("[afterPlaceOrder] log."); } }
-
OrderMetricsObserver
@Slf4j
@Service
public class OrderMetricsObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] metrics");
}
}
- OrderNotifyObserver
@Slf4j
@Service
public class OrderNotifyObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] notify.");
}
}
被观察者:自动注入Bean
-
OrderSubject
public abstract class OrderSubject { /** * 运用Spring的特性直接注入观察者 */ @Autowired protected List<OrderObserver> orderObserverList; //界说一个线程池,这儿参数随意写的 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30)); //告诉一切观察者 public void notifyObservers(PlaceOrderMessage placeOrderMessage) { for (OrderObserver orderObserver : orderObserverList) { //运用多线程异步履行 threadPoolExecutor.execute(() -> { orderObserver.afterPlaceOrder(placeOrderMessage); }); } } }
-
OrderServiceImpl
@Service
@Slf4j
public class OrderServiceImpl extends OrderSubject implements OrderService {
/**
* 完成类里也要注入一下
*/
@Autowired
private List<OrderObserver> orderObserverList;
/**
* 下单
*/
@Override
public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
PlaceOrderResVO resVO = new PlaceOrderResVO();
//告诉观察者
this.notifyObservers(new PlaceOrderMessage());
log.info("[placeOrder] end.");
return resVO;
}
}
这样一来,发现被观察者又简练了许多,可是后来我发现,在SpringBoot项目里,运用Spring事情驱动驱动模型(event)模型来完成,愈加地简练。
Spring Event完成发布/订阅形式
Spring Event对发布/订阅形式进行了封装,运用起来愈加简略,仍是以咱们这个场景为例,看看怎样来完成吧。
自界说事情
- PlaceOrderEvent:承继ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的一切应用程序事情扩展类。
public class PlaceOrderEvent extends ApplicationEvent {
public PlaceOrderEvent(PlaceOrderEventMessage source) {
super(source);
}
}
- PlaceOrderEventMessage:事情音讯,界说了事情的音讯体。
@Data
public class PlaceOrderEventMessage implements Serializable {
/**
* 订单号
*/
private String orderId;
/**
* 订单状态
*/
private Integer orderStatus;
/**
* 下单用户ID
*/
private String userId;
//……
}
事情监听者
事情监听者,有两种完成办法,一种是完成ApplicationListener接口,另一种是运用@EventListener注解。
完成ApplicationListener接口
完成ApplicationListener接口,重写onApplicationEvent办法,将类界说为Bean,这样,一个监听者就完成了。
- OrderLogListener
@Slf4j
@Service
public class OrderLogListener implements ApplicationListener<PlaceOrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}
- OrderMetricsListener
@Slf4j
@Service
public class OrderMetricsListener implements ApplicationListener<PlaceOrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] metrics");
}
}
- OrderNotifyListener
@Slf4j
@Service
public class OrderNotifyListener implements ApplicationListener<PlaceOrderEvent> {
@Override
public void onApplicationEvent(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] notify.");
}
}
运用@EventListener注解
运用@EventListener注解就更简略了,直接在办法上,加上@EventListener注解就行了。
-
OrderLogListener
@Slf4j @Service public class OrderLogListener { @EventListener public void orderLog(PlaceOrderEvent event) { log.info("[afterPlaceOrder] log."); } }
-
OrderMetricsListener
@Slf4j @Service public class OrderMetricsListener { @EventListener public void metrics(PlaceOrderEvent event) { log.info("[afterPlaceOrder] metrics"); } }
-
OrderNotifyListener
@Slf4j @Service public class OrderNotifyListener{ @EventListener public void notify(PlaceOrderEvent event) { log.info("[afterPlaceOrder] notify."); } }
异步和自界说线程池
异步履行
异步履行也十分简略,运用Spring的异步注解@Async就能够了。例如:
- OrderLogListener
@Slf4j
@Service
public class OrderLogListener {
@EventListener
@Async
public void orderLog(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}
}
当然,还需要敞开异步,SpringBoot项目默许是没有敞开异步的,咱们需要手动装备敞开异步功用,很简略,只需要在装备类上加上@EnableAsync
注解就行了,这个注解用于声明启用Spring的异步办法履行功用,需要和@Configuration
注解一起运用,也能够直接加在启动类上。
@SpringBootApplication
@EnableAsync
public class DailyApplication {
public static void main(String[] args) {
SpringApplication.run(DairlyLearnApplication.class, args);
}
}
自界说线程池
运用@Async的时分,一般都会自界说线程池,由于@Async
的默许线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默许每次调用都会创立一个新的线程。
自界说线程池有三种办法:
- 完成接口AsyncConfigurer
- 承继AsyncConfigurerSupport
- 装备由自界说的TaskExecutor替代内置的使命履行器
咱们来看看三种写法:
- 完成接口AsyncConfigurer
@Configuration
@Slf4j
public class AsyncConfiguration implements AsyncConfigurer {
@Bean("fighter3AsyncExecutor")
public ThreadPoolTaskExecutor executor() {
//Spring封装的一个线程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//随意写的一些装备
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(30);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("fighter3AsyncExecutor-");
executor.initialize();
return executor;
}
@Override
public Executor getAsyncExecutor() {
return executor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
}
}
- 承继AsyncConfigurerSupport
@Configuration
@Slf4j
public class SpringAsyncConfigurer extends AsyncConfigurerSupport {
@Bean
public ThreadPoolTaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//随意写的一些装备
threadPool.setCorePoolSize(10);
threadPool.setMaxPoolSize(30);
threadPool.setWaitForTasksToCompleteOnShutdown(true);
threadPool.setAwaitTerminationSeconds(60 * 15);
return threadPool;
}
@Override
public Executor getAsyncExecutor() {
return asyncExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex);
}
}
-
装备自界说的TaskExecutor
-
装备线程池
@Configuration public class TaskPoolConfig { @Bean(name = "asyncExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //随意写的一些装备 executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("asyncExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }
-
运用@Async注解的时分,指定线程池,推荐运用这种办法,由于在项目里,尽量做到线程池阻隔,不同的使命运用不同的线程池
@Slf4j @Service public class OrderLogListener { @EventListener @Async("asyncExecutor") public void orderLog(PlaceOrderEvent event) { log.info("[afterPlaceOrder] log."); } }
-
异步和自界说线程池
这一部分只是一些扩展,稍微占了一些篇幅,我们可不要觉得Spring Event用起来很繁琐。
发布事情
发布事情也十分简略,只需要运用Spring 提供的ApplicationEventPublisher
来发布自界说事情。
-
OrderServiceImpl
@Service @Slf4j public class OrderServiceImpl implements OrderService { @Autowired private ApplicationEventPublisher applicationEventPublisher; /** * 下单 */ @Override public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) { log.info("[placeOrder] start."); PlaceOrderResVO resVO = new PlaceOrderResVO(); //音讯 PlaceOrderEventMessage eventMessage = new PlaceOrderEventMessage(); //发布事情 applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage)); log.info("[placeOrder] end."); return resVO; } }
在Idea里检查事情的监听者也比较方便,点击下面图中的图标,就能够检查监听者。
测验
最终,咱们仍是测验一下。
@Test
void placeOrder() {
PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO();
orderService.placeOrder(placeOrderReqVO);
}
- 履行成果
2022-11-08 10:05:14.415 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] start.
2022-11-08 10:05:14.424 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] end.
2022-11-08 10:05:14.434 INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener : [afterPlaceOrder] notify.
2022-11-08 10:05:14.435 INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener : [afterPlaceOrder] metrics
2022-11-08 10:05:14.436 INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener : [afterPlaceOrder] log.
能够看到,异步履行,并且用到了咱们自界说的线程池。
小结
这篇文章里,从最开始自己完成的观察者形式,再到运用Spring简化的观察者形式,再到运用Spring Event完成发布/订阅形式,能够看到,Spring Event用起来仍是比较简略的。除此之外,还有Guava EventBus这样的事情驱动完成,我们更习惯运用哪种呢?
参考:
[1]. 《设计形式之禅》