Extending SeaTunnel with a custom Source plugin: connector-webservice

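Code structure

Create a new connector-webservice module under seatunnel-connectors-v2. You can start by copying the connector-http-base module directly: calling a webservice is quite similar to calling an HTTP endpoint, and some of the classes below are copied straight from the HTTP connector.

The core classes are WebserviceConfig, WebserviceParameter, WebserviceSource, and WebserviceSourceReader.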

Configuration file

env {
  # You can set engine configuration here; job.mode is STREAMING or BATCH
  execution.parallelism = 1
  job.mode = "BATCH"
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is an example source plugin **only for test and demonstrate the feature source plugin**
  Webservice {
    url = "http://www.xxx.com.cn/xxx/WeatherWebService.asmx?wsdl"
    method = "getSupportCity"
    namespaceUri = "http://xxx.com.cn/"
    params = {"byProvinceName"="xxx"}
    result_table_name = "table_3"
  }
}

transform {
  Sql {
    source_table_name = "table_3"
    result_table_name = "table_4"
    query = "select content as fname from table_3"
  }
}

sink {
  Jdbc {
    removeDatabase = "true"
    code = "target"
    _compsName = "sss"
    description = ""
    mapType = "1"
    source_table_name = "table_4"
    writeMode = "0"
    type = "5"
    database = "xxx"
    password = "xxx"
    driver = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://192.168.xxx:3306/xxx_test"
    pluginName = "Jdbc"
    datasource = "197"
    emptyType = "2"
    user = "xxx"
    table = "xxx"
    generate_sink_sql = "true"
  }
}
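The `method`, `namespaceUri` and `params` options together describe one SOAP operation call. As a rough illustration of what that call carries on the wire, the JDK-only sketch below renders a SOAP 1.1 request envelope from those values. `SoapEnvelopeSketch`, `buildEnvelope` and `escape` are hypothetical names; hutool's `SoapClient`, which the reader uses, builds its own envelope and may differ in details such as namespace prefixes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: renders a SOAP 1.1 request from method, namespaceUri and params. */
public class SoapEnvelopeSketch {

    static String buildEnvelope(String method, String namespaceUri, Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        sb.append("<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">");
        sb.append("<soap:Body>");
        // The operation element is qualified with the service namespace via a default xmlns
        sb.append('<').append(method).append(" xmlns=\"").append(namespaceUri).append("\">");
        for (Map.Entry<String, String> e : params.entrySet()) {
            // Each configured param becomes a child element of the operation element
            sb.append('<').append(e.getKey()).append('>')
                    .append(escape(e.getValue()))
                    .append("</").append(e.getKey()).append('>');
        }
        sb.append("</").append(method).append('>');
        sb.append("</soap:Body></soap:Envelope>");
        return sb.toString();
    }

    // Minimal XML text escaping for parameter values
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("byProvinceName", "xxx");
        System.out.println(buildEnvelope("getSupportCity", "http://xxx.com.cn/", params));
    }
}
```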

Code walkthrough

WebserviceConfig
package org.apache.seatunnel.connectors.seatunnel.webservice.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import lombok.Data;

import java.io.Serializable;
import java.util.Map;

@Data
public class WebserviceConfig implements Serializable {

    public static final boolean DEFAULT_ENABLE_MULTI_LINES = false;

    public static final Option<String> FORMAT =
            Options.key("format")
                    .stringType()
                    .defaultValue("JSON")
                    .withDescription("Http response format");

    public static final Option<String> URL =
            Options.key("url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Webservice request url");

    public static final Option<String> METHOD =
            Options.key("method")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Webservice request method");

    public static final Option<String> NAMESPACE_URI =
            Options.key("namespaceUri")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Webservice request namespaceUri");

    public static final Option<Map<String, String>> PARAMS =
            Options.key("params")
                    .mapType()
                    .noDefaultValue()
                    .withDescription("Webservice request params");
}

WebserviceParameter

package org.apache.seatunnel.connectors.seatunnel.webservice.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Data;

import java.io.Serializable;
import java.util.Map;
import java.util.stream.Collectors;

@Data
public class WebserviceParameter implements Serializable {
    protected String url;
    protected String method;
    protected String namespaceUri;
    protected Map<String, String> params;
    protected String body;

    public void buildWithConfig(Config pluginConfig) {
        this.setUrl(pluginConfig.getString(WebserviceConfig.URL.key()));
        this.setMethod(pluginConfig.getString(WebserviceConfig.METHOD.key()));
        this.setNamespaceUri(pluginConfig.getString(WebserviceConfig.NAMESPACE_URI.key()));
        if (pluginConfig.hasPath(WebserviceConfig.PARAMS.key())) {
            this.setParams(
                    pluginConfig.getConfig(WebserviceConfig.PARAMS.key()).entrySet().stream()
                            .collect(
                                    Collectors.toMap(
                                            Map.Entry::getKey,
                                            entry -> String.valueOf(entry.getValue().unwrapped()),
                                            (v1, v2) -> v2)));
        }
    }
}
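`buildWithConfig` flattens the HOCON `params` block into a `Map<String, String>`: each value is unwrapped and passed through `String.valueOf`, so numbers and booleans are sent as their string form. The JDK-only sketch below reproduces that `Collectors.toMap` call on plain Java values standing in for `ConfigValue.unwrapped()` results; `ParamsFlattenSketch` and `flatten` are hypothetical names. One deliberate difference: it collects into a `LinkedHashMap` so the output order is deterministic, whereas the three-argument `toMap` used above returns a map with no ordering guarantee, which may matter for services that expect parameters in a fixed sequence.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ParamsFlattenSketch {

    // Mirrors the toMap(...) call in WebserviceParameter.buildWithConfig, but takes plain
    // Java values instead of typesafe ConfigValue objects.
    static Map<String, String> flatten(Map<String, Object> raw) {
        return raw.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> String.valueOf(entry.getValue()),
                                (v1, v2) -> v2, // last value wins on duplicate keys
                                LinkedHashMap::new)); // keep the input iteration order
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("byProvinceName", "xxx");
        raw.put("pageSize", 20);
        raw.put("verbose", true);
        System.out.println(flatten(raw)); // {byProvinceName=xxx, pageSize=20, verbose=true}
    }
}
```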

DeserializationCollector

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.webservice.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import lombok.AllArgsConstructor;

import java.io.IOException;

@AllArgsConstructor
public class DeserializationCollector {

    private DeserializationSchema<SeaTunnelRow> deserializationSchema;

    public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
        if (deserializationSchema instanceof JsonDeserializationSchema) {
            // The JSON schema emits rows directly into the collector
            ((JsonDeserializationSchema) deserializationSchema).collect(message, out);
        } else {
            SeaTunnelRow deserialize = deserializationSchema.deserialize(message);
            out.collect(deserialize);
        }
    }
}
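`DeserializationCollector` chooses between two paths: the JSON schema is handed the collector so it can emit rows itself (presumably several rows when the payload is a JSON array), while any other schema turns one message into exactly one row. The JDK-only sketch below shows the same `instanceof` dispatch with hypothetical stand-in interfaces (`RowSchema`, `MultiRowSchema`) and a `Consumer` in place of SeaTunnel's `Collector`; none of these names are SeaTunnel APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CollectorDispatchSketch {

    // Hypothetical stand-in for DeserializationSchema: one message -> one row
    interface RowSchema {
        Object[] deserialize(byte[] message);
    }

    // Hypothetical stand-in for JsonDeserializationSchema: one message -> many rows
    interface MultiRowSchema extends RowSchema {
        void collect(byte[] message, Consumer<Object[]> out);
    }

    static void collect(RowSchema schema, byte[] message, Consumer<Object[]> out) {
        if (schema instanceof MultiRowSchema) {
            ((MultiRowSchema) schema).collect(message, out);
        } else {
            out.accept(schema.deserialize(message));
        }
    }

    public static void main(String[] args) {
        RowSchema text = m -> new Object[] {new String(m, StandardCharsets.UTF_8)};
        MultiRowSchema lines =
                new MultiRowSchema() {
                    public Object[] deserialize(byte[] m) {
                        return new Object[] {new String(m, StandardCharsets.UTF_8)};
                    }

                    // Emits one row per line of the payload
                    public void collect(byte[] m, Consumer<Object[]> out) {
                        for (String line : new String(m, StandardCharsets.UTF_8).split("\n")) {
                            out.accept(new Object[] {line});
                        }
                    }
                };
        List<Object[]> rows = new ArrayList<>();
        byte[] msg = "a\nb".getBytes(StandardCharsets.UTF_8);
        collect(text, msg, rows::add); // single-row path
        collect(lines, msg, rows::add); // multi-row path
        System.out.println(rows.size()); // 3
    }
}
```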

SimpleTextDeserializationSchema

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.webservice.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import lombok.AllArgsConstructor;

@AllArgsConstructor
public class SimpleTextDeserializationSchema implements DeserializationSchema<SeaTunnelRow> {

    private SeaTunnelRowType rowType;

    @Override
    public SeaTunnelRow deserialize(byte[] message) {
        // The whole payload becomes a single-field row
        return new SeaTunnelRow(new Object[] {new String(message)});
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return rowType;
    }
}
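`SimpleTextDeserializationSchema` wraps the entire payload in a one-field row; that single field is presumably what the SQL transform in the configuration selects as `content`. Its `new String(message)` decodes with the JVM default charset, which on JVMs older than 18 with a non-UTF-8 platform default can garble non-ASCII responses such as Chinese city names. The JDK-only sketch below makes the charset explicit; `SimpleTextRowSketch` and `toRow` are hypothetical names, and UTF-8 is an assumption about the service's response encoding.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class SimpleTextRowSketch {

    // Wraps the whole payload in a one-field row, decoding with an explicit charset
    // instead of the JVM default.
    static Object[] toRow(byte[] message, Charset charset) {
        return new Object[] {new String(message, charset)};
    }

    public static void main(String[] args) {
        // "\u5317\u4eac" is a two-character Chinese city name, escaped to keep the source ASCII
        byte[] payload = "\u5317\u4eac".getBytes(StandardCharsets.UTF_8);
        Object[] row = toRow(payload, StandardCharsets.UTF_8);
        System.out.println(row.length + " field(s), " + ((String) row[0]).length() + " chars");
    }
}
```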

WebserviceSource
package org.apache.seatunnel.connectors.seatunnel.webservice.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.webservice.config.WebserviceConfig;
import org.apache.seatunnel.connectors.seatunnel.webservice.config.WebserviceParameter;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import com.google.auto.service.AutoService;

@AutoService(SeaTunnelSource.class)
public class WebserviceSource extends AbstractSingleSplitSource<SeaTunnelRow> {
    protected final WebserviceParameter webserviceParameter = new WebserviceParameter();
    protected SeaTunnelRowType rowType;
    protected JobContext jobContext;
    protected String contentField;
    protected DeserializationSchema<SeaTunnelRow> deserializationSchema;

    @Override
    public String getPluginName() {
        return "Webservice";
    }

    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        // buildWithConfig reads url, method and namespaceUri unconditionally, so require all three
        CheckResult result =
                CheckConfigUtil.checkAllExists(
                        pluginConfig,
                        WebserviceConfig.URL.key(),
                        WebserviceConfig.METHOD.key(),
                        WebserviceConfig.NAMESPACE_URI.key());
        if (!result.isSuccess()) {
            throw new RuntimeException(
                    String.format(
                            "PluginName: %s, PluginType: %s, Message: %s",
                            getPluginName(), PluginType.SOURCE, result.getMsg()));
        }
        this.webserviceParameter.buildWithConfig(pluginConfig);
        buildSchemaWithConfig(pluginConfig);
    }

    protected void buildSchemaWithConfig(Config pluginConfig) {
        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
            this.rowType = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
            // default use json format
            String format = WebserviceConfig.FORMAT.defaultValue();
            if (pluginConfig.hasPath(WebserviceConfig.FORMAT.key())) {
                format = pluginConfig.getString(WebserviceConfig.FORMAT.key());
            }
            switch (format.toLowerCase()) {
                case "json":
                    this.deserializationSchema =
                            new JsonDeserializationSchema(false, false, rowType);
                    break;
                default:
                    // TODO: use format SPI
                    throw new RuntimeException(
                            String.format(
                                    "Unsupported data format [%s], webservice connector only supports json format now",
                                    format));
            }
        } else {
            // No schema configured: emit the raw response as a single text field
            this.rowType = CatalogTableUtil.buildSimpleTextSchema();
            this.deserializationSchema = new SimpleTextDeserializationSchema(this.rowType);
        }
    }

    @Override
    public void setJobContext(JobContext jobContext) {
        this.jobContext = jobContext;
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.rowType;
    }

    @Override
    public AbstractSingleSplitReader<SeaTunnelRow> createReader(
            SingleSplitReaderContext readerContext) throws Exception {
        return new WebserviceSourceReader(
                webserviceParameter, readerContext, deserializationSchema, contentField);
    }
}
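`WebserviceSource.prepare()` makes two decisions: whether the required options are present, and which decoder to use (simple text when no `schema` is configured, JSON when a schema is present and `format` is `json`, an error for any other format). The JDK-only sketch below models both decisions with a plain `Map` in place of the typesafe `Config`. `missingKeys`, `chooseDecoder` and the `Decoder` enum are hypothetical names rather than SeaTunnel APIs, and the literal key `"schema"` is an assumption standing in for `CatalogTableUtil.SCHEMA.key()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class SourcePrepareSketch {

    enum Decoder { JSON, SIMPLE_TEXT }

    // Hypothetical analogue of CheckConfigUtil.checkAllExists: report every missing key
    static List<String> missingKeys(Map<String, ?> config, String... required) {
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            if (!config.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Mirrors buildSchemaWithConfig: no schema -> simple text; with a schema, format decides
    static Decoder chooseDecoder(Map<String, ?> config) {
        if (!config.containsKey("schema")) {
            return Decoder.SIMPLE_TEXT;
        }
        Object format = config.containsKey("format") ? config.get("format") : "JSON";
        switch (String.valueOf(format).toLowerCase(Locale.ROOT)) {
            case "json":
                return Decoder.JSON;
            default:
                throw new IllegalArgumentException("Unsupported data format [" + format + "]");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config =
                Map.of("url", "http://example.invalid/ws?wsdl", "method", "getSupportCity");
        System.out.println(missingKeys(config, "url", "method", "namespaceUri")); // [namespaceUri]
        System.out.println(chooseDecoder(config)); // SIMPLE_TEXT
    }
}
```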

WebserviceSourceReader
package org.apache.seatunnel.connectors.seatunnel.webservice.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.webservice.config.WebserviceParameter;

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.Option;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.webservice.SoapClient;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Map;

@Slf4j
public class WebserviceSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    protected final SingleSplitReaderContext context;

    private static final Option[] DEFAULT_OPTIONS = {
        Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, Option.DEFAULT_PATH_LEAF_TO_NULL
    };

    // Carried over from the HTTP connector; not used by this reader yet
    private final String contentJson;
    private final Configuration jsonConfiguration =
            Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);

    protected final WebserviceParameter webserviceParameter;
    private final DeserializationCollector deserializationCollector;

    public WebserviceSourceReader(
            WebserviceParameter webserviceParameter,
            SingleSplitReaderContext context,
            DeserializationSchema<SeaTunnelRow> deserializationSchema,
            String contentJson) {
        this.webserviceParameter = webserviceParameter;
        this.context = context;
        this.contentJson = contentJson;
        this.deserializationCollector = new DeserializationCollector(deserializationSchema);
    }

    @Override
    public void open() throws Exception {
        log.info("WebserviceSourceReader open");
    }

    @Override
    public void close() throws IOException {
        log.info("WebserviceSourceReader close");
    }

    @Override
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        try {
            SoapClient client =
                    SoapClient.create(webserviceParameter.getUrl())
                            .setMethod(
                                    webserviceParameter.getMethod(),
                                    webserviceParameter.getNamespaceUri());
            // params is optional in the config, so it may be null here
            Map<String, String> params = webserviceParameter.getParams();
            if (params != null) {
                for (Map.Entry<String, String> entry : params.entrySet()) {
                    client = client.setParam(entry.getKey(), entry.getValue());
                }
            }
            // send(false): return the raw, non-pretty-printed SOAP response
            String result = client.send(false);
            // deserializationCollector.collect(result.getBytes(), output);
            // Emit the content of the SOAP Body as a single-field row
            SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[] {getSoapBody(result)});
            output.collect(seaTunnelRow);
            log.info("WebserviceSourceReader pollNext");
        } catch (Exception e) {
            log.error("Webservice request failed", e);
        } finally {
            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
                // signal to the source that we have reached the end of the data.
                log.info("Closed the bounded webservice source");
                context.signalNoMoreElement();
            }
        }
    }

    public String getSoapBody(String xml) {
        if (xml.contains("<soap:Body>")) {
            return StrUtil.subBetween(xml, "<soap:Body>", "</soap:Body>");
        }
        return StrUtil.subBetween(xml, "<soap12:Body>", "</soap12:Body>");
    }
}
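`getSoapBody` cuts the response text between the literal tags `<soap:Body>` or `<soap12:Body>`, so it would return `null` when a server uses another prefix such as `s:` or `env:`, or adds attributes to the Body tag. A prefix-independent alternative is to parse the response with the JDK's namespace-aware DOM parser and look up `Body` by the SOAP 1.1 and SOAP 1.2 envelope namespace URIs. The sketch below does that; `SoapBodySketch` and its methods are hypothetical names, and because it re-serializes the Body's children, whitespace and namespace declarations in the output may differ from the raw substring the original returns.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class SoapBodySketch {
    static final String SOAP11_NS = "http://schemas.xmlsoap.org/soap/envelope/";
    static final String SOAP12_NS = "http://www.w3.org/2003/05/soap-envelope";

    /** Returns the serialized children of the SOAP Body, or null if there is no Body. */
    static String soapBody(String xml) {
        try {
            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
            dbf.setNamespaceAware(true);
            // Reject DOCTYPE declarations to avoid XXE when parsing remote responses
            dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            Document doc = dbf.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
            // Match Body by namespace URI, so any prefix (soap:, s:, env:, ...) works
            for (String ns : new String[] {SOAP11_NS, SOAP12_NS}) {
                NodeList bodies = doc.getElementsByTagNameNS(ns, "Body");
                if (bodies.getLength() > 0) {
                    return serializeChildren(bodies.item(0).getChildNodes());
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse SOAP response", e);
        }
    }

    private static String serializeChildren(NodeList children) throws Exception {
        Transformer t = TransformerFactory.newInstance().newTransformer();
        t.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
        StringWriter out = new StringWriter();
        for (int i = 0; i < children.getLength(); i++) {
            t.transform(new DOMSource(children.item(i)), new StreamResult(out));
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String response =
                "<s:Envelope xmlns:s=\"" + SOAP11_NS + "\"><s:Body>"
                        + "<getSupportCityResponse xmlns=\"http://xxx.com.cn/\">ok</getSupportCityResponse>"
                        + "</s:Body></s:Envelope>";
        System.out.println(soapBody(response));
    }
}
```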

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.seatunnel</groupId>
        <artifactId>seatunnel-connectors-v2</artifactId>
        <version>2.3.3-SNAPSHOT</version>
    </parent>

    <artifactId>connector-webservice</artifactId>
    <name>SeaTunnel : Connectors V2 : Webservice</name>

    <properties>
        <rabbitmq.version>5.9.0</rabbitmq.version>
        <json-path.version>2.7.0</json-path.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>connector-common</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-format-json</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.22</version>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
            <version>${json-path.version}</version>
        </dependency>
    </dependencies>

    <scm>
        <tag>2.3.2</tag>
    </scm>
</project>

Execution result
