Flink Sql 自定义实现 kudu connector

Flink Sql 自定义实现 kudu connector

  • 原理
  • 实现

众所周知啊,flinksql 中与其他的存储做数据的传输连接的时候,是需要有独特的连接器的,mysql redis es hbase kudu ,不同的存储他们自己使用的协议与操作都不一样,所以需要相应的连接器来连接,这个帖子主要讲一下怎么实现自定义的flink sql connector ,不只局限于kudu ,其他的连接器都是这个原理

原理

其实原理跟从网上下载的 mysql连接器一样,打包编译,添加好pom文件,sql解析时,会根据程序中配置的connector 来做判断是那种解析器,然后与pom中引入的解析器做匹配。

那么具体要如何开发引入一个connector呢?

简单来说需要三个东西
sink类实例 : KuduDynamicTableSink
工厂类: KuduDynamicTableFactory
以及一个配置文件:org.apache.flink.table.factories.Factory

其实主要利用了java的SPI原理,用户需要在工程的resources中新建一个文件
在这里插入图片描述
这里放什么呢,放的是 用户开发的 工厂类的地址
com.datacenter.connectors.kudu.table.KuduDynamicTableFactory
原因就是,SPI会从这个文件中找到工厂类,然后由工厂类来构造出sink实例供sql解析出的对象使用

实现

KuduDynamicTableSink:

package com.datacenter.streaming.sql.connectors.kudu.table;import com.datacenter.streaming.sql.connectors.kudu.KuduOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduOutputFormat;
import com.datacenter.streaming.sql.connectors.kudu.KuduSinkOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.types.RowKind;import java.util.List;import static org.apache.flink.util.Preconditions.checkState;/*** KuduDynamicTableSink* @author loveyou*/
public class KuduDynamicTableSink implements DynamicTableSink {private final KuduOptions kuduOptions;private final KuduSinkOptions kuduSinkOptions;private TableSchema physicalSchema;private int bufferFlushInterval;private int maxRetries;private List<String> keyFields;public KuduDynamicTableSink(KuduOptions kuduOptions, KuduSinkOptions kuduSinkOptions, TableSchema physicalSchema) {this.kuduOptions = kuduOptions;this.kuduSinkOptions = kuduSinkOptions;this.physicalSchema = physicalSchema;UniqueConstraint uniqueConstraint = physicalSchema.getPrimaryKey().orElse(null);if (uniqueConstraint != null) {this.keyFields = uniqueConstraint.getColumns();}this.bufferFlushInterval = (int) kuduSinkOptions.getBatchIntervalMs();this.maxRetries = kuduSinkOptions.getMaxRetries();}@Overridepublic ChangelogMode getChangelogMode(ChangelogMode requestedMode) {validatePrimaryKey(requestedMode);return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_AFTER).build();}@Overridepublic SinkRuntimeProvider getSinkRuntimeProvider(Context context) {KuduOutputFormat kuduOutputFormat = new KuduOutputFormat(kuduOptions.getMaster(),kuduOptions.getTable(),physicalSchema.getFieldNames(),physicalSchema.getFieldDataTypes(),bufferFlushInterval,maxRetries);return OutputFormatProvider.of(kuduOutputFormat);}@Overridepublic DynamicTableSink copy() {return new KuduDynamicTableSink(kuduOptions,kuduSinkOptions,physicalSchema);}@Overridepublic String asSummaryString() {return null;}private void validatePrimaryKey(ChangelogMode requestedMode) {checkState(ChangelogMode.insertOnly().equals(requestedMode) || keyFields == null,"please declare primary key for sink table when query contains update/delete record.");}
}
package com.datacenter.streaming.sql.connectors.kudu.table;import com.datacenter.streaming.sql.connectors.kudu.KuduLookupOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduSinkOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;/*** KuduDynamicTableFactory* @author loveyou*/
public class KuduDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {// common optionspublic static final String IDENTIFIER = "kudu";public static final ConfigOption<String> MASTER = ConfigOptions.key("master").stringType().noDefaultValue().withDescription("the kudu master address.");public static final ConfigOption<String> TABLE = ConfigOptions.key("table").stringType().noDefaultValue().withDescription("the jdbc table name.");public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");// lookup optionsprivate static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows").longType().defaultValue(-1L).withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " +"be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " +"specified. Cache is not enabled as default.");private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions.key("lookup.cache.ttl").durationType().defaultValue(Duration.ofSeconds(-1)).withDescription("the cache time to live.");private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries").intType().defaultValue(3).withDescription("the max retry times if lookup database failed.");// write options//private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions//        .key("sink.buffer-flush.max-rows")//        .intType()//        .defaultValue(100)//        .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +//                " of records, will flush data. The default value is 100.");private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.buffer-flush.interval").durationType().defaultValue(Duration.ofSeconds(1)).withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +"default value is 1s.");private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries").intType().defaultValue(3).withDescription("the max retry times if writing records to database failed.");/*** DynamicTableSource 实例** @param context* @return*/@Overridepublic DynamicTableSource createDynamicTableSource(Context context) {final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);final ReadableConfig config = helper.getOptions();helper.validate();validateConfigOptions(config);TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());return new KuduDynamicTableSource(getKuduOptions(helper.getOptions()),getKuduLookupOptions(helper.getOptions()),physicalSchema);}/*** DynamicTableSink 实例** @param context* @return*/@Overridepublic DynamicTableSink createDynamicTableSink(Context context) {final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);final ReadableConfig config = helper.getOptions();helper.validate();validateConfigOptions(config);TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());return new KuduDynamicTableSink(getKuduOptions(config),getKuduSinkOptions(config),physicalSchema);}private KuduOptions getKuduOptions(ReadableConfig readableConfig) {KuduOptions.KuduOptionsBuilder builder = KuduOptions.builder().master(readableConfig.get(MASTER)).table(readableConfig.get(TABLE));readableConfig.getOptional(USERNAME).ifPresent(builder::username);readableConfig.getOptional(PASSWORD).ifPresent(builder::password);return builder.build();}private KuduLookupOptions getKuduLookupOptions(ReadableConfig readableConfig) {KuduLookupOptions.KuduLookupOptionsBuilder builder = KuduLookupOptions.builder();builder.cacheMaxSize(readableConfig.get(LOOKUP_CACHE_MAX_ROWS));builder.cacheExpireMs(readableConfig.get(LOOKUP_CACHE_TTL).toMillis());builder.maxRetryTimes(readableConfig.get(LOOKUP_MAX_RETRIES));return builder.build();}private KuduSinkOptions getKuduSinkOptions(ReadableConfig config) {KuduSinkOptions.KuduSinkOptionsBuilder builder = KuduSinkOptions.builder();//builder.batchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));builder.batchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());builder.maxRetries(config.get(SINK_MAX_RETRIES));return builder.build();}/*** 工厂唯一标识符** @return*/@Overridepublic String factoryIdentifier() {return IDENTIFIER;}/*** 必选项** @return*/@Overridepublic Set<ConfigOption<?>> requiredOptions() {Set<ConfigOption<?>> requiredOptions = new HashSet<>();requiredOptions.add(MASTER);requiredOptions.add(TABLE);return requiredOptions;}/*** 可选项** @return*/@Overridepublic Set<ConfigOption<?>> optionalOptions() {Set<ConfigOption<?>> optionalOptions = new HashSet<>();optionalOptions.add(USERNAME);optionalOptions.add(PASSWORD);optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);optionalOptions.add(LOOKUP_CACHE_TTL);optionalOptions.add(LOOKUP_MAX_RETRIES);//optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);optionalOptions.add(SINK_MAX_RETRIES);return optionalOptions;}/*** 验证配置** @param config*/private void validateConfigOptions(ReadableConfig config) {checkAllOrNone(config, new ConfigOption[]{USERNAME,PASSWORD});checkAllOrNone(config, new ConfigOption[]{LOOKUP_CACHE_MAX_ROWS,LOOKUP_CACHE_TTL});Preconditions.checkArgument(config.get(SINK_BUFFER_FLUSH_INTERVAL).compareTo(Duration.ofSeconds(1)) >= 0,SINK_BUFFER_FLUSH_INTERVAL.key() + " must >= 1000");}/*** 要么一个都没有,要么都要有** @param config* @param configOptions*/private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {int presentCount = 0;for (ConfigOption configOption : configOptions) {if (config.getOptional(configOption).isPresent()) {presentCount++;}}String[] propertyNames = Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);Preconditions.checkArgument(configOptions.length == presentCount || presentCount == 0,"Either all or none of the following options should be provided:\n" + String.join("\n", propertyNames));}
}

看代码中
调用工厂类,由工厂类实现的DynamicTableSinkFactory接口汇总的create方法,来返回一个KuduDynamicTableSink 方法实例。

完成后 可以把这个代码片段拷贝进你的java项目中 ,在 pom中 添加

<modules><module>connector-kudu</module>
</modules>

或者直接打包成jar 用 idea 自带的 package 直接 打包
在这里插入图片描述

,将jar拷贝进你的本地仓库,像引入mysql connector一样的方式来引入文件

所有代码都在我的git上,需要的同学可以自取,如果找不到可以私信我

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/491054.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(详细使用指南)Linux下交叉编译带ffmpeg的opencv并移植到RK3588等ARM端

一 问题背景 瑞芯微RK3588等嵌入式板作为边缘端设备为算法模型的部署提供了便利&#xff0c;目前很多分类或好检测模型针对边缘端做了优化或量化&#xff0c;使得在边缘端也能达到实时稳定的识别和检测效果。 但嵌入式设备普遍的flash emmc不大&#xff0c;一般在32G左…

git之分支管理

一.理解分支 我们看下面这张图片&#xff1a; 在版本回退⾥&#xff0c;你已经知道&#xff0c;每次提交&#xff0c;Git都把它们串成⼀条时间线&#xff0c;这条时间线就可以理解为是⼀个分⽀。截⽌到⽬前&#xff0c;只有⼀条时间线&#xff0c;在Git⾥&#xff0c;这个分⽀…

kuka示教器嵌套UR界面操作ros中rviz的UR机器人

摘要 本例展示了用QT增加一个网页视图&#xff0c;背景是kuka示教器界面&#xff0c;中间增加UR的VNC网页界面显示。本人博客中一起有写过ros2运行UR的操作。 ros2 UR10仿真包运行_基于ros的ur仿真-CSDN博客 效果如下&#xff1a; 1.打开UR机器人的ros2仿真文件 sudo su ros2…

32RTCBKP

目录 一.时间戳 二.BKP简介 三&#xff0e;RTC外设简介 ​编辑四&#xff0e;相关寄存器 五.相关函数 六.代码实现 (1)读写备份寄存器 (2)实时时钟 一.时间戳 最早在Unix系统上使用 Linux,Windos,安卓的底层计时系统 好处&#xff1a; 1.简化硬件电路&#xff0c…

c++的类型转换方法

一、静态类型转换&#xff08;static_cast&#xff09; 静态类型的转换主要用于基本类型之间的转换&#xff0c;比如int类型转换为double类型。但是static_cast也可以支持上下行的转换&#xff08;存在继承关系之间的转换&#xff09; 基本类型之间的转换举例 上下行转换的举…

《Docker 简易速速上手小册》第8章 Docker 在企业中的应用(2024 最新版)

文章目录 8.1 Docker 在开发环境中的应用8.1.1 重点基础知识8.1.2 重点案例&#xff1a;Python Web 应用开发环境8.1.3 拓展案例 1&#xff1a;Python 数据分析环境8.1.4 拓展案例 2&#xff1a;Python 自动化测试环境 8.2 Docker 在生产环境的实践8.2.1 重点基础知识8.2.2 重点…

174基于matlab的雷达数字信号处理

基于matlab的雷达数字信号处理。该程序具备对雷达目标回波的处理能力&#xff0c;能够从噪声中将目标检测出来&#xff0c;并提取目标的距离、速度、角度信息。有相应的试验文档。程序已调通&#xff0c;可直接运行。 174 雷达数字信号处理 目标检测出来 (xiaohongshu.com)

STM32单片机基本原理与应用(八)

温度传感器实验 实验内容&#xff1a; 单片机通过代码模拟1-Wire总线并对DS18B20进行读写&#xff0c;并在TFTLCD屏幕上显示当前实时温度。 电路原理图&#xff1a; 1-Wire总线 1-Wire总线&#xff1a;即单总线协议&#xff0c;采用单根信号线&#xff0c;既传输时钟&#…

高等数学(无穷小与无穷大)

目录 一、无穷小 二、无穷大 三、无穷小与无穷大的关系 四、无穷小量的阶的比较 一、无穷小 二、无穷大 三、无穷小与无穷大的关系 四、无穷小量的阶的比较

直接写就行!EI顶刊组合:多能源微网/综合能源系统两阶段鲁棒优化配置方法代码!

适用平台&#xff1a;MatlabYalmipCplex 参考文献&#xff1a; 《考虑机组禁止运行区间的含风电鲁棒机组组合》-中国电机工程学报 《微电网两阶段鲁棒优化经济调度方法》-中国电机工程学报 程序提出了微电网中电源容量的两阶段鲁棒优化配置模型&#xff0c;第一阶段主要决策…

Swift Combine 使用 handleEvents 操作符调试管道 从入门到精通二十五

Combine 系列 Swift Combine 从入门到精通一Swift Combine 发布者订阅者操作者 从入门到精通二Swift Combine 管道 从入门到精通三Swift Combine 发布者publisher的生命周期 从入门到精通四Swift Combine 操作符operations和Subjects发布者的生命周期 从入门到精通五Swift Com…

LemonSqueezy

信息收集 # nmap -sn 192.168.1.0/24 -oN live.nmap Starting Nmap 7.94 ( https://nmap.org ) at 2024-02-08 11:22 CST Nmap scan report for 192.168.1.1 Host is up (0.00037s latency). MAC Address: 00:50:56:C0:00:08 (VMware) Nmap scan r…