[自研开源] MyData 数据集成之数据过滤 v0.7.2

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

概述

本篇基于 数据集成之任务流程 介绍任务执行时的数据过滤的使用场景和配置操作。

使用场景

业务系统与mydata集成时,核心是数据的来和去,在这两个方向上分别实现:数据预清洗数据权限控制

  1. 数据预清洗,从api获取数据后 过滤排除掉“脏”数据,然后再入库用于数据集成;

    在这里插入图片描述

    例如:接口返回的某字段值不能为空、字段值长度在指定范围等;

    以下代码是 提供数据 类型的任务执行过程:

    // 提供数据
    case MdConstant.DATA_PRODUCER:// 调用api 获取jsonString json = ApiUtil.read(taskInfo);// 将json按字段映射 解析为业务数据jobDataService.parseData(taskInfo, json);// 根据条件过滤数据jobDataFilterService.doFilter(taskInfo);// 保存业务数据jobDataService.saveTaskData(taskInfo);// 更新环境变量jobVarService.saveVarValue(taskInfo, json);break;
    

    jobDataFilterService.doFilter 是对数据的预过滤处理,详见 JobDataFilterService.java

    public void doFilter(TaskInfo task) {Assert.notNull(task);// 获取业务数据List<Map> dataList = task.getProduceDataList();// 获取配置的过滤条件List<BizDataFilter> dataFilters = task.getDataFilters();if (CollUtil.isEmpty(dataList) || CollUtil.isEmpty(dataFilters)) {return;}// 定义新的数据集合,用于存储 过滤后的数据List<Map> filterDatas = ListUtil.toList();// 遍历数据,并进行过滤dataList.forEach(data -> {boolean isCorrect = false;for (BizDataFilter filter : dataFilters) {String key = filter.getKey();Object filterValue = filter.getValue();String op = filter.getOp();// 当数据中 不包含 过滤的字段名,则执行下一项过滤if (!data.containsKey(key)) {continue;}// 当数据中 指定字段的值 无效,则过滤该数据Object dataValue = data.get(key);if (ObjectUtil.isNull(dataValue)) {isCorrect = true;break;}// 判断业务数据值 和 过滤数据值 都可对比,否则过滤条件无效if (!(dataValue instanceof Comparable && filterValue instanceof Comparable)) {break;}String cDataValue = dataValue.toString();String cFilterValue = filterValue.toString();// 根据op类型,过滤数据switch (op) {case MdConstant.DATA_OP_EQ:// 等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) == 0);break;case MdConstant.DATA_OP_NE:// 不等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) != 0);break;case MdConstant.DATA_OP_GT:// 大于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) > 0);break;case MdConstant.DATA_OP_GTE:// 大于等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) >= 0);break;case MdConstant.DATA_OP_LT:// 小于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) < 0);break;case MdConstant.DATA_OP_LTE:// 小于等于isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) <= 0);break;default:throw new RuntimeException("JobDataFilter: 不支持的过滤操作");}}// 当 未被过滤,则添加到过滤结果if (isCorrect) {filterDatas.add(data);}});task.setProduceDataList(filterDatas);task.appendLog("过滤前的业务数据:{}", dataList);task.appendLog("过滤条件:{}", dataFilters);task.appendLog("过滤后的业务数据:{}", filterDatas);
    }
    

    注:目前0.7版本暂时实现了关系运算,后续增加函数处理;

  2. 数据权限控制,限制应用接收的数据范围,即符合条件的数据才能共享给应用;

    在这里插入图片描述

    以下代码是 消费数据 类型任务的执行过程:

    // 消费数据
    case MdConstant.DATA_CONSUMER:List<BizDataFilter> filters = taskInfo.getDataFilters();if (CollUtil.isNotEmpty(filters)) {// 解析过滤条件值中的 自定义字符串parseFilterValue(filters);// 排除值为null的条件filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());}String dataCode = taskInfo.getDataCode();if (StrUtil.isNotEmpty(dataCode)) {// 根据过滤条件 查询数据List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);taskInfo.setConsumeDataList(dataList);// 根据字段映射转换为api参数jobDataService.convertData(taskInfo);}// 调用api传输数据ApiUtil.write(taskInfo);break;
    

    bizDataDAO.list 方法支持按配置条件查询数据,详见 BizDataDAO.java

    public List<Map> list(String dbCode, String dataCode, List<BizDataFilter> bizDataFilters) {MongoTemplate mongoTemplate = mongoFactory.getTemplate(dbCode);Query query = new Query();// 遍历数据过滤条件if (CollUtil.isNotEmpty(bizDataFilters)) {// mongodb的查询条件集合List<Criteria> criteriaList = CollUtil.newArrayList();for (BizDataFilter bizDataFilter : bizDataFilters) {// 条件keyString key = bizDataFilter.getKey();// 条件操作String op = bizDataFilter.getOp();// 条件值Object value = bizDataFilter.getValue();// 根据条件操作类型 调用mongodb对应的查询方法Criteria criteria = Criteria.where(key);switch (op) {case MdConstant.DATA_OP_EQ:criteria.is(value);break;case MdConstant.DATA_OP_NE:criteria.ne(value);break;case MdConstant.DATA_OP_GT:criteria.gt(value);break;case MdConstant.DATA_OP_GTE:criteria.gte(value);break;case MdConstant.DATA_OP_LT:criteria.lt(value);break;case MdConstant.DATA_OP_LTE:criteria.lte(value);break;default:throw new RuntimeException("BizDataDAO: 不支持的过滤操作");}// 存入mongodb的查询条件集合criteriaList.add(criteria);}// mongodb查询条件集合 加入查询中query.addCriteria(new Criteria().andOperator(criteriaList));}// 执行查询return mongoTemplate.find(query, Map.class, dataCode);}
    

配置操作

  1. 创建任务过程请参考 使用手册

  2. 在创建任务界面中,添加数据过滤条件

    如下图过滤条件是 salary > 600
    在这里插入图片描述

  3. 执行任务后 通过日志详情可以看到数据入库前预清洗;
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/545126.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

操作系统镜像文件(win,centos)

下载镜像 下载地址 MSDN, 我告诉你 - 做一个安静的工具站 如果想要在vm中全屏的建议下载待spl的镜像文件&#xff0c;不然会搞不起来全屏 找到自己写想要的镜像文件&#xff0c;这边我选了win7 复制圈起来的路径&#xff0c;到迅雷中新建下载即可 安装window可能需要密钥 …

线程是如何在 6 种状态之间转换的

线程是如何在 6 种状态之间转换的 线程的 6 种状态New 新创建Runnable 可运行阻塞状态Blocked 被阻塞Waiting 等待Timed Waiting 限期等待 注意点 主要学习线程是如何在 6 种状态之间转换。 线程的 6 种状态 就像生物从出生到长大、最终死亡的过程一样&#xff0c;线程也有自己…

4.1_1 初识文件管理

文章目录 4.1_1 初识文件管理&#xff08;一&#xff09;文件的属性&#xff08;二&#xff09;文件内部的数据应该怎样组织起来&#xff08;三&#xff09;文件之间应该怎样组织起来&#xff08;四&#xff09;操作系统应该向上提供哪些功能&#xff08;五&#xff09;从上往下…

android studio 连接mumu模拟器调试

1、打开mumu模拟器 2、在Android Studio 中 控制台 cd 到 sdk 目录下 platform-tools 文件夹&#xff0c;有一个adb.exe 可运行程序 一般指令&#xff1a; adb connect 127.0.0.1:7555 但是这个执行在window环境下可能会报错 解决方法是在 adb 之前加 ".\", 问题…

JavaWeb一些开发问题

一、Restful package com.example.crudtest1.pojo;import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;Data NoArgsConstructor AllArgsConstructor public class Result {private Integer code;//响应码&#xff0c;1 代表成功; 0 代表失…

【现代C++】移动语义和右值引用

现代C++中的移动语义和右值引用是C++11引入的重要特性,旨在优化资源管理和提升性能。这些特性允许资源的转移而非复制,特别是对于临时对象和大型对象。 1. 右值引用 右值引用是对临时对象的引用,它允许你修改临时对象。使用&&来声明右值引用。 #include <iost…

【NTN 卫星通信】 TN和多NTN配合的应用场景

1 场景描述 此场景描述了农村环境&#xff0c;其中MNO (运营商TerrA)仅在城市附近提供本地地面覆盖&#xff0c;而MNO (SatA)提供广泛的NTN覆盖。SatA使用GSO轨道和NGSO轨道上的卫星。SatA与TerrA有漫游协议&#xff0c;允许:   所有TerrA用户的连接&#xff0c;当这些用户不…

论文浅尝 | GPT-RE:基于大语言模型针对关系抽取的上下文学习

笔记整理&#xff1a;张廉臣&#xff0c;东南大学硕士&#xff0c;研究方向为自然语言处理、信息抽取 链接&#xff1a;https://arxiv.org/pdf/2305.02105.pdf 1、动机 在很多自然语言处理任务中&#xff0c;上下文学习的性能已经媲美甚至超过了全资源微调的方法。但是&#xf…

第二十五天-Seaborn数据可视化库

目录 1.介绍 2.使用 1.seaborn官网&#xff1a; 2.安装 3.基础用法 4.导入数据 5.分析基金数据 1.绘制每个月收盘价的趋势线 2.计算涨跌幅 3.设置统计基点 4.分布图&#xff1a;分析涨跌幅数量 5.箱型图 6.回归图 7.热力图 1.介绍 1.与matplotlib区别 2.基于matp…

矩阵中移动的最大次数

文章目录 所属专栏:BFS算法 题目链接 思路如下&#xff1a; 1.首先我们需要从第一列开始遍历&#xff0c;寻找每一个都能够满足条件的位置&#xff0c;将它插入到数组里面 2.第一列遍历完了后我们先判断第一列的数是否都满足条件插入到数组里面&#xff0c;如果数组为空&#…

Tomcat Seeion 集群

部署&#xff1a;nginx服务器&#xff1a;11-11&#xff1b;tomcat1:11-3; tomcat2:11-6 nginx服务器11-11做搭建&#xff1a; [rootmcb-11 ~]# systemctl stop firewalld [rootmcb-11 ~]# setenforce 0 [rootmcb-11 ~]# yum install epel-release.noarch -y [rootmcb…

【C++】类和对象终章

&#x1f525;博客主页&#xff1a; 小羊失眠啦. &#x1f3a5;系列专栏&#xff1a;《C语言》 《数据结构》 《C》 《Linux》 《Cpolar》 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 文章目录 一、初始化列表1.1 初始化列表的形式1.2 初始化列表的注意事项 二、explicit关键…