Flink SQL 自定义实现 Kudu Connector
- 原理
- 实现
众所周知，Flink SQL 与其他存储（如 MySQL、Redis、Elasticsearch、HBase、Kudu）之间传输数据时，需要借助专门的连接器（connector）：不同存储使用的协议和操作方式各不相同，因此需要相应的连接器来对接。本文主要讲解如何实现自定义的 Flink SQL connector；虽然以 Kudu 为例，但其他连接器的实现原理都是一样的。
原理
其实原理与从网上下载的 MySQL 连接器一样：将连接器编译打包，并在 pom 文件中添加依赖。SQL 解析时，Flink 会读取建表语句中配置的 `connector` 参数，并与 pom 中引入的各个连接器工厂的标识符做匹配，从而判断应该使用哪种连接器。工厂标识符即工厂类 `factoryIdentifier()` 的返回值，本例中为 `kudu`。
那么具体要如何开发引入一个connector呢?
简单来说需要三个东西
sink类实例 : KuduDynamicTableSink
工厂类: KuduDynamicTableFactory
以及一个配置文件:org.apache.flink.table.factories.Factory
这里主要利用了 Java 的 SPI（Service Provider Interface）机制：用户需要在工程的 resources 目录下新建文件 `META-INF/services/org.apache.flink.table.factories.Factory`。
文件里放什么呢？放的是用户开发的工厂类的全限定类名（包名 + 类名），每行一个：
com.datacenter.streaming.sql.connectors.kudu.table.KuduDynamicTableFactory
原因是：SPI 会从这个文件中找到工厂类，再由工厂类构造出 sink（或 source）实例，供 SQL 解析出的表对象使用。注意这里的类名必须与代码中的 package 声明完全一致，否则工厂类无法被加载。
实现
KuduDynamicTableSink:
package com.datacenter.streaming.sql.connectors.kudu.table;import com.datacenter.streaming.sql.connectors.kudu.KuduOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduOutputFormat;
import com.datacenter.streaming.sql.connectors.kudu.KuduSinkOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.types.RowKind;import java.util.List;import static org.apache.flink.util.Preconditions.checkState;/*** KuduDynamicTableSink* @author loveyou*/
public class KuduDynamicTableSink implements DynamicTableSink {private final KuduOptions kuduOptions;private final KuduSinkOptions kuduSinkOptions;private TableSchema physicalSchema;private int bufferFlushInterval;private int maxRetries;private List<String> keyFields;public KuduDynamicTableSink(KuduOptions kuduOptions, KuduSinkOptions kuduSinkOptions, TableSchema physicalSchema) {this.kuduOptions = kuduOptions;this.kuduSinkOptions = kuduSinkOptions;this.physicalSchema = physicalSchema;UniqueConstraint uniqueConstraint = physicalSchema.getPrimaryKey().orElse(null);if (uniqueConstraint != null) {this.keyFields = uniqueConstraint.getColumns();}this.bufferFlushInterval = (int) kuduSinkOptions.getBatchIntervalMs();this.maxRetries = kuduSinkOptions.getMaxRetries();}@Overridepublic ChangelogMode getChangelogMode(ChangelogMode requestedMode) {validatePrimaryKey(requestedMode);return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_AFTER).build();}@Overridepublic SinkRuntimeProvider getSinkRuntimeProvider(Context context) {KuduOutputFormat kuduOutputFormat = new KuduOutputFormat(kuduOptions.getMaster(),kuduOptions.getTable(),physicalSchema.getFieldNames(),physicalSchema.getFieldDataTypes(),bufferFlushInterval,maxRetries);return OutputFormatProvider.of(kuduOutputFormat);}@Overridepublic DynamicTableSink copy() {return new KuduDynamicTableSink(kuduOptions,kuduSinkOptions,physicalSchema);}@Overridepublic String asSummaryString() {return null;}private void validatePrimaryKey(ChangelogMode requestedMode) {checkState(ChangelogMode.insertOnly().equals(requestedMode) || keyFields == null,"please declare primary key for sink table when query contains update/delete record.");}
}
package com.datacenter.streaming.sql.connectors.kudu.table;import com.datacenter.streaming.sql.connectors.kudu.KuduLookupOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduSinkOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;/*** KuduDynamicTableFactory* @author loveyou*/
public class KuduDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {// common optionspublic static final String IDENTIFIER = "kudu";public static final ConfigOption<String> MASTER = ConfigOptions.key("master").stringType().noDefaultValue().withDescription("the kudu master address.");public static final ConfigOption<String> TABLE = ConfigOptions.key("table").stringType().noDefaultValue().withDescription("the jdbc table name.");public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");// lookup optionsprivate static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows").longType().defaultValue(-1L).withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " +"be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " +"specified. Cache is not enabled as default.");private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions.key("lookup.cache.ttl").durationType().defaultValue(Duration.ofSeconds(-1)).withDescription("the cache time to live.");private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries").intType().defaultValue(3).withDescription("the max retry times if lookup database failed.");// write options//private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions// .key("sink.buffer-flush.max-rows")// .intType()// .defaultValue(100)// .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +// " of records, will flush data. 
The default value is 100.");private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.buffer-flush.interval").durationType().defaultValue(Duration.ofSeconds(1)).withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +"default value is 1s.");private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions.key("sink.max-retries").intType().defaultValue(3).withDescription("the max retry times if writing records to database failed.");/*** DynamicTableSource 实例** @param context* @return*/@Overridepublic DynamicTableSource createDynamicTableSource(Context context) {final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);final ReadableConfig config = helper.getOptions();helper.validate();validateConfigOptions(config);TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());return new KuduDynamicTableSource(getKuduOptions(helper.getOptions()),getKuduLookupOptions(helper.getOptions()),physicalSchema);}/*** DynamicTableSink 实例** @param context* @return*/@Overridepublic DynamicTableSink createDynamicTableSink(Context context) {final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);final ReadableConfig config = helper.getOptions();helper.validate();validateConfigOptions(config);TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());return new KuduDynamicTableSink(getKuduOptions(config),getKuduSinkOptions(config),physicalSchema);}private KuduOptions getKuduOptions(ReadableConfig readableConfig) {KuduOptions.KuduOptionsBuilder builder = KuduOptions.builder().master(readableConfig.get(MASTER)).table(readableConfig.get(TABLE));readableConfig.getOptional(USERNAME).ifPresent(builder::username);readableConfig.getOptional(PASSWORD).ifPresent(builder::password);return builder.build();}private KuduLookupOptions 
getKuduLookupOptions(ReadableConfig readableConfig) {KuduLookupOptions.KuduLookupOptionsBuilder builder = KuduLookupOptions.builder();builder.cacheMaxSize(readableConfig.get(LOOKUP_CACHE_MAX_ROWS));builder.cacheExpireMs(readableConfig.get(LOOKUP_CACHE_TTL).toMillis());builder.maxRetryTimes(readableConfig.get(LOOKUP_MAX_RETRIES));return builder.build();}private KuduSinkOptions getKuduSinkOptions(ReadableConfig config) {KuduSinkOptions.KuduSinkOptionsBuilder builder = KuduSinkOptions.builder();//builder.batchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));builder.batchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());builder.maxRetries(config.get(SINK_MAX_RETRIES));return builder.build();}/*** 工厂唯一标识符** @return*/@Overridepublic String factoryIdentifier() {return IDENTIFIER;}/*** 必选项** @return*/@Overridepublic Set<ConfigOption<?>> requiredOptions() {Set<ConfigOption<?>> requiredOptions = new HashSet<>();requiredOptions.add(MASTER);requiredOptions.add(TABLE);return requiredOptions;}/*** 可选项** @return*/@Overridepublic Set<ConfigOption<?>> optionalOptions() {Set<ConfigOption<?>> optionalOptions = new HashSet<>();optionalOptions.add(USERNAME);optionalOptions.add(PASSWORD);optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);optionalOptions.add(LOOKUP_CACHE_TTL);optionalOptions.add(LOOKUP_MAX_RETRIES);//optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);optionalOptions.add(SINK_MAX_RETRIES);return optionalOptions;}/*** 验证配置** @param config*/private void validateConfigOptions(ReadableConfig config) {checkAllOrNone(config, new ConfigOption[]{USERNAME,PASSWORD});checkAllOrNone(config, new ConfigOption[]{LOOKUP_CACHE_MAX_ROWS,LOOKUP_CACHE_TTL});Preconditions.checkArgument(config.get(SINK_BUFFER_FLUSH_INTERVAL).compareTo(Duration.ofSeconds(1)) >= 0,SINK_BUFFER_FLUSH_INTERVAL.key() + " must >= 1000");}/*** 要么一个都没有,要么都要有** @param config* @param configOptions*/private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] 
configOptions) {int presentCount = 0;for (ConfigOption configOption : configOptions) {if (config.getOptional(configOption).isPresent()) {presentCount++;}}String[] propertyNames = Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);Preconditions.checkArgument(configOptions.length == presentCount || presentCount == 0,"Either all or none of the following options should be provided:\n" + String.join("\n", propertyNames));}
}
上面第二段代码即工厂类 KuduDynamicTableFactory。Flink 会调用该工厂类所实现的 DynamicTableSinkFactory 接口中的 createDynamicTableSink 方法，返回一个 KuduDynamicTableSink 实例。
完成后，可以把这些代码拷贝进你的 Java 项目中（例如作为一个名为 connector-kudu 的子模块），并在父工程的 pom 中添加：
<modules><module>connector-kudu</module>
</modules>
或者直接用 IDEA 自带的 Maven `package` 功能打包成 jar，再把 jar 安装到本地 Maven 仓库（例如执行 `mvn install`），然后像引入 MySQL connector 一样在 pom 中引入该依赖。
所有代码都在我的git上,需要的同学可以自取,如果找不到可以私信我