PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件

组件说明

upsert方式从Kafka topic中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分反序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
propertiesPROPERTIES“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

ReadFromUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{"flow": {"name": "ReadFromUpsertKafkaTest","uuid": "1234","stops": [{"uuid": "5555","name": "ReadFromUpsertKafka1","bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka","properties": {"kafka_host": "hadoop01:9092","topic": "result_total_pv_uv_min","key_format": "json","value_format": "json","value_fields_include": "ALL","tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}","properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"}},{"uuid": "6666","name": "ShowChangeLogData1","bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData","properties": {"showNumber": "5000"}}],"paths": [{"from": "ReadFromUpsertKafka1","outport": "","inport": "","to": "ShowChangeLogData1"}]}
}
示例说明
  1. 通过k.kafka.ReadFromUps从kafka的result_total_pv_uv_min topic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据);

  2. 通过ShowChangeLogData组件将数据输出到控制台。

tableDefinition属性结构
{"ifNotExists": true,"physicalColumnDefinition": [{"columnName": "do_date","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计日期"}, {"columnName": "do_min","columnType": "STRING","nullable": false,"primaryKey": true,"partitionKey": false,"comment": "统计分钟"}, {"columnName": "pv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "点击量"}, {"columnName": "uv","columnType": "BIGINT","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "一天内同个访客多次访问仅计算一个UV"}, {"columnName": "currenttime","columnType": "TIMESTAMP","nullable": false,"primaryKey": false,"partitionKey": false,"comment": "当前时间"}],"metadataColumnDefinition": null,"computedColumnDefinition": null
}

演示DEMO

在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/314318.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023-12-11 LeetCode每日一题(最小体力消耗路径)

2023-12-11每日一题 一、题目编号 1631. 最小体力消耗路径二、题目链接 点击跳转到题目位置 三、题目描述 你准备参加一场远足活动。给你一个二维 rows x columns 的地图 heights ,其中 heights[row][col] 表示格子 (row, col) 的高度。一开始你在最左上角的格…

【电商项目实战】商品详情显示与Redis存储购物车信息

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是Java方文山,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的专栏《电商项目实战》。🎯🎯 &am…

JAR文件如何在没有安装JDK的电脑上运行(指定运行环境)

一、JAR包是什么? 首先,我们来了解一下JAR(Java Archive)包。JAR包是一种文件格式,用于将Java类、资源和元数据打包到一个文件中。它通常用于将Java库、应用程序或模块分发给其他开发人员或部署到不同的环境中。JAR包可…

使用flutter开发一个简单的轮播图带指示器的组件

使用PageView开发一个带指示器的轮播图组件,当轮播图切换的时候,指示器也会跟着切换,切换到当前轮播图所在的索引时,指示器的背景色会变成蓝色,否则是灰色。使用了一个curIndex变量来记录当前激活的轮播图索引。并使用…

高通平台开发系列讲解(驱动篇)如何修改UART节点名字

平台内核版本高通平台Linux4.14文章目录 一、背景二、分析过程三、解决方案一、背景 /dev/ttyMSM1强行改成/dev/ttyMSM2 以适配应用: 二、分析过程 解决思路:Uart对应的驱动源码位于kernel/msm-4.9/drivers/tty/serial/msm_serial.c,在msm_serial_probe函数的最后通过uart…

Go语言实战:如何使用Timeout Context优雅地取消任务

Go语言实战:如何使用Timeout Context优雅地取消任务 引言Go语言和并发编程简介什么是ContextTimeout Context的原理实战演示最佳实践和注意事项总结 引言 在现代软件开发中,尤其是在处理高并发系统时,正确地管理和取消正在进行的任务成为一项…

iToF人脸识别

iToF(间接飞行时间)是一种测量光飞行时间的技术,主要应用于人脸识别。 iToF人脸识别技术在哪些场景下会用到 iToF人脸识别技术可以应用于许多场景,以下是一些常见的应用场景: 平安城市:在城市监控系统中,iToF人脸识别技术可以用于实时监控、目标检测和识别,以及异常行为…

UDP协议基本原理

前言 本文主要讲解传输层中的UDP协议,我准备从UDP的特点出发,深入理解UDP协议,从UDP协议的结构推出UDP协议的特点; 一、理解端口号 前面我们总是说用IP加端口号的方式定位全网的唯一进程,通常在TCP/IP中,我…

系统学习Python——装饰器:函数装饰器-[对方法进行装饰:基础知识]

分类目录:《系统学习Python》总目录 我们在前面的文章中编写了第一个基于类的tracer函数装饰器的时候,我们简单地假设它也应该适用于任何方法一一一被装饰的方法应该同样地工作,并且自带的self实例参数应该直接包含在*args的前面。但这一假设…

在多Module项目中,给IDEA底部选项卡区域添加Services选项卡

一般一个spring cloud项目中大大小小存在几个十几个module编写具体的微服务项目。此时,如果要调试测需要依次启动各个项目比较麻烦。 idea其实提供了各module的启动管理工具了,可以快速启动和关闭各个服务,也能批量操作,比如一次…

简单FTP客户端软件开发——VMware安装Linux虚拟机(命令行版)

VMware安装包和Linux系统镜像: 链接:https://pan.baidu.com/s/1UwF4DT8hNXp_cV0NpSfTww?pwdxnoh 提取码:xnoh 这个学期做计网课程设计【简单FTP客户端软件开发】需要在Linux上配置 ftp服务器,故此用VMware安装了Linux虚拟机&…

普中STM32-PZ6806L开发板(USART2 串口 + HI-LINK-V20离线语音模块控制LED灯)

简介 买了HI-LINK-V20型号的离线语音识别模块, 为了后面可以做有意思的东西, 现在先来用用, 使用USART2 串口 接收来自我在HI-LINK-V20中预设的动作, 当识别到词条时发送对应的指令到串口, HI-LINK串口接的就是STM32F03ZET6的USART2, 且往下看。 电路原理图 连线图 连线引脚表…