导入JDBC元数据到Apache Atlas

前言

前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美

本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas

数据库schema与catalog

按照SQL标准的解释,在SQL环境下CatalogSchema都属于抽象概念,可以把它们理解为一个容器或者数据库对象命名空间中的一个层次,主要用来解决命名冲突问题。从概念上说,一个数据库系统包含多个Catalog,每个Catalog又包含多个Schema,而每个Schema又包含多个数据库对象(表、视图、字段等),反过来讲一个数据库对象必然属于一个Schema,而该Schema又必然属于一个Catalog,这样我们就可以得到该数据库对象的完全限定名称,从而解决命名冲突的问题了;例如数据库对象表的完全限定名称就可以表示为:Catalog名称.Schema名称.表名称。这里还有一点需要注意的是,SQL标准并不要求每个数据库对象的完全限定名称是唯一的。

从实现的角度来看,各种数据库系统对CatalogSchema的支持和实现方式千差万别,针对具体问题需要参考具体的产品说明书,比较简单而常用的实现方式是使用数据库名作为Catalog名,使用用户名作为Schema名,具体可参见下表:

表1 常用数据库

供应商Catalog支持Schema支持
Oracle不支持Oracle User ID
MySQL不支持数据库名
MS SQL Server数据库名对象属主名,2005版开始有变
DB2指定数据库对象时,Catalog部分省略Catalog属主名
Sybase数据库名数据库属主名
Informix不支持不需要
PointBase不支持数据库名

原文:https://www.cnblogs.com/ECNB/p/4611309.html

元数据模型层级抽象

不同的关系型数据库,其数据库模式有所区别,对应与下面的层级关系

在这里插入图片描述

  • Datasource -> Catalog -> Schema -> Table -> Column
  • Datasource -> Catalog -> Table -> Column
  • Datasource -> Schema -> Table -> Column

元数据转换设计

在这里插入图片描述

提供元数据

借鉴Apache DolphinScheduler中获取Connection的方式,不多赘述。

public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);logger.info("Get connection from datasource {}", datasourceUniqueId);DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());if (null == dataSourceChannel) {throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));}return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);});return dataSourceClient.getConnection();}

转换元数据

  1. 元数据模型

创建数据库的元数据模型

private AtlasEntityDef createJdbcDatabaseDef() {AtlasEntityDef typeDef = createClassTypeDef(DatabaseProperties.JDBC_TYPE_DATABASE,Collections.singleton(DatabaseProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(DatabaseProperties.ATTR_URL, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_DRIVER_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_VERSION, "string"));typeDef.setServiceType(DatabaseProperties.ENTITY_SERVICE_TYPE);return typeDef;
}

创建数据库模式的元数据模型

private AtlasEntityDef createJdbcSchemaDef() {AtlasEntityDef typeDef = AtlasTypeUtil.createClassTypeDef(SchemaProperties.JDBC_TYPE_SCHEMA,Collections.singleton(SchemaProperties.ENTITY_TYPE_DATASET));typeDef.setServiceType(SchemaProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "tables");}});return typeDef;
}

创建数据库表的元数据模型

private AtlasEntityDef createJdbcTableDef() {AtlasEntityDef typeDef = createClassTypeDef(TableProperties.JDBC_TYPE_TABLE,Collections.singleton(TableProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(TableProperties.ATTR_TABLE_TYPE, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "columns");}});return typeDef;
}

创建数据库列的元数据模型

private AtlasEntityDef createJdbcColumnDef() {AtlasEntityDef typeDef = createClassTypeDef(ColumnProperties.JDBC_TYPE_COLUMN,Collections.singleton(ColumnProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_TYPE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_IS_PRIMARY_KEY, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_IS_NULLABLE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_DEFAULT_VALUE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_AUTO_INCREMENT, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);HashMap<String, String> options = new HashMap<>() {{put("schemaAttributes", "[\"name\", \"isPrimaryKey\", \"columnType\", \"isNullable\" , \"isAutoIncrement\", \"description\"]");}};typeDef.setOptions(options);return typeDef;
}

创建实体之间的关系模型

private List<AtlasRelationshipDef> createAtlasRelationshipDef() {String version = "1.0";// 数据库和模式的关系AtlasRelationshipDef databaseSchemasDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "schemas", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "database", SINGLE, false));databaseSchemasDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);AtlasRelationshipDef databaseTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_TABLES,BaseProperties.RELATIONSHIP_DATABASE_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "database", SINGLE, false));databaseTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 模式和数据表的关系// 注意 schema 已经被使用, 需要更换否则会冲突, 例如改为 Jschema(jdbc_schema)AtlasRelationshipDef schemaTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_SCHEMA_TABLES,BaseProperties.RELATIONSHIP_SCHEMA_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "Jschema", SINGLE, false));schemaTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 表和数据列的关系AtlasRelationshipDef tableColumnsDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_TABLE_COLUMNS,BaseProperties.RELATIONSHIP_TABLE_COLUMNS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "columns", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_COLUMN, "table", SINGLE, false));tableColumnsDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);return Arrays.asList(databaseSchemasDef, databaseTablesDef, schemaTablesDef, tableColumnsDef);
}
  1. 提取元数据

    不再赘述

  2. 转换元数据

使用工厂模式,提供不同类型的元数据转换方式

public interface JdbcTransferFactory {JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client);boolean supportType(String type);String getName();
}

List ignorePatterns 用来过滤不想导入的数据库元数据,例如mysqlinformation_schema

public interface JdbcTransfer {void transfer();JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns);
}

举例:JdbcMysqlTransfer 和 MysqlTransferFactory

@AutoService(JdbcTransferFactory.class)
public class MysqlTransferFactory implements JdbcTransferFactory {public static final String MYSQL = "mysql";@Overridepublic JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {return new JdbcMysqlTransfer(metaData, client);}@Overridepublic boolean supportType(String type) {return MYSQL.equalsIgnoreCase(type);}@Overridepublic String getName() {return MYSQL;}
}
public class JdbcMysqlTransfer implements JdbcTransfer {private final Jdbc jdbc;private final AtlasService atlasService;private List<Pattern> ignorePatterns;public JdbcMysqlTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {this.jdbc = new Jdbc(new JdbcMetadata(metaData));this.atlasService = new AtlasService(client);this.ignorePatterns = Collections.emptyList();}@Overridepublic JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns) {this.ignorePatterns = ignorePatterns;return this;}private boolean tableIsNotIgnored(String tableName) {return ignorePatterns.stream().noneMatch(regex -> regex.matcher(tableName).matches());}@Overridepublic void transfer() {// 1.数据库实体转换DatabaseTransfer databaseTransfer = new DatabaseTransfer(atlasService);AtlasEntity databaseEntity = databaseTransfer.apply(jdbc);// 2.表实体转换String catalog = (String) databaseEntity.getAttribute(BaseProperties.ATTR_NAME);List<AtlasEntity> tableEntities = jdbc.getTables(catalog, catalog).parallelStream().filter(jdbcTable -> tableIsNotIgnored(jdbcTable.getTableName())).map(new TableTransfer(atlasService, databaseEntity)).toList();// 3.列转换for (AtlasEntity tableEntity : tableEntities) {String tableName = (String) tableEntity.getAttribute(BaseProperties.ATTR_NAME);List<JdbcPrimaryKey> primaryKeys = jdbc.getPrimaryKeys(catalog, tableName);jdbc.getColumns(catalog, catalog, tableName).parallelStream().forEach(new ColumnTransfer(atlasService, tableEntity, primaryKeys));}}}
  1. 元数据存入Atlas
public class DatabaseTransfer implements Function<Jdbc, AtlasEntity> {private final AtlasService atlasService;public DatabaseTransfer(AtlasService atlasService) {this.atlasService = atlasService;}@Overridepublic AtlasEntity apply(Jdbc jdbc) {String userName = jdbc.getUserName();String driverName = jdbc.getDriverName();String productName = jdbc.getDatabaseProductName();String productVersion = jdbc.getDatabaseProductVersion();String url = jdbc.getUrl();String urlWithNoParams = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;String catalogName = urlWithNoParams.substring(urlWithNoParams.lastIndexOf("/") + 1);// 特殊处理 Oracleif (productName.equalsIgnoreCase("oracle")){catalogName = userName.toUpperCase();urlWithNoParams = urlWithNoParams + "/" + catalogName;}DatabaseProperties properties = new DatabaseProperties();properties.setQualifiedName(urlWithNoParams);properties.setDisplayName(catalogName);properties.setOwner(userName);properties.setUrl(url);properties.setDriverName(driverName);properties.setProductName(productName);properties.setProductVersion(productVersion);// 1.创建Atlas EntityAtlasEntity atlasEntity = new AtlasEntity(DatabaseProperties.JDBC_TYPE_DATABASE, properties.getAttributes());// 2.判断是否存在实体, 存在则填充GUIDMap<String, String> searchParam = Collections.singletonMap(DatabaseProperties.ATTR_QUALIFIED_NAME, urlWithNoParams);Optional<AtlasEntityHeader> entityHeader = atlasService.checkAtlasEntityExists(DatabaseProperties.JDBC_TYPE_DATABASE, searchParam);entityHeader.ifPresent(header -> atlasEntity.setGuid(header.getGuid()));// 3,存储或者更新到Atlas中if (entityHeader.isPresent()){atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));}else {AtlasEntityHeader header = atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));atlasEntity.setGuid(header.getGuid());}return atlasEntity;}
}

效果展示

  1. 元数据类型定义

在这里插入图片描述

在这里插入图片描述

  1. 测试导入元数据

由于mysql没有采用schema,因此jdbc_schema为空

在这里插入图片描述

如图所示,可以清晰的了解mysql数据库中demo数据库的数据表内容

在这里插入图片描述

数据表元数据,qualifiedName使用数据库连接url.表名
在这里插入图片描述

如同所示,数据表内各个列的元数据;可以清晰的了解该数据表的各个字段信息

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/257202.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DDD架构思想专栏一《初识领域驱动设计DDD落地》

引言 最近准备给自己之前写的项目做重构&#xff0c;这是一个单体架构的小项目&#xff0c;后端采用的是最常见的三层架构。因为项目比较简单&#xff0c;其实采用三层架构就完全够了。但是呢&#xff0c;小编最近在做DDD架构的项目&#xff0c;于是就先拿之前写的一个老项目试…

使用React 18、Echarts和MUI实现温度计

关键词 React 18 Echarts和MUI 前言 在本文中&#xff0c;我们将结合使用React 18、Echarts和MUI&#xff08;Material-UI&#xff09;库&#xff0c;展示如何实现一个交互性的温度计。我们将使用Echarts绘制温度计的外观&#xff0c;并使用MUI创建一个漂亮的用户界面。 本文…

pytorch优化之SAM优化器

1. SAM介绍 人机验证 2. 案例 ❀精度优化❀优化策略1&#xff1a;网络SAM优化器_夏天&#xff5c;여름이다的博客-CSDN博客文章浏览阅读3.3k次&#xff0c;点赞10次&#xff0c;收藏30次。精度优化策略&#xff1a;SAM:Sharpness AwarenessMinimization锐度感知最小化论文&…

css 十字分割线(含四等分布局)

核心技术 伪类选择器含义li:nth-child(2)第2个 lili:nth-child(n)所有的lili:nth-child(2n)所有的第偶数个 lili:nth-child(2n1)所有的第奇数个 lili:nth-child(-n5)前5个 lili:nth-last-child(-n5)最后5个 lili:nth-child(7n)选中7的倍数 border-right: 3px solid white;borde…

Java 简易版 TCP UDP聊天

客户端 import java.io.*; import java.net.Socket; import java.util.Date; import javax.swing.*;public class MyClient {private JFrame jf;private JButton jBsend;private JTextArea jTAcontent;private JTextField jText;private JLabel JLcontent;private Date data;pr…

vue3日常知识点学习归纳

1&#xff0c;父子组件传递&#xff1a; 父组件传递参数 <template><div><!-- 子组件 参数&#xff1a;num 、nums --><child :num"nums.num" :doubleNum"nums.doubleNum" increase"handleIncrease"></child>&l…

HarmonyOS4.0从零开始的开发教程10管理组件状态

HarmonyOS&#xff08;八&#xff09;管理组件状态 概述 在应用中&#xff0c;界面通常都是动态的。如图1所示&#xff0c;在子目标列表中&#xff0c;当用户点击目标一&#xff0c;目标一会呈现展开状态&#xff0c;再次点击目标一&#xff0c;目标一呈现收起状态。界面会根…

【Proteus仿真】【STM32单片机】蓝牙遥控小车

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真STM32单片机控制器&#xff0c;使LCD1602液晶&#xff0c;L298电机&#xff0c;直流电机&#xff0c;HC05/06蓝牙模块等。 主要功能&#xff1a; 系统运行后&#xff0c;LCD1602显…

LAMP和分离式LNMP部署

目录 一.什么是LAMP&#xff1f; 二.安装LAMP 先安装apache&#xff0c;httpd网页服务&#xff1a; 接着安装mysql&#xff1a; 安装php&#xff1a; 创建论坛&#xff1a; 三.安装分布式LNMP&#xff1a; 先安装nginx&#xff1a; 到另一台主机安装php&#xff1a; …

论文阅读:LSeg: LANGUAGE-DRIVEN SEMANTIC SEGMENTATION

可以直接bryanyzhu的讲解&#xff1a;CLIP 改进工作串讲&#xff08;上&#xff09;【论文精读42】_哔哩哔哩_bilibili 这里是详细的翻译工作 原文链接 https://arxiv.org/pdf/2201.03546.pdf ICLR 2022 0、ABSTRACT 我们提出了一种新的语言驱动的语义图像分割模型LSeg。…

PCL 点云最小二乘法拟合二维圆

文章目录 一、原理概述二、实现代码三、实现效果参考资料一、原理概述 二、实现代码 // 标准文件 #include <iostream>// PCL #include <pcl/io/pcd_io.h>

二维码智慧门牌管理系统升级:行政区划维护功能详解

文章目录 前言一、行政区划维护解决方案二、解决方案优势 前言 随着科技不断发展&#xff0c;二维码智慧门牌管理系统已成为物业管理和社区服务等领域的重要工具。在此系统升级解决方案中&#xff0c;行政区划维护功能愈发显得重要。我们将详细介绍这一功能&#xff0c;助您更…