PiflowX组件-WriteToUpsertKafka

WriteToUpsertKafka组件

组件说明

以upsert方式往Kafka topic中写数据。

计算引擎

flink

有界性

Streaming Upsert Mode

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:
"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。
"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
sink_parallelismSinkParallelism“”定义upsert-kafka sink算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。
sink_buffer_flush_max_rowsSinkBufferFlushMaxRows“”缓存刷新前,最多能缓存多少条记录。当sink收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此sink缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。 可以通过设置为 ‘0’ 来禁用它默认,该选项是未开启的。注意,如果要开启sink缓存,需要同时设置 ‘sink.buffer-flush.max-rows’ 和 'sink.buffer-flush.interval两个选项为大于零的值。
sink_buffer_flush_intervalSinkBufferFlushInterval“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
propertiesPROPERTIES“”Kafka source连接器其他配置

WriteToUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{"flow": {"name": "UpsertKafkaTest","uuid": "1234","stops": [{"uuid": "0000","name": "JsonStringParser1","bundle": "cn.piflow.bundle.flink.json.JsonStringParser","properties": {"content": "[{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:32:24\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1201\",\"access_time\":\"2021-01-08 11:32:55\",\"dt\":\"2021-01-08\"},{\"user_id\":\"2\",\"client_ip\":\"192.165.12.1\",\"client_info\":\"pc\",\"page_code\":\"1031\",\"access_time\":\"2021-01-08 11:32:59\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1101\",\"access_time\":\"2021-01-08 11:33:24\",\"dt\":\"2021-01-08\"},{\"user_id\":\"3\",\"client_ip\":\"192.168.10.3\",\"client_info\":\"pc\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:33:30\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:34:24\",\"dt\":\"2021-01-08\"}]","schema": "user_id:STRING,client_ip:STRING,client_info:STRING,page_code:STRING,access_time:TIMESTAMP,dt:STRING"}},{"uuid": "1111","name": "WriteToKafka1","bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka","properties": {"kafka_host": "hadoop01:9092","topic": "user_ip_pv","tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":null,\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"user_id\",\"columnType\":\"STRING\",\"comment\":\"用户ID\"},{\"columnName\":\"client_ip\",\"columnType\":\"STRING\",\"comment\":\"客户端IP\"},{\"columnName\":\"client_info\",\"columnType\":\"STRING\",\"comment\":\"设备机型信息\"},{\"columnName\":\"page_code\",\"columnType\":\"STRING\",\"comment\":\"页面代码\"},{\"columnName\":\"access_time\",\"columnType\":\"TIMESTAMP\",\"comment\":\"请求时间\"},{\"columnName\":\"dt\",\"columnType\":\"STRING\",\"comment\":\"时间分区天\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null}","format": "json","properties": "{\"json.ignore-parse-errors\":\"true\"}"}},{"uuid": "2222","name": "ReadFromKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka","properties": {"kafka_host": "hadoop01:9092","topic": "user_ip_pv","group": "test","startup_mode": "earliest-offset","tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":\"source_ods_fact_user_ip_pv\",\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"user_id\",\"columnType\":\"STRING\",\"comment\":\"用户ID\"},{\"columnName\":\"client_ip\",\"columnType\":\"STRING\",\"comment\":\"客户端IP\"},{\"columnName\":\"client_info\",\"columnType\":\"STRING\",\"comment\":\"设备机型信息\"},{\"columnName\":\"page_code\",\"columnType\":\"STRING\",\"comment\":\"页面代码\"},{\"columnName\":\"access_time\",\"columnType\":\"TIMESTAMP\",\"comment\":\"请求时间\"},{\"columnName\":\"dt\",\"columnType\":\"STRING\",\"comment\":\"时间分区天\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null}","format": "json","properties": "{}"}},{"uuid": "3333","name": "SQLExecute1","bundle": "cn.piflow.bundle.flink.common.SQLExecute","properties": {"sql": "CREATE VIEW view_total_pv_uv_min AS SELECT dt AS do_date, count(client_ip) AS pv, count(DISTINCT client_ip) AS uv,max(access_time) AS access_time FROM source_ods_fact_user_ip_pv GROUP BY dt;"}},{"uuid": "4444","name": "WriteToUpsertKafka1","bundle": "cn.piflow.bundle.flink.kafka.WriteToUpsertKafka","properties": {"kafka_host": "hadoop01:9092","topic": "result_total_pv_uv_min","key_format": "json","value_format": "json","value_fields_include": "ALL","tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":\"result_total_pv_uv_min\",\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null,\"asSelectStatement\":\"SELECT  do_date,cast(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,pv,uv,NOW() AS currenttime from view_total_pv_uv_min\"}","properties": "{\"value.json.fail-on-missing-field\": false}"}}],"paths": [{"from": "JsonStringParser1","outport": "","inport": "","to": "WriteToKafka1"},{"from": "WriteToKafka1","outport": "","inport": "","to": "ReadFromKafka1"},{"from": "ReadFromKafka1","outport": "","inport": "","to": "SQLExecute1"},{"from": "SQLExecute1","outport": "","inport": "","to": "WriteToUpsertKafka1"}]}
}
示例说明
  1. 通过JsonStringParser将给定的json字符串解析,并输出到下游,通过WriteToKafka组件将数据写入到kafka的user_ip_pv topic中;

  2. 通过ReadFromKafka组件从user_ip_pv topic中读取数据;

  3. 使用SQLExecute组件执行创建视图view_total_pv_uv_min的语句;

  4. 使用WriteToUpsertKafka定义upsert kafka table,并使用tableDefinition属性中定义的asSelectStatement执行语句,将结果写入kafka。

tableDefinition属性结构
{"catalogName": null,"dbname": null,"tableName": "result_total_pv_uv_min","ifNotExists": true,"physicalColumnDefinition": [{"columnName": "do_date","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计日期"},{"columnName": "do_min","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计分钟"},{"columnName": "pv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "点击量"},{"columnName": "uv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "一天内同个访客多次访问仅计算一个UV"},{"columnName": "currenttime","columnType": "TIMESTAMP","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "当前时间"}],"metadataColumnDefinition": null,"computedColumnDefinition": null,"watermarkDefinition": null,"asSelectStatement": "SELECT  do_date,cast(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,pv,uv,NOW() AS currenttime from view_total_pv_uv_min"
}

演示DEMO
在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/313028.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java集合/泛型篇----第六篇

系列文章目录 文章目录 系列文章目录前言一、HashTable(线程安全)二、TreeMap(可排序)三、LinkHashMap(记录插入顺序)四、泛型类前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去…

chrome浏览器记录不住网站登录状态,退出后再打开就需要重新登陆的解决办法

chrome浏览器记录不住网站登录状态,退出后再打开就需要重新登陆,比较繁琐。 解决办法: 1、chrome浏览器右上角三个竖的点,然后进入“设置”(Settings),选择“隐私与安全”(Privacy…

I.MX8QM flexcan移植

Android SDK:imx8_13.0.0_1.2.0(android 13 u-boot 2022.04 kernel 5.15.74) 一、kernel 内核配置: # 相应的defconfig中添加使能下面两个宏。 # 官方默认的配置可能是以模块的方式编译,这里直接将can驱动编译到内核中 CONFIG_CANy CONFIG…

FingerprintService启动-Android13

FingerprintService启动-Android13 1、指纹服务启动1.1 rc启动Binder对接指纹厂商TA库1.2 FingerprintService启动1.2.1 SystemServer启动FingerprintService1.2.2 注册Binder服务fingerprint 2、获取底层信息2.1 AIDL 对接TA中获取2.2 指纹类型判断 android13-release 1、指纹…

SpringBoot2.7.12整合Knife4j

SpringBoot2.7.12整合Knife4j 是什么 Knife4j是一个集Swagger2 和 OpenAPI3为一体的增强解决方案 添加依赖 <!--引入Knife4j的官方start包,该指南选择Spring Boot版本<3.0,开发者需要注意--> <dependency><groupId>com.github.xiaoymin</groupId>&l…

Intellij建议用String替换StringBuilder

文章目录 前言String 和 StringBuilder 性能对比String 和 StringBuilder 使用的字节码对比总结 本文收发地址 https://blog.csdn.net/CSqingchen/article/details/135324313 最新更新地址 https://gitee.com/chenjim/chenjimblog 前言 最近编码时看到 Intellij 建议使用 Stri…

[附代码]稳态视觉诱发电位SSVEP之预训练模型提高性能

SSVEP 之深度学习 深度学习已经被广泛运用在脑电信号分析来提高脑机接口的性能,这是一个end-to-end的方法,简单来说,只要搭建好深度学习网络,做好特征工程,然后分类即可,对于一个刚刚接触脑机接口领域深度学习的学习者来说,可以先忽略中间的数学相关的东西,先建一个网…

Linux常用命令大全总结及讲解(超详细版)

前言&#xff1a; Linux 是一个基于Linux 内核的开源类Unix 操作系统&#xff0c;Linus Torvalds于 1991 年 9 月 17 日首次发布的操作系统内核。Linux 通常打包为Linux 发行版。 Linux 最初是为基于Intel x86架构的个人计算机开发的&#xff0c;但此后被移植到的平台比任何其…

【大数据面试知识点】分区器Partitioner:HashPartitioner、RangePartitioner

Spark HashParitioner的弊端是什么&#xff1f; HashPartitioner分区的原理很简单&#xff0c;对于给定的key&#xff0c;计算其hashCode&#xff0c;并除于分区的个数取余&#xff0c;如果余数小于0&#xff0c;则用余数分区的个数&#xff0c;最后返回的值就是这个key所属的…

【NLP论文】02 TF-IDF 关键词权值计算

之前写了一篇关于关键词词库构建的文章&#xff0c;没想到反响还不错&#xff0c;最近有空把接下来的两篇补完&#xff0c;也继续使用物流关键词词库举例&#xff0c;本篇文章承接关键词词库构建并以其为基础&#xff0c;将计算各关键词的 TF-IDF 权值&#xff0c;TF-IDF 权值主…

D45D46|动态规划之子序列问题

300.最长递增子序列&#xff1a; 初始思路&#xff1a; 动态规划五部曲&#xff1a; 1&#xff09;dp数组的定义&#xff0c;dp[i]表述数组第i个元素大于前面几个值&#xff1b; 2&#xff09;dp数组的迭代&#xff0c;min nums[x]表示递增数组中的最后一个值&#xff0c;如…

【python_数据分组】

对excel按照标签进行分组&#xff0c;例如按照“开票主体和对方公司”进行分组&#xff0c;并获取对应的明细。 表格如下&#xff1a; def main(excel_data):result {}for d in excel_data:if str(d[0])str(d[1]) in result:result[str(d[0])str(d[1])].append([d[0],d[1],…