概述
因为分布式体系节点很多,排查过错日志要涉及到多个节点,假如在多个节点中没有唯一的恳求id来把各个节点的恳求日志串联起来,那么查询起来就会耗时耗力,因此Spring Sleuth
出现了(Spring Sleuth基于Google dapper论文完成,详细了解能够检查此论文),Sleuth会在接收恳求的入口经过Filter生成唯一的标识TraceId
,这个TraceId会一向跟随恳求路径传递到各个节点,只需有日志输出就会把TraceId的值打印出来,如下图(正常还会生成SpanId,为了便于了解没展现)
假设线上产生问题,要排查日志,那么依据这个TraceId
,就能够快速查询到各个节点对应的恳求日志,但是唯一的惋惜是异步履行会丢失TraceId,因此这儿介绍异步跨线程下如何确保TraceId不丢失的问题
咱们在官方文档中找到了异步传递Traceid阐明,如下图
大致意思Sleuth默许支撑@Async
传递TraceId,并且支撑spring.sleuth.async.enabled
进行操控,同时提供了
LazyTraceExecutor
TraceableExecutorService
TraceableScheduledExecutorService
线程包装类,来支撑跨线程传递TraceId,其中TraceableScheduledExecutorService
是ScheduledExecutorService类的完成,用于完成守时使命触发,个人觉得这种需求不是特别多,所以只介绍常用的一些装备,比如@Async
装备、线程池装备、EventBus
装备,具体检查后续章节
Asnc装备
默许Sleuth是支撑@Async注解
异步传递TraceId的,但是假如自定义线程池,装备不对的状况或许就会导致失效,因为Spring在这快有个bug,详细了解请检查以下链接:
github.com/spring-proj…
github.com/spring-proj…
所以正确装备办法有如下3种
装备办法
方法1(引荐)
这儿用到了Sleuth的LazyTraceExecutor包装了线程池,这样能够确保trace目标传到下一个线程中
@Configuration
@EnableAsync
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SpringAsyncConfig extends AsyncConfigurerSupport {
@Autowired
private BeanFactory beanFactory;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("AsyncExecutor-");
executor.initialize();
return new LazyTraceExecutor(this.beanFactory, executor);
}
}
方法2
Sleuth初始化时会默许查找TaskExecutor作为Async的线程池,假如查找不到会获取默许的线程池
@EnableAsync
@Configuration
public class WebConfig {
@Bean
public TaskExecutor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("AsyncExecutor-");
executor.initialize();
return executor;
}
}
方法3
假如默许不装备任何线程池,只在工程中加了@EnableAsync
注解,那么Sleuth会运用自带的线程池SimpleAsyncTaskExecutor
,这个线程池每次调用都会创立新线程,假如调用量比较多,创立的线程也会非常多,咱们知道体系资源是有限的,假如线程数过多,会导致程序内存吃紧,从而导致OOM,所以不引荐运用这种方法
测验验证
测验代码
Async装备
@Configuration
@EnableAsync
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SpringAsyncConfig extends AsyncConfigurerSupport {
@Autowired
private BeanFactory beanFactory;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("AsyncExecutor-");
executor.initialize();
return new LazyTraceExecutor(this.beanFactory, executor);
}
}
Service
@Service
@Slf4j
public class TestService {
@Async
public void printAsyncLog() {
log.info("async log.....");
}
}
Controller
@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
@Autowired
private TestService testService;
@RequestMapping(value = "/print/log", method = RequestMethod.GET)
public String printLog() {
log.info("sync log.....1.....");
testService.printAsyncLog();
log.info("sync log.....2.....");
return "success";
}
}
恳求测验
履行恳求test/async/print/log,输出以下信息,能够看到TraceId一样,只要Spanid产生了变化,线程称号前缀AsyncExecutor
与设置前缀相同
19:44:54.818, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com.example.elkdemo.web.AsyncTestWeb printLog:30 - sync log.....1.....
19:44:54.819, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com.example.elkdemo.web.AsyncTestWeb printLog:32 - sync log.....2.....
19:44:54.819, [fae1c9449e12695f 2d51edbb45896bd8] [AsyncExecutor-2] INFO [] c.e.elkdemo.service.TestService printAsyncLog:50 - async log.....
线程池装备
线程池履行是经过TraceableExecutorService
包装了ExecutorService
,而且在初始化的时分需要注入进去BeanFactory
目标,所以线程池作为全局变量和局部变量装备稍有不同,注意下面线程池设置只是示例代码,实际运用中能够依据需求自行修正
全局变量装备
结构函数初始化(引荐)
@Service
@Slf4j
public class TestService{
final BeanFactory beanFactory;
private TraceableExecutorService traceableExecutorService;
public TestService(BeanFactory beanFactory1) {
this.beanFactory = beanFactory1;
this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog() {
traceableExecutorService.execute(() -> log.info("async thread pool log....."));
}
}
单例初始化
@Service
@Slf4j
public class TestService {
@Autowired
private BeanFactory beanFactory;
volatile TraceableExecutorService traceableExecutorService;
public TraceableExecutorService getTraceableExecutorService() {
if (traceableExecutorService == null) {
synchronized (TraceableExecutorService.class) {
if (traceableExecutorService == null) {
traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
}
}
return traceableExecutorService;
}
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog() {
TraceableExecutorService executorService = getTraceableExecutorService();
executorService.execute(() -> log.info("async thread pool log....."));
}
}
经过InitializingBean的afterPropertiesSet进行初始化
@Service
@Slf4j
public class TestService implements InitializingBean {
@Autowired
private BeanFactory beanFactory;
private TraceableExecutorService traceableExecutorService;
@Override
public void afterPropertiesSet() {
traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog() {
traceableExecutorService.execute(() -> log.info("async thread pool log....."));
}
}
局部变量装备
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog2() {
TraceableExecutorService executorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
executorService.execute(() -> log.info("async thread pool log....."));
}
测验验证
这儿选用全局变量装备方法测验
测验代码
Controller
@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
@Autowired
private TestService testService;
@RequestMapping(value = "/print/threadPool/log", method = RequestMethod.GET)
public String printThreadPoolLog() {
log.info("sync log.....1.....");
testService.printThreadPoolLog();
log.info("sync log.....2.....");
return "success";
}
}
Service
service选用结构函数方法进行初始化
@Service
@Slf4j
public class TestService{
final BeanFactory beanFactory;
private TraceableExecutorService traceableExecutorService;
public TestService(BeanFactory beanFactory1) {
this.beanFactory = beanFactory1;
this.traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
}
/**
* 异步输出线程池日志
*/
public void printThreadPoolLog() {
traceableExecutorService.execute(() -> log.info("async thread pool log....."));
}
}
恳求测验
履行恳求/test/async/print/threadPool/log,输出以下信息,能够看到Traceid一样,只要Spanid产生了变化
19:35:13.799, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:38 - sync log.....1.....
19:35:13.801, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com.example.elkdemo.web.AsyncTestWeb printThreadPoolLog:40 - sync log.....2.....
19:35:13.801, [884212fb58c658c5 70008b8d3a97602d] [pool-4-thread-2] INFO [] c.e.elkdemo.service.TestService lambda$printThreadPoolLog$0:37 - async thread pool log.....
EventBus装备
EventBus
装备与线程池装备相似,把TraceableExecutorService
注入到AsyncEventBus
中即可,因TraceableExecutorService
类引用了BeanFactory
实例,所以比原生方法杂乱了一点,以下只介绍结构函数的初始化方法,其他初始化方法与线程池装备相似,所以这儿就不再举例阐明
结构函数进行初始化
@Component
@Slf4j
public class PushEventBus {
private EventBus eventBus;
public PushEventBus(BeanFactory beanFactory) {
Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
this.eventBus = new AsyncEventBus(traceableExecutorService);
}
public void register(Object obj) {
eventBus.register(obj);
}
public void post(Object obj) {
eventBus.post(obj);
}
}
测验验证
测验代码
EventBus
@Component
@Slf4j
public class PushEventBus {
private EventBus eventBus;
public PushEventBus(BeanFactory beanFactory) {
Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors.newFixedThreadPool(10), "test");
this.eventBus = new AsyncEventBus(traceableExecutorService);
}
public void register(Object obj) {
eventBus.register(obj);
}
public void post(Object obj) {
eventBus.post(obj);
}
}
监听类
@Slf4j
public class EventListener {
/**
* 监听 Integer 类型的消息
*/
@Subscribe
public void listenInteger(Integer param) {
log.info("EventListener#listenInteger->{}",param);
}
/**
* 监听 String 类型的消息
*/
@Subscribe
public void listenString(String param) {
log.info("EventListener#listenString->{}",param);
}
}
controller
@Slf4j
@RestController
@RequestMapping("/test/async")
public class AsyncTestWeb {
@Autowired
private PushEventBus pushEventBus;
@RequestMapping(value = "/print/guava/log", method = RequestMethod.GET)
public String printGuavaLog() {
pushEventBus.register(new EventListener());
log.info("sync log.....1.....");
pushEventBus.post("11");
log.info("sync log.....2.....");
return "success";
}
}
恳求测验
履行恳求/test/async/print/guava/log,输出以下信息,能够看到Traceid一样,只要Spanid产生了变化
19:27:44.234, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:48 - sync log.....1.....
19:27:44.236, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com.example.elkdemo.web.AsyncTestWeb printGuavaLog:50 - sync log.....2.....
19:27:44.236, [50844e0d3909868c 702bf55c84873f17] [pool-3-thread-1] INFO [] c.e.elkdemo.service.EventListener listenString:21 - EventListener#listenString->11
作者公众号《架构生长攻略》欢迎重视!