Preface
Starting from the extension mechanisms that Kafka provides, this article combines OpenTracing and Jaeger to implement distributed tracing for Kafka.
kafka-clients version: 3.1.2
spring-kafka version: 2.8.11
Main Content
I. Kafka Interceptors and Listeners
Let's first look at the interface definitions of Kafka's interceptors and listeners.
First, the interceptors. They come in two kinds, the producer interceptor ProducerInterceptor and the consumer interceptor ConsumerInterceptor, defined as follows.
public interface ProducerInterceptor<K, V> extends Configurable {
    // Invoked before the record is sent
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    // Invoked when the send fails or when the acknowledgement callback fires after sending
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    void close();
}

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    // Invoked before the fetched records are returned to the caller
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    // Invoked after offsets are committed
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    void close();
}
Next, the listener interface definition, shown below, followed by a small example of how a listener is attached.
public interface ProducerListener<K, V> {
    // Invoked by the send callback after the record has been acknowledged successfully
    default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
    }
    // Invoked by the send callback when the send fails with an exception
    default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata,
            Exception exception) {
    }
}
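Unlike the interceptors, a listener is attached directly to the KafkaTemplate, so it can be built with whatever dependencies we need. A minimal illustrative sketch follows; the class and method names here are ours, not part of the original code.
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;

public final class ProducerListenerSketch {

    // Attach a simple logging listener to an existing KafkaTemplate.
    static void attachLoggingListener(KafkaTemplate<String, String> kafkaTemplate) {
        kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
                // Invoked when the send is acknowledged successfully.
                System.out.println("sent to " + recordMetadata.topic() + "-" + recordMetadata.partition());
            }

            @Override
            public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata,
                    Exception exception) {
                // Invoked when the send fails.
                exception.printStackTrace();
            }
        });
    }
}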
Putting interceptors and listeners together with the two actions of producing messages and pulling messages, a simple diagram looks like the following.
II. Span Model Design for Kafka Distributed Tracing
As a message middleware, Kafka passes messages via push and pull. This way of passing information differs from sending requests with RestTemplate, but in essence the message is still delivered downstream, so with a little abstraction we arrive at the following Span model.
The idea is simply that when the producer sends a message it creates a Span representing the downstream, and then passes that Span to the downstream through the record's headers. The overall approach is exactly the same as putting the Span into the HTTP request headers when calling a downstream service with RestTemplate.
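Just for intuition, here is a tiny Kafka-only sketch (no tracer involved, ours rather than the article's code) of what that propagation looks like: the producer writes the trace context into the record headers and the downstream reads it back out. The header name uber-trace-id is Jaeger's default text-map key; the value below is purely illustrative.
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class HeaderPropagationSketch {

    public static void main(String[] args) {
        ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "testMessage");
        // Producer side: the trace context is written into the record headers before sending.
        record.headers().add("uber-trace-id",
                "904360d3ae85faa7:ce5d698debda55fd:0:1".getBytes(StandardCharsets.UTF_8));
        // Consumer side (conceptually): the context is read back out of the headers.
        for (Header header : record.headers()) {
            System.out.println(header.key() + " = " + new String(header.value(), StandardCharsets.UTF_8));
        }
    }
}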
III. Kafka Producer Tracing Design and Implementation
1. Kafka Producer Tracing Design
At first glance it seems this could be done with a producer interceptor, for example creating the Span in onSend() and adding it to the ProducerRecord headers, then calling the Span's finish() method in onAcknowledgement() to record it. In practice, however, a producer interceptor does not work, for the following reasons.
- In both onSend() and onAcknowledgement(), the only parameter is the record being sent, so other information we need when recording the Span is hard to obtain;
- There is no way to hand the Tracer, decorators, and other objects to a producer interceptor, because the interceptor is instantiated inside KafkaProducer: it reads the interceptor's fully qualified class name from the configuration and creates the instance via reflection, so we have no opportunity to inject the Tracer or decorators into it. A short configuration sketch right after this list illustrates that loading mechanism.
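For reference, this is roughly how a native producer interceptor is wired up; the interceptor class name below is hypothetical. Only a class name can be configured, and KafkaProducer creates the instance reflectively, which is why there is no hook for handing it a Tracer bean.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class NativeInterceptorConfigSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Only the class name is configurable; KafkaProducer instantiates it via reflection.
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.MyProducerInterceptor");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // use the producer ...
        }
    }
}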
Given these problems, we cannot implement Kafka producer tracing with a producer interceptor. A good alternative is to wrap another layer on top of KafkaTemplate as our own enhanced template, here named HoneyKafkaTemplate, and to define our own interceptor interface HoneyKafkaProducerInterceptor. When a message is sent through HoneyKafkaTemplate it first passes through HoneyKafkaProducerInterceptor, where the Span is created and recorded, and the actual send is ultimately delegated to the native KafkaTemplate. The overall structure is shown below.
2. Kafka Producer Tracing Implementation
First we implement HoneyKafkaTemplate to take the place of the native KafkaTemplate, as follows.
public class HoneyKafkaTemplate<K, V> extends KafkaTemplate<K, V> {

    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory) {
        super(producerFactory);
    }

    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, Map<String, Object> configOverrides) {
        super(producerFactory, configOverrides);
    }

    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        super(producerFactory, autoFlush);
    }

    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, Map<String, Object> configOverrides) {
        super(producerFactory, autoFlush, configOverrides);
    }
}
Then we provide an auto-configuration class, HoneyKafkaTemplateConfig, as follows.
@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
public class HoneyKafkaTemplateConfig {

    @Bean
    public HoneyKafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory) {
        return new HoneyKafkaTemplate<>(kafkaProducerFactory);
    }
}
Because KafkaAutoConfiguration registers its KafkaTemplate bean with @ConditionalOnMissingBean(KafkaTemplate.class), we only need to make sure our HoneyKafkaTemplateConfig is processed before KafkaAutoConfiguration; the HoneyKafkaTemplate we register then replaces the native KafkaTemplate and passes for the real thing. A small sanity-check sketch follows.
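As a quick check of this "impersonation", a hypothetical test class of ours (the import of HoneyKafkaTemplate is omitted because its package is project-specific) could assert that the only KafkaTemplate bean in the context is our wrapper.
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class HoneyKafkaTemplateConfigTest {

    @Autowired
    private KafkaTemplate<?, ?> kafkaTemplate;

    @Test
    void kafkaTemplateIsReplacedByHoneyKafkaTemplate() {
        // If HoneyKafkaTemplateConfig ran before KafkaAutoConfiguration,
        // the default KafkaTemplate backed off and this bean is our wrapper.
        assertTrue(kafkaTemplate instanceof HoneyKafkaTemplate);
    }
}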
Finally, declare our auto-configuration classes in the spring.factories file and add the spring-kafka dependency to the pom, as follows.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.honey.tracing.config.HoneyTracingConfig,\
com.honey.tracing.config.HoneyTracingFilterConfig,\
com.honey.tracing.config.HoneyRestTemplateTracingConfig,\
com.honey.tracing.config.HoneyKafkaTemplateConfig
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <scope>provided</scope>
</dependency>
Now let's give the definition of our custom interceptor HoneyKafkaProducerInterceptor, as follows.
public interface HoneyKafkaProducerInterceptor<K, V> {

    /**
     * Intercept a message sent by the Kafka producer.
     *
     * @param producerRecord the record being sent by the producer.
     */
    ListenableFuture<SendResult<K, V>> intercept(
            ProducerRecord<K, V> producerRecord,
            HoneyKafkaProducerInterceptorChain<K, V> kafkaProducerInterceptorChain);
}
We then provide an implementation class, as follows.
public class HoneyKafkaProducerTracingInterceptor<K, V> implements HoneyKafkaProducerInterceptor<K, V> {

    private final Tracer tracer;

    private final List<HoneyKafkaTracingProducerDecorator<K, V>> kafkaTracingProducerDecorators;

    public HoneyKafkaProducerTracingInterceptor(Tracer tracer, List<HoneyKafkaTracingProducerDecorator<K, V>> kafkaTracingProducerDecorators) {
        this.tracer = tracer;
        this.kafkaTracingProducerDecorators = kafkaTracingProducerDecorators;
    }

    @Override
    public ListenableFuture<SendResult<K, V>> intercept(ProducerRecord<K, V> producerRecord,
            HoneyKafkaProducerInterceptorChain<K, V> kafkaProducerInterceptorChain) {
        if (tracer.activeSpan() == null) {
            return kafkaProducerInterceptorChain.intercept(producerRecord);
        }
        // Create the Span for the Kafka producer,
        // analogous to the Span created before a RestTemplate call
        Span span = tracer.buildSpan(HONEY_KAFKA_NAME)
                .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
                .start();
        for (HoneyKafkaTracingProducerDecorator<K, V> kafkaTracingProducerDecorator : kafkaTracingProducerDecorators) {
            try {
                kafkaTracingProducerDecorator.onSend(span, producerRecord);
            } catch (Exception e) {
                // do nothing
            }
        }
        ListenableFuture<SendResult<K, V>> result;
        try (Scope scope = tracer.activateSpan(span)) {
            // Tag the Span with the Kafka broker host
            String hostString = kafkaProducerInterceptorChain.getHoneyKafkaTemplate().getProducerFactory()
                    .getConfigurationProperties().get(BOOTSTRAP_SERVERS_CONFIG).toString();
            span.setTag(FIELD_HOST, hostString.substring(1, hostString.length() - 1));
            // The host also needs to be propagated to the consumer
            span.setBaggageItem(FIELD_HOST, hostString.substring(1, hostString.length() - 1));
            // Inject the SpanContext into the ProducerRecord headers
            tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new HoneyKafkaCarrier(producerRecord.headers()));
            try {
                result = kafkaProducerInterceptorChain.intercept(producerRecord);
            } catch (Exception e1) {
                for (HoneyKafkaTracingProducerDecorator<K, V> kafkaTracingProducerDecorator : kafkaTracingProducerDecorators) {
                    try {
                        kafkaTracingProducerDecorator.onError(span, producerRecord);
                    } catch (Exception e2) {
                        // do nothing
                    }
                }
                throw e1;
            }
            for (HoneyKafkaTracingProducerDecorator<K, V> kafkaTracingProducerDecorator : kafkaTracingProducerDecorators) {
                try {
                    kafkaTracingProducerDecorator.onSuccess(span, producerRecord);
                } catch (Exception e) {
                    // do nothing
                }
            }
        } finally {
            span.finish();
            // Log this send as a request-stack entry on the still-active parent span
            tracer.activeSpan().log(RequestStackUtil.assembleRequestStack((JaegerSpan) span));
        }
        return result;
    }
}
This is almost identical to the RestTemplate interceptor logic implemented earlier. The differences are that there the Span goes into the HTTP request headers while here it goes into the record headers, and that the injection uses a custom HoneyKafkaCarrier, shown below.
public class HoneyKafkaCarrier implements TextMap {

    private final Headers headers;

    public HoneyKafkaCarrier(Headers headers) {
        this.headers = headers;
    }

    @NotNull
    @Override
    public Iterator<Map.Entry<String, String>> iterator() {
        Map<String, String> headerMap = new HashMap<>();
        for (Header header : headers) {
            headerMap.put(header.key(), new String(header.value()));
        }
        return headerMap.entrySet().iterator();
    }

    @Override
    public void put(String key, String value) {
        headers.add(key, value.getBytes());
    }
}
Pay particular attention to HoneyKafkaProducerInterceptorChain: all HoneyKafkaProducerInterceptor instances are assembled into an interceptor chain, as follows.
public class HoneyKafkaProducerInterceptorChain<K, V> {

    private final Iterator<HoneyKafkaProducerInterceptor<K, V>> kafkaProducerInterceptors;

    private final HoneyKafkaTemplate<K, V> honeyKafkaTemplate;

    public HoneyKafkaProducerInterceptorChain(Iterator<HoneyKafkaProducerInterceptor<K, V>> kafkaProducerInterceptors,
            HoneyKafkaTemplate<K, V> honeyKafkaTemplate) {
        this.kafkaProducerInterceptors = kafkaProducerInterceptors;
        this.honeyKafkaTemplate = honeyKafkaTemplate;
    }

    public HoneyKafkaTemplate<K, V> getHoneyKafkaTemplate() {
        return honeyKafkaTemplate;
    }

    public ListenableFuture<SendResult<K, V>> intercept(ProducerRecord<K, V> producerRecord) {
        if (kafkaProducerInterceptors.hasNext()) {
            // Run the remaining interceptors first
            return kafkaProducerInterceptors.next().intercept(producerRecord, this);
        }
        // Only send the message once every interceptor has run
        return honeyKafkaTemplate.actuallySend(producerRecord);
    }
}
The intent is as follows.
- Support adding more interceptors later (a hypothetical extra interceptor is sketched right after this list);
- Having the chain hold the HoneyKafkaTemplate means every interceptor in turn has access to the HoneyKafkaTemplate.
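For example, adding another interceptor later only requires another implementation of HoneyKafkaProducerInterceptor registered as a bean. A hypothetical logging interceptor, ours rather than part of the original code, might look like this.
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

public class HoneyKafkaProducerLoggingInterceptor<K, V> implements HoneyKafkaProducerInterceptor<K, V> {

    @Override
    public ListenableFuture<SendResult<K, V>> intercept(ProducerRecord<K, V> producerRecord,
            HoneyKafkaProducerInterceptorChain<K, V> kafkaProducerInterceptorChain) {
        // Do this interceptor's own work...
        System.out.println("sending record to topic " + producerRecord.topic());
        // ...then hand the record on to the rest of the chain
        // (or to the actual send once the chain is exhausted).
        return kafkaProducerInterceptorChain.intercept(producerRecord);
    }
}
Registering it as a bean is enough, since the template is handed the full list of interceptor beans, as shown further below.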
As for the decorators, for now we only define the interface and add implementation classes later as needed; the interface is shown below, followed by a sample implementation.
/**
 * Decorator for Kafka producer tracing.
 */
public interface HoneyKafkaTracingProducerDecorator<K, V> {

    void onSend(Span span, ProducerRecord<K, V> producerRecord);

    void onSuccess(Span span, ProducerRecord<K, V> producerRecord);

    void onError(Span span, ProducerRecord<K, V> producerRecord);
}
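As a concrete illustration (again ours, not from the original code), a decorator that tags the producer span with the destination topic and partition could look like this.
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HoneyKafkaTopicProducerDecorator<K, V> implements HoneyKafkaTracingProducerDecorator<K, V> {

    @Override
    public void onSend(Span span, ProducerRecord<K, V> producerRecord) {
        // Record where the message is going.
        span.setTag("kafka.topic", producerRecord.topic());
        if (producerRecord.partition() != null) {
            span.setTag("kafka.partition", producerRecord.partition());
        }
    }

    @Override
    public void onSuccess(Span span, ProducerRecord<K, V> producerRecord) {
        // Nothing extra to record on success.
    }

    @Override
    public void onError(Span span, ProducerRecord<K, V> producerRecord) {
        // Mark the span as errored so it stands out in Jaeger.
        span.setTag(Tags.ERROR.getKey(), true);
    }
}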
Now that the interceptor chain exists, the next step is to hand it to our HoneyKafkaTemplate, as follows.
public class HoneyKafkaTemplate<K, V> extends KafkaTemplate<K, V> {

    private List<HoneyKafkaProducerInterceptor<K, V>> kafkaProducerInterceptors;

    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory,
            List<HoneyKafkaProducerInterceptor<K, V>> kafkaProducerInterceptors) {
        super(producerFactory);
        this.kafkaProducerInterceptors = kafkaProducerInterceptors;
    }

    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, Map<String, Object> configOverrides) {
        super(producerFactory, configOverrides);
    }

    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        super(producerFactory, autoFlush);
    }

    public HoneyKafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, Map<String, Object> configOverrides) {
        super(producerFactory, autoFlush, configOverrides);
    }

    public ListenableFuture<SendResult<K, V>> actuallySend(ProducerRecord<K, V> producerRecord) {
        return super.doSend(producerRecord);
    }

    @NotNull
    @Override
    public ListenableFuture<SendResult<K, V>> doSend(@NotNull ProducerRecord<K, V> producerRecord) {
        HoneyKafkaProducerInterceptorChain<K, V> kafkaProducerInterceptorChain
                = new HoneyKafkaProducerInterceptorChain<>(kafkaProducerInterceptors.iterator(), this);
        return kafkaProducerInterceptorChain.intercept(producerRecord);
    }
}
This way, when a message is sent through HoneyKafkaTemplate, the interceptor chain runs first and the message is then delegated to the native KafkaTemplate for the actual send.
Now the auto-configuration class that registers HoneyKafkaTemplate needs to be updated, as follows.
@Configuration
@ConditionalOnBean(KafkaTemplate.class)
@AutoConfigureBefore(KafkaAutoConfiguration.class)
public class HoneyKafkaTemplateConfig {

    @Bean
    public HoneyKafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            List<HoneyKafkaProducerInterceptor<Object, Object>> kafkaProducerInterceptors) {
        return new HoneyKafkaTemplate<>(kafkaProducerFactory, kafkaProducerInterceptors);
    }
}
That is, when creating the HoneyKafkaTemplate we inject the HoneyKafkaProducerInterceptor beans registered in the container. We also need an auto-configuration class for our interceptor implementation HoneyKafkaProducerTracingInterceptor, as follows.
@Configuration
@ConditionalOnBean(KafkaTemplate.class)
@AutoConfigureAfter(HoneyTracingConfig.class)
public class HoneyKafkaTracingConfig {

    @Bean
    public HoneyKafkaProducerInterceptor kafkaProducerTracingInterceptor(
            Tracer tracer, List<HoneyKafkaTracingProducerDecorator<Object, Object>> kafkaTracingProducerDecorators) {
        return new HoneyKafkaProducerTracingInterceptor<>(tracer, kafkaTracingProducerDecorators);
    }
}
The spring.factories file is updated accordingly, as follows.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.honey.tracing.config.HoneyTracingConfig,\
com.honey.tracing.config.HoneyTracingFilterConfig,\
com.honey.tracing.config.HoneyRestTemplateTracingConfig,\
com.honey.tracing.config.HoneyKafkaTemplateConfig,\
com.honey.tracing.config.HoneyKafkaTracingConfig
At this point our HoneyKafkaTemplate with distributed tracing support is ready to use. Finally, here is the constants class used above.
public class CommonConstants {

    public static final double DEFAULT_SAMPLE_RATE = 1.0;
    public static final String HONEY_TRACER_NAME = "HoneyTracer";
    public static final String HONEY_REST_TEMPLATE_NAME = "HoneyRestTemplate";
    public static final String HONEY_KAFKA_NAME = "HoneyKafka";
    public static final String FIELD_HOST = "host";
    public static final String FIELD_API = "api";
    public static final String FIELD_HTTP_CODE = "httpCode";
    public static final String FIELD_SUB_SPAN_ID = "subSpanId";
    public static final String FIELD_SUB_HTTP_CODE = "subHttpCode";
    public static final String FIELD_SUB_TIMESTAMP = "subTimestamp";
    public static final String FIELD_SUB_DURATION = "subDuration";
    public static final String FIELD_SUB_HOST = "subHost";
    public static final String HOST_PATTERN_STR = "(?<=(https://|http://)).*?(?=/)";
    public static final String SLASH = "/";
    public static final String LOG_EVENT_KIND = "logEventKind";
    public static final String LOG_EVENT_KIND_REQUEST_STACK = "requestStack";
}
IV. Kafka Consumer Tracing Design and Implementation
1. Kafka Consumer Tracing Design
Likewise, the Kafka consumer interceptor is not a good fit for implementing distributed tracing, for essentially the same reasons as in section III.1.
When consuming Kafka messages in Spring we normally use the @KafkaListener annotation: the annotated method consumes and processes the message and takes a ConsumerRecord as its parameter. So we can provide an aspect that cuts across these @KafkaListener-annotated methods and manipulates the Span before and after the method runs. To stay decoupled from @KafkaListener, we define a dedicated annotation to serve as the pointcut, and the consumption flow then looks like the following.
2. Kafka Consumer Tracing Implementation
First, define the annotation used as the pointcut, as follows.
/**
 * Applied to Kafka consumer methods.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface HoneyKafkaTracing {
}
The corresponding aspect is shown below.
@Aspect
public class HoneyKafkaConsumerTracingAspect {

    private final HoneyKafkaConsumerInterceptor kafkaConsumerTracingInterceptor;

    public HoneyKafkaConsumerTracingAspect(HoneyKafkaConsumerInterceptor kafkaConsumerTracingInterceptor) {
        this.kafkaConsumerTracingInterceptor = kafkaConsumerTracingInterceptor;
    }

    @Pointcut("@annotation(com.honey.tracing.kafka.consumer.interceptor.HoneyKafkaTracing)")
    private void kafkaTracing() {
    }

    @Around("kafkaTracing()")
    public Object intercept(ProceedingJoinPoint joinPoint) throws Throwable {
        return kafkaConsumerTracingInterceptor.intercept(joinPoint);
    }
}
The around advice calls our custom Kafka consumer interceptor, whose interface is defined as follows.
public interface HoneyKafkaConsumerInterceptor {

    Object intercept(ProceedingJoinPoint joinPoint) throws Throwable;
}
The concrete implementation class is shown below.
public class HoneyKafkaConsumerTracingInterceptor<K, V> implements HoneyKafkaConsumerInterceptor {

    private final Tracer tracer;

    private final List<HoneyKafkaTracingConsumerDecorator<K, V>> kafkaTracingConsumerDecorators;

    public HoneyKafkaConsumerTracingInterceptor(
            Tracer tracer, List<HoneyKafkaTracingConsumerDecorator<K, V>> kafkaTracingConsumerDecorators) {
        this.tracer = tracer;
        this.kafkaTracingConsumerDecorators = kafkaTracingConsumerDecorators;
    }

    @Override
    public Object intercept(ProceedingJoinPoint joinPoint) throws Throwable {
        ConsumerRecord consumerRecord = null;
        // Locate the Kafka record among the method arguments
        Object[] args = joinPoint.getArgs();
        for (Object arg : args) {
            if (arg instanceof ConsumerRecord) {
                consumerRecord = (ConsumerRecord) arg;
            }
        }
        if (consumerRecord == null) {
            // No Kafka record found, so skip tracing
            return joinPoint.proceed();
        }
        SpanContext extractSpanContext = tracer.extract(Format.Builtin.HTTP_HEADERS,
                new HoneyKafkaCarrier(consumerRecord.headers()));
        Span span = tracer.buildSpan(HONEY_KAFKA_NAME)
                .asChildOf(extractSpanContext)
                .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
                .start();
        span.setTag(FIELD_HOST, span.getBaggageItem(FIELD_HOST));
        for (HoneyKafkaTracingConsumerDecorator<K, V> kafkaTracingConsumerDecorator : kafkaTracingConsumerDecorators) {
            try {
                kafkaTracingConsumerDecorator.onReceive(span, consumerRecord);
            } catch (Exception e) {
                // do nothing
            }
        }
        Object result;
        try (Scope scope = tracer.activateSpan(span)) {
            try {
                result = joinPoint.proceed();
            } catch (Exception e1) {
                for (HoneyKafkaTracingConsumerDecorator<K, V> kafkaTracingConsumerDecorator : kafkaTracingConsumerDecorators) {
                    try {
                        kafkaTracingConsumerDecorator.onError(span, consumerRecord);
                    } catch (Exception e2) {
                        // do nothing
                    }
                }
                throw e1;
            }
            for (HoneyKafkaTracingConsumerDecorator<K, V> kafkaTracingConsumerDecorator : kafkaTracingConsumerDecorators) {
                try {
                    kafkaTracingConsumerDecorator.onFinished(span, consumerRecord);
                } catch (Exception e) {
                    // do nothing
                }
            }
        } finally {
            span.finish();
        }
        return result;
    }
}
Similarly, we define a decorator interface, as follows.
/**
 * Decorator for Kafka consumer tracing.
 */
public interface HoneyKafkaTracingConsumerDecorator<K, V> {

    void onReceive(Span span, ConsumerRecord<K, V> consumerRecord);

    void onFinished(Span span, ConsumerRecord<K, V> consumerRecord);

    void onError(Span span, ConsumerRecord<K, V> consumerRecord);
}
Then the auto-configuration class HoneyKafkaTracingConfig is updated to register our custom Kafka consumer interceptor and the aspect in the container, as follows.
@Configuration
@AutoConfigureAfter(HoneyTracingConfig.class)
public class HoneyKafkaTracingConfig {

    @Bean
    public HoneyKafkaProducerInterceptor kafkaProducerTracingInterceptor(
            Tracer tracer, List<HoneyKafkaTracingProducerDecorator<Object, Object>> kafkaTracingProducerDecorators) {
        return new HoneyKafkaProducerTracingInterceptor<>(tracer, kafkaTracingProducerDecorators);
    }

    @Bean
    public HoneyKafkaConsumerInterceptor honeyKafkaConsumerInterceptor(
            Tracer tracer, List<HoneyKafkaTracingConsumerDecorator<Object, Object>> kafkaTracingConsumerDecorators) {
        return new HoneyKafkaConsumerTracingInterceptor<>(tracer, kafkaTracingConsumerDecorators);
    }

    @Bean
    public HoneyKafkaConsumerTracingAspect honeyKafkaConsumerTracingAspect(
            HoneyKafkaConsumerInterceptor kafkaConsumerTracingInterceptor) {
        return new HoneyKafkaConsumerTracingAspect(kafkaConsumerTracingInterceptor);
    }
}
Finally, since an aspect is used, the Spring AOP dependency needs to be introduced, as follows.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
V. Verifying the Functionality
Now let's verify Kafka distributed tracing with a simple Kafka example.
We let example-service-1 act as the producer and add the Kafka dependency to its pom, as follows.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Then add the producer configuration, as follows.
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Finally, add a KafkaController to send Kafka messages, as follows.
@RestController
public class KafkaController {

    private static final String TEST_TOPIC = "testTopic";

    private static final String TEST_MESSAGE = "testMessage";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/kafka/send")
    public void send(String url) {
        kafkaTemplate.send(TEST_TOPIC, TEST_MESSAGE);
    }
}
We then let example-service-2 act as the consumer and add the Kafka dependency to its pom, as follows.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Then add the consumer configuration, as follows.
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: kafka-tracing
Finally, add a KafkaService to consume the messages, as follows.
@Service
public class KafkaService {

    private static final String TEST_TOPIC = "testTopic";

    @HoneyKafkaTracing
    @KafkaListener(topics = TEST_TOPIC)
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        System.out.println(consumerRecord.value());
    }
}
In the end, example-service-1 prints the following trace.
{
    "traceId": "904360d3ae85faa7ce5d698debda55fd",
    "spanId": "ce5d698debda55fd",
    "parentSpanId": "0000000000000000",
    "timestamp": "1708067308855",
    "duration": "306",
    "httpCode": "200",
    "host": "http://localhost:8080",
    "requestStacks": [{
        "subSpanId": "7cb4062ca0fa7cdd",
        "subHttpCode": "null",
        "subTimestamp": "1708067308876",
        "subDuration": "262",
        "subHost": "127.0.0.1:9092"
    }]
}
example-service-2 prints the following trace.
{
    "traceId": "904360d3ae85faa7ce5d698debda55fd",
    "spanId": "7cb4062ca0fa7cdd",
    "parentSpanId": "ce5d698debda55fd",
    "timestamp": "1708067309171",
    "duration": "5",
    "httpCode": "null",
    "host": "127.0.0.1:9092",
    "requestStacks": []
}
As we can see, the trace information is propagated through the Kafka message.
Summary
Although Kafka passes messages through push and pull, we can abstract this as the producer handing the message directly to the consumer, with the Record as the transport medium. Since the Record reserves Headers, we can propagate the Span through them, and the whole pattern becomes exactly the same as calling a downstream service with RestTemplate.
Implementing Kafka distributed tracing essentially means intercepting Kafka message sending and receiving. Kafka does provide interceptors, but because of how they are loaded they are not very convenient to use here, so we chose custom interceptors plus an aspect to intercept sending and receiving. To make our custom interceptors easy to apply, we also defined our own KafkaTemplate that wraps the native one: when a message is sent through the custom KafkaTemplate, the custom interceptor logic runs first and the actual send is then delegated to the native KafkaTemplate.
The project directory structure of honey-starter-tracing in this article is shown below.
The project directory structure of the test demo is shown below.