Skywalking Kafka Tracing实现

背景

Skywalking默认场景下,Tracing对于消息队列的发送场景,无法将TraceId传递到下游消费者,但对于微服务场景下,是有大量消息队列的业务场景的,这显然无法满足业务预期。

解决方案

Skywalking的官方社区中,有用户提出了该场景问题,Skywalking在补充工具包中,提供了对Kafka的tracing支持。

skywalking kafka problem

代码实现:

<dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-kafka</artifactId><version>${skywalking.version}</version></dependency>

对于该工具包,默认情况下,是针对KafkaTemplate进行trace,即如果使用KafkaTemplate发送消息,代码层面无需做任何改动。

如果没有使用KafkaTemplate的场景,toolkit也提供的了注解的支持:

public class ConsumerThread2 extends Thread {@Overridepublic void run() {Properties consumerProperties = new Properties();//...consumerProperties.put()KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);consumer.subscribe(topicPattern, new NoOpConsumerRebalanceListener());while (true) {if (pollAndInvoke(consumer)) break;}consumer.close();}@KafkaPollAndInvokeprivate boolean pollAndInvoke(KafkaConsumer<String, String> consumer) {try {Thread.sleep(1000);} catch (InterruptedException e) {}ConsumerRecords<String, String> records = consumer.poll(100);if (!records.isEmpty()) {OkHttpClient client = new OkHttpClient.Builder().build();Request request = new Request.Builder().url("http://localhost:8080/kafka-scenario/case/kafka-thread2-ping").build();Response response = null;try {response = client.newCall(request).execute();} catch (IOException e) {}response.body().close();return true;}return false;}
}

异步线程Tracing

对于Kafka消息的发送,经常会配合异步线程池的场景使用,Tracing的基本原理是基于ThreadLocal进行实现的,那么对于异步场景,是会丢失TraceId,通常的解决方式,是需要手动将主线程的TraceId手动赋值给子线程,但这种方式需要手动代码侵入,并不友好。

幸运的是,Skywalking的toolkit中提供了对于异步线程tracing的支持。

<dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-trace</artifactId><version>${skywalking.version}</version>
</dependency>

推荐用法:

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(RunnableWrapper.of(new Runnable() {@Override public void run() {//your code}
}));

或者:

 @TraceCrossThreadpublic static class MyCallable<String> implements Callable<String> {@Overridepublic String call() throws Exception {return null;}}
...ExecutorService executorService = Executors.newFixedThreadPool(1);executorService.submit(new MyCallable());

PS:事实上,RunnableWrapper也是基于@TraceCrossThread实现。

相关文档:
https://skywalking.apache.org/docs/skywalking-java/v8.16.0/en/setup/service-agent/java-agent/application-toolkit-kafka/

https://skyapm.github.io/document-cn-translation-of-skywalking/zh/6.1.0/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.html

https://blog.51cto.com/knifeedge/5268667

https://blog.csdn.net/lijunwyf/article/details/107954543

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/89677.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构(Java实现)-栈和队列

栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。 先进后出 栈的使用 栈的模拟实现 上述的主要代码 public class MyStack {private int[] elem;private int usedSize;public MyStack() {this.elem new int[5];}Overridepublic …

【小吉测评】哔哩哔哩接入AI?!效果如何?

文章目录 &#x1f384;前言⭐申请方式&#x1f3f3;️‍&#x1f308;注意 &#x1f6f8;简介&#x1f354;上手体验&#x1f6f8;进行数学计算&#x1f970;可以写代码吗 &#x1f384;前言 最近人工智能特别火&#xff0c;chatgpt&#xff0c;Claude2&#xff0c;文心一言等…

IP协议分片重组问题

分片是什么&&为什么会有分片 IP数据报分片的主要目的是为了防止IP数据报文长度超过下一跳链路MTU(最大传输单元)。 数据链路层之MTU 数据链路层中有一个东西叫做MTU&#xff08;最大传输单元&#xff09;&#xff0c;它的作用主要是控制上层给的数据报不要太大&#…

WPF怎么实现文件拖放功能winform怎么实现拖拽功能

WPF怎么实现文件拖放功能winform怎么实现文件拖拽功能&#xff0c;在管理员模式下wpf winform怎么实现文件的拖拽功能 WPF实现文件拖放功能&#xff0c;正常情况并没有什么问题&#xff0c;但是如果你的程序使用管理员身份启动&#xff0c;你就会发现文件拖放功能就会失效。同…

MOS管开关电路栅极为什么要串接电阻

在MOS管开关电路或者驱动电路中&#xff0c;常常会在MOS管的栅极串接一个电阻。 这个电阻阻值一般是几十欧姆&#xff0c;那么这个电阻有什么作用呢&#xff1f; 第一个作用就是可以限制驱动电流 &#xff0c;防止瞬间驱动电流过大导致驱动芯片驱动能力不足或者损坏。 MOS管的…

Mybatis缓存

缓存(cache&#xff09;的作用是为了减去数据库的压力&#xff0c;提高查询性能。缓存实现的原理 是从数据库中查询出来的对象在使用完后不要销毁&#xff0c;而是存储在内存&#xff08;缓存&#xff09;中&#xff0c; 当再次需要获取该对象时&#xff0c;直接从内存&#xf…

性价比高的照明品牌,五大性价比高的照明品牌台灯推荐

很多家长有时候会说孩子觉得家里的台灯灯光刺眼&#xff0c;看书看久了就不舒服。这不仅要看光线亮度是否柔和&#xff0c;还要考虑台灯是不是有做遮光式设计。没有遮光式设计的台灯&#xff0c;光源外露&#xff0c;灯光会直射孩子头部&#xff0c;孩子视线较低&#xff0c;很…

安达发|模拟车间模型生成生产排产计划

根据车间模型生成排产计划的一般程序可简单地描述为下面6个步骤。 1. 建模 车间模型必须详细地捕捉生产流程的特征和相应的物流&#xff0c;以便以最小的成本生成可行的计划。由于一个系统的产出率只受潜在瓶颈资源的限制&#xff0c;因此&#xff0c;我们只需对车间现有全部资…

成都睿趣科技:抖音开网店前期的流程是什么

随着互联网的快速发展&#xff0c;电子商务成为了商业领域中的一大利器&#xff0c;而在电商领域中&#xff0c;抖音作为一个强大的平台&#xff0c;也吸引了众多商家的目光。然而&#xff0c;要在抖音上开设一家成功的网店&#xff0c;并不是一件简单的事情&#xff0c;需要经…

网络字节序——TCP接口及其实现简单TCP服务器

网络字节序——TCP接口及其实现简单TCP服务器 文章目录 网络字节序——TCP接口及其实现简单TCP服务器简单TCP服务器的实现1. 单进程版&#xff1a;客户端串行版2. 多进程版&#xff1a;客户端并行版netstat查看网络信息3.多线程版&#xff1a;并行执行log.hpp 守护进程fg、bg s…

【GO】LGTM_Grafana_Tempo(2)_官方用例改后实操

最近在尝试用 LGTM 来实现 Go 微服务的可观测性&#xff0c;就顺便整理一下文档。 Tempo 会分为 4 篇文章&#xff1a; Tempo 的架构官网测试实操跑通gin 框架发送 trace 数据到 tempogo-zero 微服务框架使用发送数据到 tempo 根据官方文档实操跑起来 tempo&#xff0c;中间根…

几个nlp的小任务(生成式任务——语言模型(CLM与MLM))

@TOC 本章节需要用到的类库 微调任意Transformers模型(CLM因果语言模型、MLM遮蔽语言模型) CLM MLM 准备数据集 展示几个数据的结构