敞开成长之旅!这是我参与「日新计划 12 月更文应战」的第 12 天,点击查看活动概况

笔者曾根据 SkyWalking 打造千亿级储能的链路追寻体系

概况查看《Skywalking on the way-千亿级的数据储能、毫秒级的查询耗时》

欢迎重视大众号【架构染色】沟通学习

一、异步链路追寻的概述

SkyWalking 的中构建 Trace 信息时会凭借 ThreadLocal来存储一些上下文信息,当遇到跨线程的时候,如果 Trace 的上下文信息没有传递到新线程的ThreadLocal 中,那么链路就断开了。

SkyWalking供给了跨线程构建Trace的才能,经过对 CallableRunnableSupplier 这3种接口的完成者进行增强阻拦,将 Trace 的上下文信息传递到子线程中,完成了异步链路追寻。有十分多的办法来完成Callable,Runnable,Supplier 这3种接口,那么增强就面临以下问题:

  1. 增强一切的完成类显然不或许,必须根据有限的约好
  2. 不能让运用者很多修正代码,尽或许的根据现有的完成

或许根据以上问题的考虑,SkyWalking供给了一种既通用又方便的办法来规范这一现象:

  1. 只阻拦增强带有@TraceCrossThread 注解的类:
  2. 经过装修的办法包装使命,避免雷厉风行的修正
原始类 供给的包装类 阻拦办法 运用技巧
Callable CallableWrapper call CallableWrapper.of(xxxCallable)
Runnable RunnableWrapper run RunnableWrapper.of(xxxRunable)
Supplier SupplierWrapper get SupplierWrapper.of(xxxSupplier)

包装类 都有注解 @TraceCrossThread ,skywalking内部的阻拦匹配逻辑是,标示了@TraceCrossThread的类,阻拦 其名称为callrunget ,且没有入参的办法;对运用者来说大致分为2种办法:

  1. 自界说类,完成接口 CallableRunnableSupplier,加@TraceCrossThread注解。当需要有更多的自界说特点时,考虑这种办法;参阅 CallableWrapperRunnableWrapper SupplierWrapper 的完成办法。

  2. 经过xxxWrapper.of 装修的办法,即CallableWrapper.of(xxxCallable)RunnableWrapper.of(xxxRunable)SupplierWrapper.of(xxxSupplier)。大多情况下,经过这种包装形式即可。

二、异步链路追寻的运用

2.1 pom依靠:

<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-trace</artifactId>
    <version>xxx</version>
</dependency>

2.2 CallableWrapper

Skywalking 经过CallableWrapper包装Callable

SkyWalking 中如何构建异步链路的 Trace

1) thread+callable

private String async_thread_callable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
    FutureTask<String> futureTask = new FutureTask<String>(CallableWrapper.of(()->{
        ActiveSpan.debug("async_Thread_Callable");
        String str1 = service.sendMessage(way, time11, time22);
        return str1;
    }));
    new Thread(futureTask).start();
    return futureTask.get();
}

2)threadPool+callable

private String async_executorService_callable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
    Future<String> callableResult = executorService.submit(CallableWrapper.of(() -> {
        String str1 = service.sendMessage(way, time11, time22);
        return str1;
    }));
    return (String) callableResult.get();
}

2.3 RunnableWrapper

Skywalking 经过RunnableWrapper包装Runnable

SkyWalking 中如何构建异步链路的 Trace
1)thread+runnable

private String async_thread_runnable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
    //疏忽回来值
    FutureTask futureTask = new FutureTask(RunnableWrapper.of(() -> {
        String str1 = service.sendMessage(way, time11, time22);
    }), "mockRunnableResult");
    new Thread(futureTask).start();
    return (String) futureTask.get();
}

2)threadPool+runnable

private String async_executorService_runnable(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
    //疏忽实在回来值,mock固定回来值
    Future<String> mockRunnableResult = executorService.submit(RunnableWrapper.of(() -> {
        String str1 = service.sendMessage(way, time11, time22);
    }), "mockRunnableResult");
    return (String) mockRunnableResult.get();
}

3)completableFuture + runAsync

经过RunnableWrapper.of(xxx)包装rannable即可。

2.4 SupplierWrapper

Skywalking 经过SupplierWrapper<V>包装Supplier<V>

SkyWalking 中如何构建异步链路的 Trace

1) completableFuture + supplyAsync

private String async_completableFuture_supplyAsync(String way,long time11,long time22 ) throws ExecutionException, InterruptedException {
    CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(SupplierWrapper.of(() -> {
        String str1 = service.sendMessage(way, time11, time22);
        return str1;
    }));
    return stringCompletableFuture.get();
}

三、异步链路追寻的内部原理

Trace相关的信息需要由能跨越线程的那些目标来搭载,比如线程A调用线程B的场景:

SkyWalking 中如何构建异步链路的 Trace

SkyWalking 中如何构建异步链路的 Trace

  • 线程A

    1.调用ContextManager.capture()将Trace的上下文信息保存到一个ContextSnapshot的实例并回来。

    2.ContextSnapshot则被附加到使命目标的特定特点中,那么当线程B接触到使命目标时,便能感知到ContextSnapshot。

  • 线程B

    1.线程B中,在使命目标的使命办法被执行前,从使命目标的特定特点中获取ContextSnapshot目标,并将其作为入参调用ContextManager.continued(contextSnapshot)

    2.ContextManager.continued(contextSnapshot)办法中解分出Trace的信息后,存储到线程B的线程上下文中。

四、专用特点的机理

SkyWalkingAgent会给被增强的类中扩展一个专用特点的机制是这样的:这个类会被修正,完成了接口EnhancedInstance,此接口中供给了2个办法来读写这个扩展特点

publicinterfaceEnhancedInstance{
ObjectgetSkyWalkingDynamicField();
voidsetSkyWalkingDynamicField(Objectvalue);
}

这个扩展特点就是一个一般的Object,在宿主应用这边感知不到它的存在,由于它不是在宿主应用中界说的;但是在Agent的上下文中可将其作为数据载体,在如下这些场景运用:

  1. 阻拦目标类的构造办法,在构造房中new一个自界说目标,经过setSkyWalkingDynamicField赋值给这个专用特点
  2. 在其他办法中,捕获到不同的数据,暂存到这个专用特点里;在构建Span的时候,将专用特点中暂存的数据读取出来,填充至Span的相关特点

这种经过在目标中扩展专用特点来在上下文中传递一些信息的办法,我个人的运用感受是比ThreadLocal要更舒服,既能完成信息传递,也能处理跨线程的问题。

五、总结

本篇介绍了在SkyWalking构建异步链路 Trace 的多种办法,并描述了异步链路Trace 信息的传递原理,最后还介绍了 SkyWakling 内特殊的增强机制,是怎么增加了专用特点以应对上下文之间传递 Trace 信息的需求。

最后说一句(请重视,莫错失)

如果这篇文章对您有帮助,或许有所启示的话,欢迎重视大众号【 架构染色 】进行沟通和学习。您的支撑是我坚持写作最大的动力。

求一键三连:重视、点赞、转发。