ccframe系统的链路追踪,用户ID追踪的实现

需求

之前ccframe cloud V1用的是springcloud微服务,只需要在header将jwttoken一直传下去就没事,最近弄V2转dubbo发现用户id没有自动保存进数据库表。于是开始研究dubbo如何追踪,顺便把链路追踪ID的问题给一并解决掉。

理论

MDC

MDC(Mapped Diagnostic Context,映射调试上下文)是Slf4j(提供了接口定义和核心实现,日志库负责适配器的实现)提供的一种方便在多线程条件下记录日志的功能。

MDC 可以看成是一个与当前线程绑定的Map,可以往其中添加键值对。MDC 中包含的内容可以被同一线程中执行的代码所访问。当前线程的子线程会继承其父线程中的 MDC 的内容。当需要记录日志时,只需要从 MDC 中获取所需的信息即可。MDC 的内容则由程序在适当的时候保存进去。对于一个 Web 应用来说,通常是在请求被处理的最开始保存这些数据。

简而言之,MDC就是日志框架提供的一个InheritableThreadLocal,项目代码中可以将键值对放入其中,然后使用指定方式取出打印即可。

RpcContext

RpcContext 本质上是一个使用 ThreadLocal 实现的临时状态记录器,RPC请求时会自动给传递给下游服务,当RPC请求结束的时候,当前线程的RpcContex会清空

实现

原理图

生成追踪信息

了解了MDC的原理可知,MDC数据是线程级别的,那么我们可以放在一次线程调用最靠前的位置。对于当前的ccframe系统,我们使用了spring security,而spring security的filter在请求比较靠前的位置,因此我们可以在JwtHeadFilter进行对应的处理。在解析完用户的信息后,将链路ID及会员ID放入MDC。链路ID这里采用Htool的短NanoId形式,因为本身请求会有时间属性,我们只需要在短时间(一次链路请求时间范围),例如几分钟内,不重复即可方便关联日志。这里采用2个记录量:

  1. userId 记录操作人
  2. traceId 记录一次链路请求
package org.ccframe.commons.auth;import cn.hutool.core.lang.id.NanoId;
import com.alibaba.fastjson.JSON;
import io.jsonwebtoken.JwtException;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.http.HttpStatus;
import org.ccframe.commons.util.IpUtil;
import org.ccframe.commons.util.JwtUtil;
import org.ccframe.commons.util.UUIDUtil;
import org.ccframe.config.GlobalEx;
import org.ccframe.subsys.core.domain.code.RoleCodeEnum;
import org.ccframe.subsys.core.dto.Result;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.slf4j.MDC;
import org.springframework.context.MessageSource;
import org.springframework.http.MediaType;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.web.authentication.WebAuthenticationDetails;
import org.springframework.web.filter.OncePerRequestFilter;
import org.springframework.web.servlet.LocaleResolver;import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;public class JwtHeadFilter extends OncePerRequestFilter {private final RedissonClient redissonClient;private final LocaleResolver localeResolver;private final MessageSource messageSource;public JwtHeadFilter(RedissonClient redissonClient, MessageSource messageSource, LocaleResolver localeResolver) {this.redissonClient = redissonClient;this.localeResolver = localeResolver;this.messageSource = messageSource;}@Overrideprotected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)throws ServletException, IOException {String traceId = NanoId.randomNanoId(8);MDC.put(GlobalEx.TRACE_ID, traceId);RpcContext.getContext().setAttachment(GlobalEx.TRACE_ID, traceId); // 同时放入RPC上下文String uri = request.getRequestURI();String token = null;boolean adminUrl = false;if(uri.startsWith("/" + GlobalEx.ADMIN_URI_PERFIX)) { //后端用户token = request.getHeader(GlobalEx.ADMIN_TOKEN);adminUrl = true;}else if(uri.startsWith("/" + GlobalEx.API_URI_PERFIX)) { //前端用户token = request.getHeader(GlobalEx.API_TOKEN);}else {filterChain.doFilter(request, response);return;}if (!adminUrl && StringUtils.isEmpty(token)){ // 前台的直接退出了filterChain.doFilter(request,response);return;}List<SimpleGrantedAuthority> authorityList = new ArrayList<>();TokenUser tokenUser = null;if(StringUtils.isNotEmpty(token)){ //有token情况try {tokenUser = JwtUtil.decodeData(token, TokenUser.class);authorityList = tokenUser.getRoleIds().stream().map(item->new SimpleGrantedAuthority("ROLE_"+item)).collect(Collectors.toList());}catch(IllegalArgumentException|JwtException e) { //构造无权访问的json返回信息response.setStatus(HttpServletResponse.SC_FORBIDDEN);response.setContentType(MediaType.APPLICATION_JSON_VALUE);response.setCharacterEncoding("UTF-8");response.setHeader("Cache-Control", "no-cache, must-revalidate");try {Locale currentLocale = localeResolver.resolveLocale(request);String message = messageSource.getMessage("errors.auth.noAuth", GlobalEx.EMPTY_ARGS, currentLocale); // 未登陆/无权限response.getWriter().write(JSON.toJSONString(Result.error(HttpStatus.SC_FORBIDDEN, message, "errors.auth.noAuth", e.getMessage())));logger.error(e.getMessage());response.flushBuffer();return;} catch (IOException ioe) {logger.error("与客户端通讯异常:" + e.getMessage(), e);e.printStackTrace();}}}if(adminUrl) { // 后台,尝试匹配IP白名单String remoteIp = IpUtil.getRemoteIp(request);RBucket<String> whiteIpBucket = redissonClient.getBucket(GlobalEx.CACHKEY_WHITE_IP, StringCodec.INSTANCE);String whiteIp = whiteIpBucket.get();if(StringUtils.isNotEmpty(remoteIp) && StringUtils.isNotEmpty(whiteIp) && remoteIp.equals(whiteIp)){ // 后台操作匹配白名单权限authorityList.add(new SimpleGrantedAuthority("ROLE_" + RoleCodeEnum.WHITE_IP.toCode()));}}if(!authorityList.isEmpty()){//认证,根据ROLE集合解析即可JwtAuthenticationToken jwtAuthenticationToken =  new JwtAuthenticationToken(tokenUser, // 如果只是IP白名单直接访问,tokenUser可能为NullauthorityList);MDC.put(GlobalEx.TRACE_USER_ID, tokenUser.getUserId().toString());RpcContext.getContext().setAttachment(GlobalEx.TRACE_USER_ID, tokenUser.getUserId().toString()); // 同时放入RPC上下文jwtAuthenticationToken.setDetails(new WebAuthenticationDetails(request));SecurityContextHolder.getContext().setAuthentication(jwtAuthenticationToken); //授权对象放入上下文}filterChain.doFilter(request,response);}
}

传递信息

生成MDC信息后,我们需要在dubbo服务请求时进行传递。dubbo的RpcContext机制能够保证数据传递到下游的处理服务。唯一要做的是把RpcContext放入到MDC,这样下游服务里的所有日志都具备链路信息了。dubbo有一个扩展机制,可以在消费服务的时候进行切面处理。定义对应的Bean即可

package org.ccframe.commons.helper;import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.*;
import org.ccframe.config.GlobalEx;
import org.slf4j.MDC;@Activate(group = {"provider"})
public class TraceProviderFilter implements Filter {@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {String traceId = RpcContext.getContext().getAttachment(GlobalEx.TRACE_ID);if (traceId != null) {MDC.put(GlobalEx.TRACE_ID, traceId);};String traceUserId = RpcContext.getContext().getAttachment(GlobalEx.TRACE_USER_ID);if (traceId != null) {MDC.put(GlobalEx.TRACE_USER_ID, traceId);};try {return invoker.invoke(invocation);} finally {MDC.remove(GlobalEx.TRACE_ID);MDC.remove(GlobalEx.TRACE_USER_ID);}}
}

关联操作数据

这个是原来就实现的自动记录操作人的方案。原理是采用JPA的EntityListeners方案持久化监听器,原方案是在springsecurity的上下文里获取,由于微服务后请求跨机器,因此改从MDC里获取(前面已经把RpcContext放入到MDC了)

这个是数据库操作基类,EntityListeners注解定义

package org.ccframe.commons.base;import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.ccframe.commons.helper.EntityOperationListener;
import org.ccframe.config.GlobalEx;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;import javax.persistence.*;
import java.util.Date;/*** SAAS数据操作基础类* @author JIM**/
@MappedSuperclass
@Setter
@Getter
@EntityListeners({EntityOperationListener.class})
public abstract class BaseEntity implements IBaseEntity {private static final long serialVersionUID = -5656014821398158846L;public static final String CREATE_TIME = "createTime";public static final String UPDATE_TIME = "updateTime";public static final String CREATE_USER_ID = "createUserId"; //创建人public static final String UPDATE_USER_ID = "updateUserId"; //最后修改人public static final String TENANT_ID = "tenantId"; // 租户,所有资源按照租户隔离,提供检测annotation@Temporal(TemporalType.TIMESTAMP)@Column(name = "CREATE_TIME", nullable = false, length = 0, updatable = false)//elasticsearch@Field(type = FieldType.Date, format = DateFormat.custom, pattern = GlobalEx.ES_DATE_PATTERN)@JsonFormat (shape = JsonFormat.Shape.STRING, pattern = GlobalEx.STANDERD_DATE_FORMAT, timezone = GlobalEx.TIMEZONE)private Date createTime;@Temporal(TemporalType.TIMESTAMP)@Column(name = "UPDATE_TIME", nullable = false, length = 0)//elasticsearch@Field(type = FieldType.Date, format = DateFormat.custom, pattern = GlobalEx.ES_DATE_PATTERN)@JsonFormat (shape = JsonFormat.Shape.STRING, pattern = GlobalEx.STANDERD_DATE_FORMAT, timezone = GlobalEx.TIMEZONE)private Date updateTime;@Column(name = "CREATE_USER_ID", nullable = true, length = 10, updatable = false)@Field(type = FieldType.Integer)private Integer createUserId;@Column(name = "UPDATE_USER_ID", nullable = true, length = 10)@Field(type = FieldType.Integer)private Integer updateUserId;@Column(name = "TENANT_ID", nullable = true, length = 10)@Field(type = FieldType.Integer)private Integer tenantId;@Overridepublic boolean equals(Object obj) {if(!(getClass().isInstance(obj))){return false;}if(this == obj){return true;}BaseEntity other = (BaseEntity)obj;return new EqualsBuilder().append(getId(),other.getId()).isEquals();}@Overridepublic int hashCode() {return new HashCodeBuilder().append(getId()).toHashCode();}}

然后EntityListeners里实现持久化新增和更新逻辑

package org.ccframe.commons.helper;import org.ccframe.commons.auth.TokenUser;
import org.ccframe.commons.base.BaseEntity;
import org.ccframe.commons.base.IBaseEntity;
import org.ccframe.config.GlobalEx;
import org.slf4j.MDC;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;import javax.persistence.PrePersist;
import javax.persistence.PreRemove;
import javax.persistence.PreUpdate;
import java.util.Date;public class EntityOperationListener {private ICcTransactionHelper ccTransactionHelper;private ICcTransactionHelper getCcTransactionHelper(){if(ccTransactionHelper == null){ccTransactionHelper = SpringContextHelper.getBean(ICcTransactionHelper.class);}return ccTransactionHelper;}@PrePersistprotected void onCreate(Object baseEntity) {IBaseEntity targetEntity = (IBaseEntity)baseEntity;//采用dubbo的MDC透传操作用户userId及请求IDString userId = MDC.get(GlobalEx.TRACE_USER_ID);if(userId != null) { // MDC上下文里有targetEntity.setCreateUserId(Integer.valueOf(userId));}Date now = new Date();targetEntity.setCreateTime(now);targetEntity.setUpdateTime(now);getCcTransactionHelper().pushSave(targetEntity);}@PreUpdateprotected void onUpdate(Object baseEntity) {IBaseEntity targetEntity = (IBaseEntity)baseEntity;//采用dubbo的MDC透传操作用户userId及请求IDString userId = MDC.get(GlobalEx.TRACE_USER_ID);if(userId != null) { // MDC上下文里有targetEntity.setUpdateUserId(Integer.valueOf(userId));}targetEntity.setUpdateTime(new Date());getCcTransactionHelper().pushSave(targetEntity);}@PreRemoveprotected void onRemove(BaseEntity baseEntity) {getCcTransactionHelper().pushDelete(baseEntity);}
}

新增和修改操作一下,看到对应的新增用户和修改用户记录都写入了。这样每个表都具备了简单操作记录

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/616506.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在 Elasticsearch 中扩展 ML 推理管道:如何避免问题并解决瓶颈

作者&#xff1a;来自 Elastic Iulia Feroli 是时候考虑语义搜索运营了吗&#xff1f; 无论你是一位经验丰富的搜索工程师&#xff0c;希望探索新的人工智能功能&#xff0c;还是一位机器学习专家&#xff0c;希望更多地利用搜索基础设施来增强语义相似性模型 —— 充分利用这…

anaconda创建了虚拟python环境,且安装了pytorch,但是pycharm中import torch运行时报错

报错如下&#xff1a; C:\Users\tashi\.conda\envs\test1\python.exe D:\project\python\transformer\main.py C:\Users\tashi\.conda\envs\test1\lib\site-packages\numpy\__init__.py:127: UserWarning: mkl-service package failed to import, therefore Intel(R) MKL init…

供应链复杂业务实时数仓建设之路

供应链复杂业务实时数仓建设之路 背景 供应链业务是纷繁复杂的&#xff0c;我们既有 JIT 的现货模式中间夹着这大量的仓库作业环节&#xff0c;又有到仓的寄售&#xff0c;品牌业务&#xff0c;有非常复杂的逆向链路。在这么复杂的业务背后&#xff0c;我们需要精细化关注人货…

Redis搭建主从

Redis搭建主从: 1:拉取Redis镜像 docker pull redis2:创建主从对应的目录结构 3:对redis6379.log,redis6380.log,redis6381.log进行授权 chmod 777 redis6379.log chmod 777 redis6380.log chmod 777 redis6381.log4:修改主(master)的配置文件 5:创建主(master) redis_6379 …

express服务器 authorization 前端获取不到的问题

服务器生成token 设置在响应头&#xff0c;但是前端获取不到 const token JWT.generate({ id: new Date().getTime(), userName }, 10s) res.header(Authorization, token) axios.interceptors.response.use((response) > {console.log(response);if (response.data?.co…

【opencv】示例-neural_network.cpp 使用机器学习模块创建并训练一个简单的多层感知机(神经网络)模型...

#include <opencv2/ml/ml.hpp> // 引入OpenCV的机器学习模块using namespace std; // 使用标准命名空间 using namespace cv; // 使用OpenCV命名空间 using namespace cv::ml; // 使用OpenCV机器学习命名空间int main() {//创建随机训练数据Mat_<float> data(100, …

Linux的内存管理子系统

大家好&#xff0c;今天给大家介绍Linux的内存管理子系统&#xff0c;文章末尾附有分享大家一个资料包&#xff0c;差不多150多G。里面学习内容、面经、项目都比较新也比较全&#xff01;可进群免费领取。 Linux的内存管理子系统是Linux内核中一个非常重要且复杂的子系统&#…

文件IO/标准IO

1.标准I/O 标准&#xff1a; 任何操作系统皆可使用&#xff1b;使用范围很广 系统调用和库函数 系统调用&#xff1a;操作系统给我们提供的接口 man 2 printf 可以调用内核中的接口 直接驱动显卡运行 printf实际是库函数中的函数&#xff0c;然后调用系…

openkylin系统通过网线连接ubuntukylin系统上网攻略

openkylin系统通过网线连接ubuntukylin系统上网攻略 主机1&#xff1a;x64 amd &#xff0c;系统&#xff1a;ubuntukylin 22.04 &#xff0c;状态&#xff1a;通过wifi连接热点进行上网&#xff0c;并共享网络。 主机2&#xff1a;x64 intel &#xff0c;系统&#xff1a;ope…

字体体积压缩

环境:python3 关键步骤: pip install fontTools目录详情: 执行 pyftsubset.exe SourceHanSansCN-Medium.ttf --text-file3500.txt然后打开:TTF To Woff2,选择文件上传,等待处理,下载,使用 附常用汉字,字体文件请善用百度 3500.txt 工才下寸丈大与万上小口山巾千乞川亿个…

Redis队列与Stream

Redis队列与Stream、Redis 6多线程详解 Redis队列与StreamStream总述常用操作命令生产端消费端单消费者消费组消息消费 Redis队列几种实现的总结基于List的 LPUSHBRPOP 的实现基于Sorted-Set的实现PUB/SUB&#xff0c;订阅/发布模式基于Stream类型的实现与Java的集成消息队列问…

thinkphp6入门(23)-- 如何导入excel

1. 安装phpexcel composer require phpoffice/phpexcel composer update 2. 前端 <form class"forms-sample" action"../../xxxx/xxxx/do_import_users" method"post" enctype"multipart/form-data"><div class"cont…