OpenTelemetry 实战:gRPC 监控的实现原理

news/2025/1/15 13:10:17/文章来源:https://www.cnblogs.com/crossoverJie/p/18395891

前言

最近在给 opentelemetry-java-instrumentation 提交了一个 PR,是关于给 gRPC 新增四个 metrics:

  • rpc.client.request.size: 客户端请求包大小
  • rpc.client.response.size:客户端收到的响应包大小
  • rpc.server.request.size:服务端收到的请求包大小
  • rpc.server.response.size:服务端响应的请求包大小

这个 PR 的主要目的就是能够在指标监控中拿到 RPC 请求的包大小,而这里的关键就是如何才能拿到这些包的大小。

首先支持的是 gRPC(目前在云原生领域使用的最多),其余的 RPC 理论上也是可以支持的:

在实现的过程中我也比较好奇 OpenTelemetry 框架是如何给 gRPC 请求创建 span 调用链的,如下图所示:
image.png
image.png

这是一个 gRPC 远程调用,java-demo 是 gRPC 的客户端,k8s-combat 是 gRPC 的服务端

在开始之前我们可以根据 OpenTelemetry 的运行原理大概猜测下它的实现过程。

首先我们应用可以创建这些链路信息的前提是:使用了 OpenTelemetry 提供的 javaagent,这个 agent 的原理是在运行时使用了 byte-buddy 增强了我们应用的字节码,在这些字节码中代理业务逻辑,从而可以在不影响业务的前提下增强我们的代码(只要就是创建 span、metrics 等数据)

Spring 的一些代理逻辑也是这样实现的

gRPC 增强原理

而在工程实现上,我们最好是不能对业务代码进行增强,而是要找到这些框架提供的扩展接口。

gRPC 来说,我们可以使用它所提供的 io.grpc.ClientInterceptorio.grpc.ServerInterceptor 接口来增强代码。

打开 io.opentelemetry.instrumentation.grpc.v1_6.TracingClientInterceptor 类我们可以看到它就是实现了 io.grpc.ClientInterceptor

而其中最关键的就是要实现 io.grpc.ClientInterceptor#interceptCall 函数:

@Override  
public <REQUEST, RESPONSE> ClientCall<REQUEST, RESPONSE> interceptCall(  MethodDescriptor<REQUEST, RESPONSE> method, CallOptions callOptions, Channel next) {  GrpcRequest request = new GrpcRequest(method, null, null, next.authority());  Context parentContext = Context.current();  if (!instrumenter.shouldStart(parentContext, request)) {  return next.newCall(method, callOptions);  }  Context context = instrumenter.start(parentContext, request);  ClientCall<REQUEST, RESPONSE> result;  try (Scope ignored = context.makeCurrent()) {  try {  // call other interceptors  result = next.newCall(method, callOptions);  } catch (Throwable e) {  instrumenter.end(context, request, Status.UNKNOWN, e);  throw e;  }  }  return new TracingClientCall<>(result, parentContext, context, request);  
}

这个接口是 gRPC 提供的拦截器接口,对于 gRPC 客户端来说就是在发起真正的网络调用前后会执行的方法。

所以在这个接口中我们就可以实现创建 span 获取包大小等逻辑。

使用 byte-buddy 增强代码

不过有一个问题是我们实现的 io.grpc.ClientInterceptor 类需要加入到拦截器中才可以使用:

var managedChannel = ManagedChannelBuilder.forAddress(host, port) .intercept(new TracingClientInterceptor()) // 加入拦截器
.usePlaintext()
.build();

但在 javaagent 中是没法给业务代码中加上这样的代码的。

此时就需要 byte-buddy 登场了,它可以动态修改字节码从而实现类似于修改源码的效果。

io.opentelemetry.javaagent.instrumentation.grpc.v1_6.GrpcClientBuilderBuildInstr umentation 类里可以看到 OpenTelemetry 是如何使用 byte-buddy 的。

  @Overridepublic ElementMatcher<TypeDescription> typeMatcher() {return extendsClass(named("io.grpc.ManagedChannelBuilder")).and(declaresField(named("interceptors")));}@Overridepublic void transform(TypeTransformer transformer) {transformer.applyAdviceToMethod(isMethod().and(named("build")),GrpcClientBuilderBuildInstrumentation.class.getName() + "$AddInterceptorAdvice");}@SuppressWarnings("unused")public static class AddInterceptorAdvice {@Advice.OnMethodEnter(suppress = Throwable.class)public static void addInterceptor(@Advice.This ManagedChannelBuilder<?> builder,@Advice.FieldValue("interceptors") List<ClientInterceptor> interceptors) {VirtualField<ManagedChannelBuilder<?>, Boolean> instrumented =VirtualField.find(ManagedChannelBuilder.class, Boolean.class);if (!Boolean.TRUE.equals(instrumented.get(builder))) {interceptors.add(0, GrpcSingletons.CLIENT_INTERCEPTOR);instrumented.set(builder, true);}}}

从这里的源码可以看出,使用了 byte-buddy 拦截了 io.grpc.ManagedChannelBuilder#intercept(java.util.List<io.grpc.ClientInterceptor>) 函数。

io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers#extendsClass/ isMethod 等函数都是 byte-buddy 库提供的函数。

而这个函数正好就是我们需要在业务代码里加入拦截器的地方。

interceptors.add(0, GrpcSingletons.CLIENT_INTERCEPTOR);
GrpcSingletons.CLIENT_INTERCEPTOR = new TracingClientInterceptor(clientInstrumenter, propagators);

通过这行代码可以手动将 OpenTelemetry 里的 TracingClientInterceptor 加入到拦截器列表中,并且作为第一个拦截器。

而这里的:

extendsClass(named("io.grpc.ManagedChannelBuilder")).and(declaresField(named("interceptors")))

通过函数的名称也可以看出是为了找到 继承了io.grpc.ManagedChannelBuilder 类中存在成员变量 interceptors 的类。

transformer.applyAdviceToMethod(  isMethod().and(named("build")),  GrpcClientBuilderBuildInstrumentation.class.getName() + "$AddInterceptorAdvice");

然后在调用 build 函数后就会进入自定义的 AddInterceptorAdvice 类,从而就可以拦截到添加拦截器的逻辑,然后把自定义的拦截器加入其中。

获取 span 的 attribute

我们在 gRPC 的链路中还可以看到这个请求的具体属性,比如:

  • gRPC 服务提供的 IP 端口。
  • 请求的响应码
  • 请求的 service 和 method
  • 线程等信息。

这些信息在问题排查过程中都是至关重要的。

可以看到这里新的 attribute 主要是分为了三类:

  • net.* 是网络相关的属性
  • rpc.* 是和 grpc 相关的属性
  • thread.* 是线程相关的属性

所以理论上我们在设计 API 时最好可以将这些不同分组的属性解耦开,如果是 MQ 相关的可能还有一些 topic 等数据,所以各个属性之间是互不影响的。

带着这个思路我们来看看 gRPC 这里是如何实现的。

clientInstrumenterBuilder.setSpanStatusExtractor(GrpcSpanStatusExtractor.CLIENT).addAttributesExtractors(additionalExtractors).addAttributesExtractor(RpcClientAttributesExtractor.create(rpcAttributesGetter)).addAttributesExtractor(ServerAttributesExtractor.create(netClientAttributesGetter)).addAttributesExtractor(NetworkAttributesExtractor.create(netClientAttributesGetter))

OpenTelemetry 会提供一个 io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder#addAttributesExtractor构建器函数,用于存放自定义的属性解析器。

从这里的源码可以看出分别传入了网络相关、RPC 相关的解析器;正好也就对应了图中的那些属性,也满足了我们刚才提到的解耦特性。

而每一个自定义属性解析器都需要实现接口 io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor

public interface AttributesExtractor<REQUEST, RESPONSE> {
}

这里我们以 GrpcRpcAttributesGetter 为例。

enum GrpcRpcAttributesGetter implements RpcAttributesGetter<GrpcRequest> {INSTANCE;@Overridepublic String getSystem(GrpcRequest request) {return "grpc";}@Override@Nullablepublic String getService(GrpcRequest request) {String fullMethodName = request.getMethod().getFullMethodName();int slashIndex = fullMethodName.lastIndexOf('/');if (slashIndex == -1) {return null;}return fullMethodName.substring(0, slashIndex);}

可以看到 system 是写死的 grpc,也就是对于到页面上的 rpc.system 属性。

而这里的 getService 函数则是拿来获取 rpc.service 属性的,可以看到它是通过 gRPC 的method 信息来获取 service 的。


public interface RpcAttributesGetter<REQUEST> {  @Nullable  String getService(REQUEST request);
}

而这里 REQUEST 其实是一个泛型,在 gRPC 里是 GrpcRequest,在其他 RPC 里这是对应的 RPC 的数据。

这个 GrpcRequest 是在我们自定义的拦截器中创建并传递的。

而我这里需要的请求包大小也是在拦截中获取到数据然后写入进 GrpcRequest。

static <T> Long getBodySize(T message) {  if (message instanceof MessageLite) {  return (long) ((MessageLite) message).getSerializedSize();  } else {  // Message is not a protobuf message  return null;  }}

这样就可以实现不同的 RPC 中获取自己的 attribute,同时每一组 attribute 也都是隔离的,互相解耦。

自定义 metrics

每个插件自定义 Metrics 的逻辑也是类似的,需要由框架层面提供 API 接口:

public InstrumenterBuilder<REQUEST, RESPONSE> addOperationMetrics(OperationMetrics factory) {  operationMetrics.add(requireNonNull(factory, "operationMetrics"));  return this;  
}
// 客户端的 metrics
.addOperationMetrics(RpcClientMetrics.get());// 服务端的 metrics
.addOperationMetrics(RpcServerMetrics.get());

之后也会在框架层面回调这些自定义的 OperationMetrics:

    if (operationListeners.length != 0) {// operation listeners run after span start, so that they have access to the current span// for capturing exemplarslong startNanos = getNanos(startTime);for (int i = 0; i < operationListeners.length; i++) {context = operationListeners[i].onStart(context, attributes, startNanos);}}if (operationListeners.length != 0) {  long endNanos = getNanos(endTime);  for (int i = operationListeners.length - 1; i >= 0; i--) {  operationListeners[i].onEnd(context, attributes, endNanos);  }}

这其中最关键的就是两个函数 onStart 和 onEnd,分别会在当前这个 span 的开始和结束时进行回调。

所以通常的做法是在 onStart 函数中初始化数据,然后在 onEnd 结束时统计结果,最终可以拿到 metrics 所需要的数据。

以这个 rpc.client.duration 客户端的请求耗时指标为例:

@Override  
public Context onStart(Context context, Attributes startAttributes, long startNanos) {  return context.with(  RPC_CLIENT_REQUEST_METRICS_STATE,  new AutoValue_RpcClientMetrics_State(startAttributes, startNanos));  
}@Override  
public void onEnd(Context context, Attributes endAttributes, long endNanos) {  State state = context.get(RPC_CLIENT_REQUEST_METRICS_STATE);Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build();  clientDurationHistogram.record(  (endNanos - state.startTimeNanos()) / NANOS_PER_MS, attributes, context);
}

在开始时记录下当前的时间,结束时获取当前时间和结束时间的差值正好就是这个 span 的执行时间,也就是 rpc client 的处理时间。

OpenTelemetry 中绝大多数的请求时间都是这么记录的。

Golang 增强

而在 Golang 中因为没有 byte-buddy 这种魔法库的存在,不可以直接修改源码,所以通常的做法还是得硬编码才行。

还是以 gRPC 为例,我们在创建 gRPC server 时就得指定一个 OpenTelemetry 提供的函数。

s := grpc.NewServer(  grpc.StatsHandler(otelgrpc.NewServerHandler()),  
)

在这个 SDK 中也会实现刚才在 Java 里类似的逻辑,限于篇幅具体逻辑就不细讲了。

总结

以上就是 gRPCOpenTelemetry 中的具体实现,主要就是在找到需要增强框架是否有提供扩展的接口,如果有就直接使用该接口进行埋点。

如果没有那就需要查看源码,找到核心逻辑,再使用 byte-buddy 进行埋点。

比如 Pulsar 并没有在客户端提供一些扩展接口,只能找到它的核心函数进行埋点。

而在具体埋点过程中 OpenTelemetry 提供了许多解耦的 API,方便我们实现埋点所需要的业务逻辑,也会在后续的文章继续分析 OpenTelemetry 的一些设计原理和核心 API 的使用。

这部分 API 的设计我觉得是 OpenTelemetry 中最值得学习的地方。

参考链接:

  • https://bytebuddy.net/#/
  • https://opentelemetry.io/docs/specs/semconv/rpc/rpc-metrics/#metric-rpcserverrequestsize

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/791984.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ICMAN液位检测方案

非接触式液位检测提醒方案TA是什么? ICMAN液位检测是基于双通道比较电容式液位检测原理,来判断容器中是否有液体或者液体是否达到一定高度。 有什么用? ICMAN液位检测可以实现非接触式检测,起到高低、不同液位提醒、缺水提醒、溢水提醒等作用,让我们的生产生活更加安全、便…

产品经理必修课:掌握API接口的艺术

https://img2024.cnblogs.com/blog/3506472/202409/3506472-20240904094832404-843025212.png在数字化时代,产品经理(PM)的角色已经远远超出了传统的产品规划和市场定位。现代产品经理需要具备跨领域的技术理解能力,其中一项关键技能就是对API接口的深入理解。API(应用程序…

WebShell流量特征检测_冰蝎篇

80后用菜刀,90后用蚁剑,95后用冰蝎和哥斯拉,以phpshell连接为例,本文主要是对这四款经典的webshell管理工具进行流量分析和检测。 什么是一句话木马? 1、定义 顾名思义就是执行恶意指令的木马,通过技术手段上传到指定服务器并可以正常访问,将我们需要服务器执行的命令上…

护网漏洞复现(三)

Mtab书签导航程序LinkStoregetlcon存在SQL注入漏洞 描述:Mtab书签导航程序 LinkStore/getIcon 接口存在SQL注入漏洞,未经身份验证的远程攻击者除了可以利用 SQL 注入漏洞获取数据库中的信息(例如,管理员后台密码、站点的用户个人信息)之外,甚至在高权限的情况可向服务器中…

360安全卫士极速版,如何查找恢复区,隔离区,信任区

我已经使用360安全卫士极速版已经2年时间,在个人使用感受上,说实话我觉得很不错,至少没有广告。如果有朋友是360安全卫士的使用者,我推荐你们使用安全极速版 界面更加清晰,简洁,好用。很多功能都没有删减,可以说是保留360安全卫士的核心功能 下载地址:360官网_360安全卫…

Jmeter 4.0压力测试工具安装及使用方法

一、Jmeter下载 1.网盘(官网下载贼慢):https://pan.xunlei.com/s/VO5ucfHW9SkAXN8Ns-a5oxoQA1?pwd=y4aw# 2.选择进行下载,下载下来为一个压缩包,解压即可。3.我下载的是jmeter4.0版本,对应jdk1.8。然后就进行解压。 个人认为要注意2点: 1)对应的jdk版本不可太低,一般jm…

遇见资深育儿专家智能体,开启无忧育儿之旅

本文由 ChatMoney团队出品介绍说明 在育儿的道路上,您是否常常感到迷茫和无助?面对孩子的成长问题、教育难题以及各种突发状况,您是否渴望有一位专业的导师为您指引方向?现在,让资深育儿专家智能体成为您育儿路上的得力助手,为您解锁育儿密码,告别养育迷茫! 资深育儿专…

LoRA大模型微调的利器

LoRA模型是小型的Stable Diffusion模型,它们对checkpoint模型进行微小的调整。它们的体积通常是检查点模型的10到100分之一。因为体积小,效果好,所以lora模型的使用程度比较高。LoRA模型是小型的Stable Diffusion模型,它们对checkpoint模型进行微小的调整。它们的体积通常是…

Linux服务器之TOP命令详解

在做性能调优的过程中,我们经常需要用到top命令来查看服务器实时的资源占用情况,通过top命令,我们可以查看到服务器的各项性能指标以及各个进行的资源使用情况。 命令格式:top [参数]命令参数:-b 批处理-c 显示完整的治命令-I 忽略失效过程-s 保密模式-S 累积模式-i<时…

护网漏洞复现(一)

金程云OA UploadFile存在任意文件上传漏洞 描述:金程云OA UploadFile存在任意文件上传漏洞,未经身份验证的攻击者可通过该漏洞在服务器端任意执行代码,写入后门,获取服务器权限,进而控制整个web服务器。 fofa语法:body="images/yipeoplehover.png" POC: POST …

LLM大模型基础知识学习总结

在这个已经被大模型包围的时代,不了解一点大模型的基础知识和相关概念,可能出去聊天都接不上话。刚好近期我也一直在用GPT和GitHub Copilot,也刚好对这些基础知识很感兴趣,于是学习了一下,做了如下的整理总结,分享与你!大家好,我是Edison。 在这个已经被大模型包围的时…

借助表格技术提升智能审计云平台应用体验

背景说明: 2009年,立信加入全球第五大国际会计网络——BDO国际。BDO 全球各地成员所均使用统一的审计方式,在完全遵守国际审计准则的原则同时,也会应不同地区要求提供附加指引,确保在全球提供一致的服务水平。如今,立信的审计过程及档案记录均已实现电子化。然而,审计工…