trino-435:dynamic catalog数据库存储代码实现

一、dynamic catalog数据库存储源码分析

dynamic catalog的实现主要涉及到两个类:CoordinatorDynamicCatalogManager、WorkerDynamicCatalogManager,这两个类的详细信息如下:
在这里插入图片描述

在这里插入图片描述
这两个类主要提供了对catalog的增删改查的方法。trino-435源码中WorkerDynamicCatalogManager类并没有实现CatalogManager接口,需要对该类进行修改实现CatalogManager接口并实现接口中的方法,完成worker节点对catalog增删改查功能

二、JdbcStroeCatalog类的具体实现

该类的详细信息如下:
在这里插入图片描述
在代码试下中在构造方法中完成从数据库中加载catalog,并通过内部类中的loadProperties方法完成catalog属性加载,代码具体实现如下:

public final class JdbcCatalogStoreimplements CatalogStore
{private static final Logger log = Logger.get(JdbcCatalogStore.class);private final boolean readOnly;private final Jdbi catalogsJdbi;private final Boolean isCoordinator;private final ConcurrentMap<String, StoredCatalog> catalogs = new ConcurrentHashMap<>();@Injectpublic JdbcCatalogStore(JdbcCatalogStoreConfig config, ServerConfig serverConfig){requireNonNull(config, "config is null");readOnly = config.isReadOnly();isCoordinator = serverConfig.isCoordinator();String catalogsUrl = config.getCatalogConfigDbUrl();String catalogsUser = config.getCatalogConfigDbUser();String catalogsPassword = config.getCatalogConfigDbPassword();loaderJdbcDriver(this.getClass().getClassLoader(), "com.mysql.cj.jdbc.Driver", catalogsUrl);catalogsJdbi = Jdbi.create(catalogsUrl, catalogsUser, catalogsPassword);List<String> disabledCatalogs = firstNonNull(config.getDisabledCatalogs(), ImmutableList.of());List<JdbcStoredCatalog> dbCatalogs = catalogsJdbi.withHandle(handle -> {handle.execute("CREATE TABLE IF NOT EXISTS `catalogs`( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL COMMENT 'catalog名称', `properties` text, `create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间', `update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间', PRIMARY KEY (`id`), UNIQUE KEY index_name (`name`))");return handle.createQuery("SELECT name, properties FROM catalogs").mapToBean(JdbcStoredCatalog.class).list();});for (JdbcStoredCatalog catalog : dbCatalogs) {String catalogName = catalog.getName();checkArgument(!catalogName.equals(GlobalSystemConnector.NAME), "Catalog name SYSTEM is reserved for internal usage");if (disabledCatalogs.contains(catalogName)) {log.info("Skipping disabled catalog %s", catalogName);continue;}catalogs.put(catalog.getName(), catalog);}}@Overridepublic Collection<StoredCatalog> getCatalogs(){return ImmutableList.copyOf(catalogs.values());}@Overridepublic CatalogProperties createCatalogProperties(String catalogName, ConnectorName connectorName, Map<String, String> properties){checkModifiable();return new CatalogProperties(createRootCatalogHandle(catalogName, computeCatalogVersion(catalogName, connectorName, properties)),connectorName,ImmutableMap.copyOf(properties));}@Overridepublic void addOrReplaceCatalog(CatalogProperties catalogProperties){checkModifiable();String catalogName = catalogProperties.getCatalogHandle().getCatalogName();Properties properties = new Properties();properties.setProperty("connector.name", catalogProperties.getConnectorName().toString());properties.putAll(catalogProperties.getProperties());String stringProperties = JSONObject.toJSONString(properties);log.info("add catalog %s with properties %s", catalogName, stringProperties);JdbcStoredCatalog jdbcCatalog = new JdbcStoredCatalog(catalogName, stringProperties);if (isCoordinator) {log.info("The coordinator node catalog needs to be persisted to the database");catalogsJdbi.withHandle(handle -> {handle.createUpdate("INSERT INTO catalogs (name,properties) VALUES (:name, :properties)").bind("name", catalogName).bind("properties", stringProperties).execute();return null;});}catalogs.put(catalogName, jdbcCatalog);}@Overridepublic void removeCatalog(String catalogName){checkModifiable();if (isCoordinator) {log.info("The coordinator node catalog must support persistent deletion");catalogsJdbi.withHandle(handle -> {handle.createUpdate("DELETE FROM catalogs WHERE name = :name").bind("name", catalogName).execute();return null;});}catalogs.remove(catalogName);}private void checkModifiable(){if (readOnly) {throw new TrinoException(NOT_SUPPORTED, "Catalog store is read only");}}/*** This is not a generic, universal, or stable version computation, and can and will change from version to version without warning.* For places that need a long term stable version, do not use this code.*/static CatalogVersion computeCatalogVersion(String catalogName, ConnectorName connectorName, Map<String, String> properties){Hasher hasher = Hashing.sha256().newHasher();hasher.putUnencodedChars("catalog-hash");hashLengthPrefixedString(hasher, catalogName);hashLengthPrefixedString(hasher, connectorName.toString());hasher.putInt(properties.size());ImmutableSortedMap.copyOf(properties).forEach((key, value) -> {hashLengthPrefixedString(hasher, key);hashLengthPrefixedString(hasher, value);});return new CatalogVersion(hasher.hash().toString());}private static void hashLengthPrefixedString(Hasher hasher, String value){hasher.putInt(value.length());hasher.putUnencodedChars(value);}public static class JdbcStoredCatalogimplements StoredCatalog{private String name;private String properties;public JdbcStoredCatalog() {}public JdbcStoredCatalog(String name, String properties){this.name = name;this.properties = properties;}@ColumnName("properties")public String getProperties(){return properties;}public void setProperties(String properties){this.properties = properties;}@ColumnName("name")@Overridepublic String getName(){return name;}public void setName(String name){this.name = name;}@Overridepublic CatalogProperties loadProperties(){final Properties properties = convertStringToProperties(this.properties);Map<String, String> props = new HashMap<>(fromProperties(properties).entrySet().stream().collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().trim())));String connectorNameValue = props.remove("connector.name");checkState(connectorNameValue != null, "Catalog configuration %s does not contain 'connector.name'", this.name);if (connectorNameValue.indexOf('-') >= 0) {String deprecatedConnectorName = connectorNameValue;connectorNameValue = connectorNameValue.replace('-', '_');log.warn("Catalog '%s' is using the deprecated connector name '%s'. The correct connector name is '%s'", name, deprecatedConnectorName, connectorNameValue);}ConnectorName connectorName = new ConnectorName(connectorNameValue);CatalogHandle catalogHandle = createRootCatalogHandle(name, computeCatalogVersion(name, connectorName, props));return new CatalogProperties(catalogHandle, connectorName, ImmutableMap.copyOf(props));}}public static Properties convertStringToProperties(String json) {ObjectMapper objectMapper = new ObjectMapper();Properties properties = new Properties();try {Object jsonObject = objectMapper.readValue(json, Object.class);if (jsonObject instanceof Map) {Map<String, String> map = (Map<String, String>) jsonObject;for (Map.Entry<String, String> entry : map.entrySet()) {properties.setProperty(entry.getKey(), entry.getValue());}} else {throw new IllegalArgumentException("The JSON string should contain a Map object");}} catch (Exception e) {throw new RuntimeException(e.getMessage(), e);}return properties;}private static void loaderJdbcDriver(ClassLoader classLoader, String driverClassName, String catalogUrl) {try {final Class<?> clazz = Class.forName(driverClassName, true, classLoader);final Driver driver = (Driver) clazz.newInstance();if (!driver.acceptsURL(catalogUrl)) {log.error("Jdbc driver loading error. Driver {} cannot accept url.", driverClassName);throw new RuntimeException("Jdbc driver loading error.");}} catch (final Exception e) {throw new RuntimeException("Jdbc driver loading error.", e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/326769.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

R304S 指纹识别模块指令系统

1 指令集 1. GR_GetImage 指令代码&#xff1a;01H 功能&#xff1a;从传感器上读入图像存于图像缓冲区 2. GR_GenChar 指令代码&#xff1a;02H 功能&#xff1a;根据原始图像生成指纹特征存于 CharBuffer1 或 CharBuffer2 3. GR_Match 指令代码&#xff1a;03H 功能&a…

Vue-2、初识Vue

1、helloword小案列 代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>初始Vue</title><!--引入vue--><script type"text/javascript" src"https://cdn.jsdelivr.n…

深度学习(学习记录)

题型&#xff1a;填空题判断题30分、简答题20分、计算题20分、综合题&#xff08;30分&#xff09; 综合题&#xff08;解决实际工程问题&#xff0c;不考实验、不考代码、考思想&#xff09; 一、深度学习绪论&#xff08;非重点不做考察&#xff09; 1、传统机器学习&…

Qt6入门教程 3:创建Hello World项目

一.新建一个项目 程序员的职业生涯都是从一声问候开始的&#xff0c;我们的第一个Qt项目也是HelloWorld 首先要说明的是&#xff0c;IDE不一定要用Qt Creator&#xff0c;用Visual Studio、VSCode、CLion也可以搭建Qt开发环境&#xff0c;它们都相应的插件来支持Qt开发。当然这…

未完成销量任务的智己汽车突发大规模车机故障,竞争压力不小

2024年刚开年&#xff0c;智己汽车便上演了一出“开门黑”。 近日&#xff0c;不少车主在社交平台发帖&#xff0c;反映智己LS6出现大规模车机故障&#xff0c;包括但不限于主驾驶屏幕不显示车速、档位、行驶里程&#xff0c;左右转盲区显示失效&#xff0c;无转向灯、雷达提醒…

Linux部署Yearning并结合内网穿透工具实现公网访问本地web管理界面

文章目录 前言1. Linux 部署Yearning2. 本地访问Yearning3. Linux 安装cpolar4. 配置Yearning公网访问地址5. 公网远程访问Yearning管理界面6. 固定Yearning公网地址 前言 Yearning 简单, 高效的MYSQL 审计平台 一款MYSQL SQL语句/查询审计工具&#xff0c;为DBA与开发人员使用…

损失函数篇 | YOLOv8 引入 Shape-IoU 考虑边框形状与尺度的度量

作者导读&#xff1a;Shape-IoU&#xff1a;考虑边框形状与尺度的度量 论文地址&#xff1a;https://arxiv.org/abs/2312.17663 作者视频解读&#xff1a;https://www.bilibili.com 开源代码地址&#xff1a;https://github.com/malagoutou/Shape-IoU/blob/main/shapeiou.py…

【激活函数】PReLU 激活函数

1、介绍 PReLU&#xff08;Parametric Rectified Linear Unit&#xff09;激活函数是ReLU&#xff08;Rectified Linear Unit&#xff09;激活函数的一种改进。它是由 He et al. 在 2015 年提出的&#xff0c;旨在解决ReLU激活函数的一些局限性。 # 定义 PReLU 激活函数 prelu…

合合TextIn团队发布 - 文档图像多模态大模型技术发展、探索与应用

合合信息TextIn&#xff08;Text Intelligence&#xff09;团队在2023年12月31日参与了中国图象图形学学会青年科学家会议 - 垂直领域大模型论坛。在会议上&#xff0c;丁凯博士分享了文档图像大模型的思考与探索&#xff0c;完整阐述了多模态大模型在文档图像领域的发展与探索…

FSMC—扩展外部SRAM

一、SRAM控制原理 STM32控制器芯片内部有一定大小的SRAM及FLASH作为内存和程序存储空间&#xff0c;但当程序较大&#xff0c;内存和程序空间不足时&#xff0c;就需要在STM32芯片的外部扩展存储器了。STM32F103ZE系列芯片可以扩展外部SRAM用作内存。 给STM32芯片扩展内存与给…

9个最受欢迎的开源自动化测试框架盘点

如果想学习提升找不到资料&#xff0c;没人答疑解惑时&#xff0c;请及时加入群&#xff1a;1150305204&#xff0c;里面有各种测试开发资料和技术可以一起交流哦。 自动化测试框架可以帮助测试人员评估多个web和移动应用程序的功能&#xff0c;安全性&#xff0c;可用性和可访…

Mac启动时候出现禁止符号

Mac启动时候出现禁止符号 启动时候出现禁止符号,意味着 选定的启动磁盘 包含 Mac 操作系统&#xff0c;但它不是 您的 Mac 可以使用的 macOS 。您应该在这个磁盘上 重新安装 macOS 。 可以尝试以下苹果提供的方法&#xff1a; Mac启动时候出现禁止符号 不要轻易抹除磁盘&am…