基于Skywalking开发分布式监控(四)一个案例

上一篇我们简单介绍了基于SkyWalking自定义增强的基本架构,即通过把Trace数据导入数据加工模块进行加工,进行持久化,并赋能grafana展示。

现在我们给出一个例子,对于量化交易系统,市场交易订单提交,该订单可以走模拟盘也可以走实盘,可以自动提交,也可以走人工提交,订单提交后,会把交易所给到的订单信息反馈回来。 需要监控的需求很简单:可以按,自动实盘/虚拟盘,人工实盘/虚拟盘订单分类监控,提交和反馈流程,满足指标项:

1 每分钟延时、延时百分位(P50/75/90/95/99 MAX)、每分钟请求数,排名前5的慢请求等监控项(metrics)

2 以及按排名前5的慢请求对应的SPAN进行抓取,分析出最慢的SPAN

那么SW原生监控有啥问题呢?
1 需要根据该流程在不同阶段的特征才能定位该流程,按Trace-Span模型来说,即需要一个Trace链根据不同Span提供的特征才能抓取该Trace,SW并不支持

例如 分辨人工/自动订单实际上是按Trace相关EndpointName来的
人工订单走页面,EntrySpan的 endpointName为POST:/api/trade/order/send
但自动订单由程序发起,EntrySpan的 endpointName为“rpc.OrderTradeService.send

而分辨是否走实盘/虚拟盘,则是在后续Span,按tag systemFlag=1或2,来确认

在这里插入图片描述
而SW的搜索显然是不支持的

  1. 问题2 反馈消息是根据交易所API生成的,不是一个标准通讯架构,只能根据自定义用户增强(customize-enhance),生成的localSpan形成跟踪链,那SW原生Trace查询根本没法按endpoint名字搜索,只能按tag搜索,然后按时间取定位,效率非常低
  2. 还有一个上一篇说了,SW对Trace和Span不提供metric聚合项

那增强计算模块怎么解决上述问题
对问题1: 按人工、自动、虚拟、实盘,形成4个搜索项,然后定时(基本)同时执行,把搜索结果叠加到ES索引中,按订单编号trade_id更新索引项,利用ES的向量特征形加上业务标签,供下游按业务标签定位需要的Trace

对问题2: 按预先设计的Tag值标识反馈消息,然后按Tag搜索,把搜索结果叠加到ES索引中,按订单编号trade_id更新索引项,利用ES的向量特征形加上业务标签,供下游按业务标签定位需要的Trace

对问题3 按业务标签计算各监控项(metrics),并按时间点汇总最慢的5个Trace,查找Span

我们按配置config来说明
关于问题1,我们配置了4个搜索项

"tasks" : [{  #查找按EndpointName=rpc.OrderTradeService.send查找自动订单,并且在ES索引中增加业务标签 businessTag:: Auto"name": "task.QueryTraces",       "para" : {"serviceName" : "TradeService","endpointName" : "rpc.OrderTradeService.send","businessTag" : { "key": "businessTag", "value": "Auto"},"tags" : {},"traces_index" :  "traces-"    #索引名,xx-后面跟着日期},"switch" : "on",      #搜索项有效"interval" : "60"      #每隔60秒执行一次},{ #查找按EndpointName=POST:/api/trade/order/send查找人工订单,并且在ES索引中增加业务标签 businessTag:: manual"name" : "task.QueryTraces","para" : {"serviceName" : "TradeService","endpointName" : "POST:/api/trade/order/send","businessTag" : { "key": "businessTag", "value": "manual"},"tags" : {},"traces_index" :  "traces-"},"switch" : "on","interval" : "60"},{  #查找按tag: systemFlag=1 查找人工订单,并且在ES索引中增加业务标签 systemFlag:: 1 (实盘)"name" : "task.QueryTraces","para" : {"serviceName" : "TradeService","endpointName" : "","businessTag" : { "key": "systemFlag", "value": "sim"},"tags" : { "key": "systemFlag", "value": "1"},"traces_index" :  "traces-"},"switch" : "on","interval" : "60"},{   #查找按tag: systemFlag=2 查找人工订单,并且在ES索引中增加业务标签 systemFlag:: 2 (实盘)"name" : "task.QueryTraces","para" : {"serviceName" : "TradeService","endpointName" : "","businessTag" : { "key": "systemFlag", "value": "RealTime"},"tags" : { "key": "systemFlag", "value": "2"},"traces_index" :  "traces-"},"switch" : "on","interval" : "60"},

task.QueryTraces是查询程序,按每分钟1次的节奏,按Graphql接口查询,需要用到的接口,按ServiceName按SW内置查询searchService接口查ServiceId , 按SW内置查询searchEndpoint接口查EndpointId
然后根据ServiceId , EndpointId调用,或者ServiceId和预置Tag,按SW内置查询接口queryBasicTraces查询相关Traces,注意点如下:
1 查询窗口要注意,也就是要防止Trace形成前执行查询语句,建议做成滑动窗口,可以调节窗口的大小,或者隔几秒多试几次(比如10秒执行3次)
2 要注意应用多页查询,queryBasicTraces有页数限制,一次最多1000条,要查全需要比较完整多页查询结构
查询完更新ES索引之后
在这里插入图片描述
很容易根据业务标签,获取我们所需的Traces

同理对问题2,我们引入配置文件,实际上我们利用FIX报文msgtype=8 报文的特征来标识反馈消息,然后按ordStatus,表示是否是成交或者订单有效的报文,即按tags msgType=8, ordStatus=2/0 查询相关Traces

{"name" : "task.QueryTraces","para" : {"serviceName" : "APIService","endpointName" : "","businessTag" : { "key": "OrdStatus", "value": "deal"},"tags" : [{ "key": "msgType", "value": "8"},{"key": "ordStatus","value": "2"}],"traces_index" :  "traces-"},"switch" : "on","interval" : "60"},{"name" : "task.TracesQueryInfo","para" : {"serviceName" : "APIService","endpointName" : "","businessTag" : { "key": "OrdStatus", "value": "effect"},"tags" : [{ "key": "msgType", "value": "8"},{"key": "ordStatus","value": "0"}],"traces_index" :  "traces-"},"switch" : "on","interval" : "60"},

对于问题3,我们配置两种计算模块: 一是 task.Caculator用于计算各类Metrics,与SW无关,二是 task.SpanInfo计算 ES索引库中 按大于95%分位数延时的慢Traces,逐条查找全部Span

{ # 按业务标签查人工实盘的订单traces(businessTag=manual,systemFlag=RealTime),计算监控项"name": "task.Caculator","para" : {"businessTags" :[{ "key": "businessTag", "value": "manual"},{"key": "systemFlag","value": "RealTime"}],"traces_index" :  "traces-",    # 源索引"stat_index" : "traces_index-"   #监控项索引},"switch" : "on","interval" : "60","delay" : 10      # 比源索引执行慢10秒},{  # 按业务标签查自动虚拟盘的订单traces(businessTag=auto,systemFlag=sim),计算监控项"name": "task.Caculator","para" : {"businessTags" :[{ "key": "businessTag", "value": "Auto"},{"key": "systemFlag","value": "sim"}],"traces_index" :  "traces-","stat_index" : "traces_index-"},"switch" : "on","interval" : "60","delay" : 10},{ # 按业务标签查自动实盘的订单traces(businessTag=auto,systemFlag=Realtime),计算监控项"name": "task.Caculator","para" : {"businessTags" :[{ "key": "businessTag", "value": "Auto"},{"key": "systemFlag","value": "RealTime"}],"traces_index" :  "traces-","stat_index" : "traces_index-"},"switch" : "on","interval" : "60","delay" : 10},{ # 按业务标签查反馈提交有效订单(OrdStatus=effect,systemFlag=Realtime),计算监控项"name": "task.Caculator","para" : {"businessTags" : { "key": "OrdStatus", "value": "effect"},"traces_index" :  "traces-","stat_index" : "traces_index-"},"switch" : "on","interval" : "60","delay" : 10},{ # 计算 ES索引库中 按大于95%分位数延时的慢Traces,逐条查找全部Span"name": "task.SpanInfo","para" : {"percentile" : 0.95,"traces_index" :  "traces-","span_index" : "traces_index-"},"switch" : "on","interval" : "60","delay" : 10}

我们看一下订单提交计算结果索引
在这里插入图片描述

以及慢Trace相关Span的索引
在这里插入图片描述
关于task.QueryTraces,task.Caculator,task.SpanInfo,主要代码如下
task.QueryTraces

public class QueryTraces extends AbstractTraceQuery implements TaskService,Runnable{private static final Lock lock = new ReentrantLock();  //对不同任务的竞争性资源加锁ObjectMapper objectMapper = new ObjectMapper();String serviceName,serviceId,endpointName,endpointId,traces_index;ArrayNode businessTags;JsonNode businessTag,tags;DatasourceService datasource;TargetdbService targetdb;@Overridepublic void run() {logger.info("QueryInfo begin...");if("".equals(serviceId)){//防止获取不到serviceIdserviceId=this.datasource.queryServiceId(serviceName);if("".equals(serviceId)){//第二次获取不成功就终止线程logger.error("query serviceId fail");return;}}if(endpointName.equals("")){//检查tags是否为空,为空就终止线程if(tags.isNull() || tags.isMissingNode()) {logger.error("endpointName & tags is both empty");return;}} else{if("".equals(endpointId)){//防止获取不到endpointIdendpointId=this.datasource.queryEndPointId(endpointName,serviceName);if("".equals(endpointId)){//第二次获取不成功就终止线程logger.error("query endpointId fail");return;}}}targetdb.createForm(traces_index);String endTime=getTimeEndPoint(1,40);String startTime=getTimeEndPoint(3,41);int retry=3;  //重试次数int lastArraylistSize=0;ArrayNode traceList= JsonNodeFactory.instance.arrayNode();logger.info("QueryInfo startTime:: {}  endTime:: {}",startTime,endTime);try{while(retry>0){//查询SW的traces数据,注意有可能需要分页查询traceList=getMultiPageResult(datasource,serviceId,endpointId,startTime,endTime,tags);logger.info("traceList:: {} retry:: {}",traceList.toString(),retry);if(traceList.size()>lastArraylistSize){//如果查到结果,打业务标签,并按TraceId调批量更新目标库lastArraylistSize=traceList.size();Map<String, List<Map<String,Object>>> traceMap = genTraceMap(businessTags, traceList); //结果集合targetdb.updateDate(traces_index,traceMap);//打时间戳logger.info("TracesQuery update is done. {}",System.currentTimeMillis());}try {// 暂停执行5秒钟Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}retry--;}}catch (Exception e) {e.printStackTrace();return;}}@Overridepublic void init(JsonNode paraData, DatasourceService datasourceService, TargetdbService targetdbService) {......}
}

task.Caculator

public class Caculator extends AbstractTraceQuery implements TaskService,Runnable {private final static Logger logger = LoggerFactory.getLogger(TracesQueryInfo.class);private static final Lock lock = new ReentrantLock();  //对不同任务的竞争性资源加锁String traces_index, stat_index;ArrayNode businessTags;JsonNode businessTag;DatasourceService datasource;TargetdbService targetdb;private Map<String,Object> traceProcess(Map<String,Object> sourceMap){//处理traces查询结果AtomicInteger durationSum= new AtomicInteger();AtomicInteger count= new AtomicInteger();AtomicInteger maxDuration=new AtomicInteger();double durationAvg,p50,p75,p90,p95,p99;ArrayList<Integer> durationArray = new ArrayList<>();;  //延时集合,用于计算分位数sourceMap.entrySet().stream().forEach((Map.Entry<String,Object> entry) -> {count.getAndIncrement();String traceId = entry.getKey();System.out.println("traceId::" + traceId);Integer duration = (int) Double.parseDouble(entry.getValue().toString());durationSum.addAndGet(duration);if (duration > maxDuration.get()) {maxDuration.getAndSet(duration);}durationArray.add(duration);});durationAvg=(durationSum.get())/(count.get());p50=percentile(durationArray.toArray(new Integer[durationArray.size()]),0.5);p75=percentile(durationArray.toArray(new Integer[durationArray.size()]),0.75);p90=percentile(durationArray.toArray(new Integer[durationArray.size()]),0.90);p95=percentile(durationArray.toArray(new Integer[durationArray.size()]),0.95);p99=percentile(durationArray.toArray(new Integer[durationArray.size()]),0.99);Map<String,Object> resultMap = new HashMap<>();resultMap.put("max_resp",maxDuration.get());resultMap.put("mean_resp",durationAvg);resultMap.put("count",count.get());resultMap.put("p50",p50);resultMap.put("p75",p75);resultMap.put("p90",p90);resultMap.put("p95",p95);resultMap.put("p99",p99);return resultMap;}@Overridepublic void run() {if(targetdb.isExisted(traces_index)){logger.info("TracesStatInfo begin...");String endTime =getTimeUtcEndPoint(1,30);String startTime=getTimeUtcEndPoint(2,31);logger.info("startTime:: {}  endTime:: {}",startTime,endTime);try{// 在es trace表中,按bussinesTagList 查找local_time_stamp在当前时间范围内的记录logger.info("statQuery queryDate begins ... {}",System.currentTimeMillis());Map<String, Object> dataMap=targetdb.queryData(traces_index,businessTags,startTime,endTime,"duration");Map<String, Object> resMap = new HashMap<>();if(null!=dataMap) {//Map<String, Object> resMap = new HashMap<>();logger.info("TracesStatInfo resultMap:: {} ", dataMap.toString());resMap = traceProcess(dataMap);// targetdb.createForm(stat_index);//targetdb.insertDate(stat_index, seqNo, resMap);}else{//找不到置0logger.info("StatInfo resultMap is null ");resMap.put("max_resp", 0);resMap.put("mean_resp", 0);resMap.put("count", 0);resMap.put("p50", 0);resMap.put("p75", 0);resMap.put("p90", 0);resMap.put("p95", 0);resMap.put("p99", 0);}//打业务标签和时间戳resMap = getMapWithTags(businessTags, resMap);String seqNo = generateSeqNo(); //生成序号// 加锁lock.lock();targetdb.createForm(stat_index);targetdb.insertDate(stat_index, seqNo, resMap)}catch(Exception e){e.printStackTrace();return;}finally {// 释放锁lock.unlock();}}else{logger.info("trace_index {} is not existed",traces_index);}}@Overridepublic void init(JsonNode paraData, DatasourceService datasourceService, TargetdbService targetdbService) {.....}
}

task.SpanInfo

public class SpanInfo extends AbstractTraceQuery implements TaskService,Runnable{private final static Logger logger = LoggerFactory.getLogger(SpanQueryInfo.class);private static final Lock lock = new ReentrantLock();  //对不同任务的竞争性资源加锁String traces_index, span_index;DatasourceService datasource;TargetdbService targetdb;double percentile;private Map<String,Object> findTraces(Map<String,Object> sourceMap,double percentile){ArrayList<Integer> durationArray = new ArrayList<>();;  //延时集合,用于计算分位数Map<String,Object> resultMap = new HashMap<>(); //结果集合//计算percentile分位sourceMap.entrySet().stream().forEach((Map.Entry<String,Object> entry) ->{Integer duration = (int) Double.parseDouble(entry.getValue().toString());durationArray.add(duration);});double percentileData = percentile(durationArray.toArray(new Integer[0]), percentile);logger.info("percentileData:: {}",percentileData);//查找超过percentile的traceIdsourceMap.entrySet().stream().forEach((Map.Entry<String,Object> entry) ->{double duration = (double) Double.parseDouble(entry.getValue().toString());if(duration>=percentileData){String traceId=entry.getKey().toString();resultMap.put(traceId,duration);}});return resultMap;}@Overridepublic void run() {logger.info("SpanInfo begin...");//建表targetdb.createForm(span_index);try{logger.info("SpanInfo try begin...");//找到当前trace_index索引中所有高出95%的值的traceId集合Map<String, Object> dataMap=targetdb.queryAllData(traces_index,"duration");if(null!=dataMap) {logger.info("SpanInfo resultMap:: {} ", dataMap.toString());//查找高于percentile分位数的值Map<String, Object> resMap = findTraces(dataMap, percentile);logger.info("spanInfo foundedMap:: {} ", resMap.toString());//遍历查询结果,如果span_index中不存在,则查询span后插入span_indexresMap.entrySet().stream().forEach((Map.Entry<String, Object> entry) -> {String traceId = entry.getKey();if (targetdb.isNotInTheIndex(span_index, "traceId", traceId)) {//按traceId查询spanArrayNode spanList = datasource.getTraceSpans(traceId);Map<String, List<Map<String, Object>>> spansMap = genSpanMap(traceId, spanList); //组成SpanList//插入span_indextargetdb.updateDate(span_index, spansMap);}});}else{logger.info("SpanInfo resultMap is null ");}}catch(Exception e){e.printStackTrace();return;}}@Overridepublic void init(JsonNode paraData, DatasourceService datasourceService, TargetdbService targetdbService) {....}
}

完成索引持久化后,就可以以grafana访问ES库形成展示,这部分不展开,看一下效果
在这里插入图片描述
在这里插入图片描述

姑且算抛砖引玉吧,希望各位大佬也分享一下方案

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/526484.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s应用综合实例

k8s应用综合实例 目录 k8s应用综合实例 目录 原文链接 推荐文章 实验环境 实验软件 本节实战 预期 原理 高可用 稳定性 避免单点故障 使用 PDB 健康检查 服务质量 QoS QoS类型 资源回收策略 滚动更新 失败原因 零宕机 HPA 安全性 持久化 Ingress FAQ …

区块链和人工智能的关系以及经典案例

目录 1.区块链与人工智能的关系 2.应用案例&#xff1a;基于区块链的医疗数据共享平台 2.1背景 2.2方案 2.3优势 2.4挑战 区块链技术和人工智能&#xff08;AI&#xff09;是两种不同的技术&#xff0c;但它们之间存在着互补关系。区块链技术提供了一种安全、透明、去中心…

《JAVA与模式》之桥梁模式

系列文章目录 文章目录 系列文章目录前言一、桥梁模式的用意二、桥梁模式的结构三、使用场景四、不使用模式的解决方案五、实现发送加急消息前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂…

前端vite+vue3——可视化页面性能耗时指标(fmp、fp)

文章目录 ⭐前言&#x1f496;vue3系列文章 ⭐可视化fmp、fp指标&#x1f496; MutationObserver 计算 dom的变化&#x1f496; 使用条形图展示 fmp、fp时间 ⭐项目代码⭐结束 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享关于 前端vitevue3——可视化页面性能耗时…

【解读】OWASP 大语言模型(LLM)安全测评基准V1.0

大语言模型&#xff08;LLM&#xff0c;Large Language Model&#xff09;是指参数量巨大、能够处理海量数据的模型, 此类模型通常具有大规模的参数&#xff0c;使得它们能够处理更复杂的问题&#xff0c;并学习更广泛的知识。自2022 年以来&#xff0c;LLM技术在得到了广泛的应…

企业AI转型之路:策略与实践

目录 前言1 试点项目&#xff1a;积累AI经验1.1 选择有实际价值的项目1.2 创新氛围的激发1.3 员工对新技术的接受度提升 2 建立高效的内部AI团队2.1 团队独立性与高层直报2.2 初期资金支持与资源整合 3 提供全面的AI培训计划3.1 针对不同层次的培训3.2 多样化培训形式3.3 内部人…

软考71-上午题-【面向对象技术2-UML】-UML中的图2

一、用例图 上午题&#xff0c;考的少&#xff1b;下午题&#xff0c;考的多。 1-1、用例图的定义 用例图展现了一组用例、参与者以及它们之间的关系。 用例图用于对系统的静态用例图进行建模。 可以用下列两种方式来使用用例图&#xff1a; 1、对系统的语境建模&#xff1b…

常见3大web漏洞

常见3大web漏洞 XSS攻击 描述&#xff1a; 跨站脚本&#xff08;cross site script&#xff09;-简称XSS&#xff0c;常出现在web应用中的计算机安全漏桶、web应用中的主流攻击方式。 攻击原理&#xff1a; 攻击者利用网站未对用户提交数据进行转义处理或者过滤不足的缺点。 …

【蓝桥杯-单片机】LED和按键小练习:Led彩灯控制系统

文章目录 【蓝桥杯-单片机】LED和按键小练习&#xff1a;Led彩灯控制系统01 题目描述02 题目解答03 本题总结整体逻辑框架&#xff08;详细版&#xff09;整体逻辑框架&#xff08;缩略版&#xff09;按键读取模块按键消抖模块流水灯显示模式&#xff08;1&#xff09;从上向下…

NoSQL--3.MongoDB配置(Linux版)

目录 2.2 Linux环境下操作 2.2.1 传输MongoDB压缩包到虚拟机&#xff1a; 2.2.2 启动MongoDB服务&#xff1a; 2.2 Linux环境下操作 2.2.1 传输MongoDB压缩包到虚拟机&#xff1a; &#xff08;笔者使用XShell传输&#xff09; 如果不想放在如图的路径&#xff0c;删除操作…

前端框架的发展历史介绍

前端框架的发展历史是Web技术进步的一个重要方面。从最初的简单HTML页面到现在的复杂单页应用程序&#xff08;SPA&#xff09;&#xff0c;前端框架和库的发展极大地推动了Web应用程序的构建方式。以下是一些关键的前端框架和库&#xff0c;以及它们的发布年份、创建者和主要特…

seo蜘蛛池的概念!蚂蚁SEO

蜘蛛池是一种特殊的网络营销技术&#xff0c;它的主要作用是吸引搜索引擎爬虫&#xff0c;提高网站的收录和排名&#xff0c;从而增加网站的流量和曝光度。 蚂蚁SEO是一个SEO工具&#xff0c;可以帮助您提高网站权重&#xff0c;吸引更多的搜索引擎爬虫&#xff0c;提高网站的…