SeaTunnel扩展Transform插件,自定义转换插件

代码结构

在seatunnel-transforms-v2中新建数据包名,新建XXXTransform,XXXTransformConfig,XXXTransformFactory三个类

自定义转换插件功能说明

这是个适配KafkaSource的转换插件,接收到的原文格式为:

{"path":"xxx.log.gz","code":"011","cont":"{\"ID\":\"1\",\"NAME\":\"zhangsan\",\"TABLE\":\"USER\",\"create_time\":\"20230904\"}","timestamp":"20230823160246"}

需要转换为只保留cont里面的数据

{"create_time":"20230904","NAME":"zhangsan","TABLE":"USER","ID":"999"}

任务配置文件

env {# You can set engine configuration here STREAMING BATCHexecution.parallelism = 1job.mode = "STREAMING"#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"}source {# This is a example source plugin **only for test and demonstrate the feature source plugin**Kafka {bootstrap.servers = "xxxxx:9092"topic = "test_in2"consumer.group = "167321237613format="text"result_table_name="kafka"}}transform {ExtractFromCJ {source_table_name="kafka"result_table_name="kafka1"schema = {fields {NAME = "string"TABLE = "string"create_time = "string"ID="string"}}}}sink {kafka {source_table_name="kafka1"topic = "test_out2"bootstrap.servers = "xxxx:9092"kafka.request.timeout.ms = 60000semantics = EXACTLY_ONCE}}

代码说明

XXXConfig代码,这个类主要用来保存transform的配置项

package org.apache.seatunnel.transform.extract;import lombok.Getter;import lombok.Setter;import org.apache.seatunnel.api.configuration.Option;import org.apache.seatunnel.api.configuration.Options;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import java.io.Serializable;import java.util.Map;@Getter@Setterpublic class ExtractFromCJTransformConfig implements Serializable {public static final Option<Map<String, String>> SCHEMA =Options.key("schema.fields").mapType().noDefaultValue().withDescription("Specify the field mapping relationship between input and output");private Map<String, String> fieldColumns;public static ExtractFromCJTransformConfig of(ReadonlyConfig config) {ExtractFromCJTransformConfig extractFromCJTransformConfig = new ExtractFromCJTransformConfig();Map<String, String> fieldColumns = config.get(SCHEMA);extractFromCJTransformConfig.setFieldColumns(fieldColumns);return extractFromCJTransformConfig;}}

XXXTransformFactory说明,工厂类,主要用来初始化具体的转换类

package org.apache.seatunnel.transform.extract;import com.google.auto.service.AutoService;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import org.apache.seatunnel.api.configuration.util.OptionRule;import org.apache.seatunnel.api.table.catalog.CatalogTable;import org.apache.seatunnel.api.table.connector.TableTransform;import org.apache.seatunnel.api.table.factory.Factory;import org.apache.seatunnel.api.table.factory.TableFactoryContext;import org.apache.seatunnel.api.table.factory.TableTransformFactory;@AutoService(Factory.class)public class ExtractFromCJTransformFactory implements TableTransformFactory {@Overridepublic String factoryIdentifier() {return  "ExtractFromCJ";}@Overridepublic OptionRule optionRule() {return OptionRule.builder().optional(ExtractFromCJTransformConfig.SCHEMA).build();}@Overridepublic TableTransform createTransform(TableFactoryContext context) {CatalogTable catalogTable = context.getCatalogTable();ReadonlyConfig options = context.getOptions();ExtractFromCJTransformConfig extractFromCJTransformConfig =ExtractFromCJTransformConfig.of(options);return () -> new ExtractFromCJTransform(extractFromCJTransformConfig, catalogTable);}}

XXXXTransform,具体的转换类,主要用于对source数据的处理,还有数据结构类型的保存

package org.apache.seatunnel.transform.extract;import cn.hutool.core.collection.CollUtil;import cn.hutool.json.JSONObject;import cn.hutool.json.JSONUtil;import com.google.auto.service.AutoService;import lombok.NoArgsConstructor;import lombok.NonNull;import lombok.extern.slf4j.Slf4j;import org.apache.seatunnel.api.configuration.ReadonlyConfig;import org.apache.seatunnel.api.configuration.util.ConfigValidator;import org.apache.seatunnel.api.table.catalog.CatalogTable;import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;import org.apache.seatunnel.api.table.catalog.Column;import org.apache.seatunnel.api.table.catalog.ConstraintKey;import org.apache.seatunnel.api.table.catalog.PhysicalColumn;import org.apache.seatunnel.api.table.catalog.PrimaryKey;import org.apache.seatunnel.api.table.catalog.TableIdentifier;import org.apache.seatunnel.api.table.catalog.TableSchema;import org.apache.seatunnel.api.table.type.SeaTunnelDataType;import org.apache.seatunnel.api.table.type.SeaTunnelRow;import org.apache.seatunnel.api.table.type.SeaTunnelRowType;import org.apache.seatunnel.api.transform.SeaTunnelTransform;import org.apache.seatunnel.shade.com.typesafe.config.Config;import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;import java.util.ArrayList;import java.util.List;import java.util.stream.Collectors;@AutoService(SeaTunnelTransform.class)@NoArgsConstructor@Slf4jpublic class ExtractFromCJTransform extends AbstractCatalogSupportTransform {private ExtractFromCJTransformConfig config;protected SeaTunnelRowType inputRowType;@Overridepublic String getPluginName() {return "ExtractFromCJ";}public ExtractFromCJTransform(@NonNull ExtractFromCJTransformConfig config, @NonNull CatalogTable catalogTable) {super(catalogTable);this.config = config;}@Overrideprotected void setConfig(Config pluginConfig) {ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig)).validate(new ExtractFromCJTransformFactory().optionRule());this.config = ExtractFromCJTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));}@Overrideprotected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {return inputRowType;}@Overrideprotected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {Object content = inputRow.getFields()[0];String data = content.toString();Object[] outputDataArray = new Object[0];if (JSONUtil.isJson(data)) {JSONObject cont = JSONUtil.parseObj(data).getJSONObject("cont");if (!cont.isEmpty()) {if (!CollUtil.isEmpty(this.config.getFieldColumns())) {outputDataArray = new Object[this.config.getFieldColumns().size()];int t = 0;for (String key : this.config.getFieldColumns().keySet()) {String value = cont.getStr(key);outputDataArray[t] = value;t++;}} else {outputDataArray = new Object[1];outputDataArray[0] = JSONUtil.toJsonStr(cont);}}}SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);outputRow.setRowKind(inputRow.getRowKind());outputRow.setTableId(inputRow.getTableId());return outputRow;}@Overrideprotected TableSchema transformTableSchema() {List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();List<ConstraintKey> outputConstraintKeys =inputCatalogTable.getTableSchema().getConstraintKeys().stream().map(ConstraintKey::copy).collect(Collectors.toList());PrimaryKey copiedPrimaryKey =inputCatalogTable.getTableSchema().getPrimaryKey() == null? null: inputCatalogTable.getTableSchema().getPrimaryKey().copy();if (CollUtil.isEmpty(this.config.getFieldColumns())) {return TableSchema.builder().primaryKey(copiedPrimaryKey).columns(inputColumns).constraintKey(outputConstraintKeys).build();} else {List<Column> transformColumns = new ArrayList<>();for (String key : this.config.getFieldColumns().keySet()) {SeaTunnelDataType<?> dataType = CatalogTableUtil.parseDataType(this.config.getFieldColumns().get(key));transformColumns.add(PhysicalColumn.of(key, dataType, 0, true, null, null));}return TableSchema.builder().primaryKey(copiedPrimaryKey).columns(transformColumns).constraintKey(outputConstraintKeys).build();}}@Overrideprotected TableIdentifier transformTableIdentifier() {return inputCatalogTable.getTableId().copy();}}

文中的转换实现的是AbstractCatalogSupportTransform类,Seatunel还提供SingleFieldOutputTransform和MultipleFieldOutputTransform,分别对应单字段和多字段的数据处理,具体扩展可根据需求来实现对应的类

执行结果

来源消息

结果消息

以上就是对转换插件的扩展分享,有需求的小伙伴可以参考,也欢迎大家一起评论沟通~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/97924.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

本地缓存、Redis数据缓存策略

目录 需求看似简单&#xff0c;一取一传但是&#xff0c;又出现了一个新的问题&#xff0c;数据丢了。 一、缓存缓存有哪些分类&#xff1a; 二、分析一下本地缓存的优势三、本地缓存解决方案&#xff1f;1、基于Guava Cache实现本地缓存2、基于Caffeine实现本地缓存3、基于Enc…

猫头虎博主赠书二期:《Go黑帽子 渗透测试编程之道(安全技术经典译丛) 》

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

Apache实现weblogic集群配置

安装apache&#xff0c;安装相对稳定的版本。如果安装后测试能否正常启动&#xff0c;可以通过访问http://localhost/进行测试。安装Weblogic&#xff0c;参见文档将bea安装目录 weblogic81/server/bin 下的 mod_wl_20.so 文件copy到 apache安装目录下Apache2/modules/目录下A…

PCIe 配置空间:Command 寄存器

在 type 0 header 中,command 寄存器的位置如下图所示: 在 type 1 header 中,command 寄存器的位置如下图所示: Command 寄存器的结构如下图: 对于 PCIe,只有 Bit 0/1/2/6/8/10 是有效的,其他必须配置为 0 。 IO Space Enable 该位用于控制设别如何响应 I/O 空间的访…

OpenCV(二十):图像卷积

1.图像卷积原理 图像卷积是一种在图像上应用卷积核的操作。卷积核是一个小的窗口矩阵&#xff0c;它通过在图像上滑动并与图像的像素进行逐元素相乘&#xff0c;然后求和来计算新图像中每个像素的值。通过滑动卷积核并在图像上进行逐像素运算&#xff0c;可以实现一系列图像处理…

uniapp中UView中 u-form表单在v-for循环下如何进行表单校验

1、数据data格式 注&#xff1a;rule绑定的tableFromRule中要和表单tableFrom下面放置一个同名数组&#xff0c;确保u-form能找到 tableFrom: {tableData: [//数据详情列表]},tableFromRule: {//校验tableData: [//数据详情列表]},formRules:{localation:[{required: true,mes…

OSCS 安全周报第 58 期:VMware Aria Operations SSH 身份验证绕过漏洞 (CVE-2023-34039)

​ 本周安全态势综述 OSCS 社区共收录安全漏洞 3 个&#xff0c;公开漏洞值得关注的是 VMware Aria Operations SSH 身份验证绕过漏洞( CVE-2023-34039 )、Apache Airflow Spark Provider 反序列化漏洞( CVE-2023-40195 )。 针对 NPM 仓库&#xff0c;共监测到 324 个不同版本…

Android ChatCPT集成

准备工作 ChatGPT账号(openai) 集成好网络框架(默认使用Retrofit) 接入 选择modele 这里使用的是 「https://api.openai.com/v1/chat/completions」 创建API Keys 运行效果 POST https://api.openai.com/v1/chat/completions Content-Type: application/json Content-Length:…

Pandas DataFrame 数据存储格式比较

Pandas 支持多种存储格式&#xff0c;在本文中将对不同类型存储格式下的Pandas Dataframe的读取速度、写入速度和大小的进行测试对比。 创建测试Dataframe 首先创建一个包含不同类型数据的测试Pandas Dataframe。 import pandas as pdimport randomimport stringimport numpy …

华为Mate 60系列安装谷歌服务框架,安装Play商店,Google

华为Mate 60 Pro悄悄的上架。但是却震撼市场的强势登场,Mate 60系列默认搭载的就是鸿蒙4.0。那么mate 60加上4.0是否可以安装谷歌服务框架呢&#xff1f;本机到手经过测试是可以安装的&#xff0c;但是在解决play非保护机制认证还通知这个问题上,他和鸿蒙3.0是不一样的。如果我…

U盾难管理?用U盾专用USB集线器

公司有一堆U盾要插着用&#xff0c;但是一台电脑也才两三个接口&#xff0c;怎么办&#xff1f; 三个字&#xff0c;很简单&#xff0c; 一台U盾专用的USB集线器就能解决。 U盾专用集线器为解决网银U盾连接问题而生。 它有四大好处&#xff01; 集中管理 把所有U盾集中到一…

Opencv 图像金字塔----高斯和拉普拉斯

原文&#xff1a;图像金字塔----高斯和拉普拉斯 图像金字塔是图像中多尺度表达的一种&#xff0c;最初用于机器视觉和图像压缩&#xff0c;最主要用于图像的分割、融合。 高斯金字塔 ( Gaussian pyramid): 高斯金字塔是由底部的最大分辨率图像逐次向下采样得到的一系列图像…