【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、环境或版本说明
  • 三、自定义数据源kafka(Flink 1.13.6 版本)
    • 1、maven依赖
    • 2、实现
    • 3、验证
  • 三、自定义数据源kafka(Flink 1.17.0 版本)
    • 1、maven依赖
    • 2、实现
    • 3、验证


本文主要介绍Flink 的kafka作为数据源的使用,并给出了flink不同版本对kafka作为数据源的不同实现示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文依赖kafka的环境是好用的。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

一、maven依赖

本文依赖见【flink番外篇】3、flink的source介绍及示例(1)- File、Socket、Collection,不再赘述。

如果有新增的maven依赖,则会在示例时加以说明,避免篇幅的过大。

二、环境或版本说明

1、该示例需要有kafka的运行环境,kafka的部署与使用参考文章:
1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试

2、Flink关于kafka的使用在不同的版本中有不同的实现,最直观的的变化是由FlinkKafkaConsumer换成了KafkaSource,同理sink也有相应的由FlinkKafkaProducer换成了KafkaSink。

3、由于使用kafka涉及的内容较多,请参考文章:
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版

4、本文会提供关于kafka 作为source的2个版本,即1.13.6和1.17的版本。

5、以下属性在构建 KafkaSource 时是必须指定的:

  • Bootstrap server,通过 setBootstrapServers(String) 方法配置
  • 消费者组 ID,通过 setGroupId(String) 配置
  • 要订阅的 Topic / Partition
  • 用于解析 Kafka 消息的反序列化器(Deserializer)

三、自定义数据源kafka(Flink 1.13.6 版本)

Kafka Source 提供了构建类来创建 FlinkKafkaConsumer 的实例。

以下代码片段展示了如何构建 FlinkKafkaConsumer 来消费 “alan_kafkasource” 最早位点的数据, 使用消费组 “flink_kafka”,并且将 Kafka 消息体反序列化为字符串

1、maven依赖

	<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- flink连接器 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><!-- 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies>

2、实现

package org.datastreamapi.source.custom.kafka;import java.util.Properties;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;/*** @author alanchan**/
public class TestCustomKafkaSourceDemo {public static void main(String[] args) throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 source// 准备kafka连接参数Properties props = new Properties();// 集群地址props.setProperty("bootstrap.servers", "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092");// 消费者组idprops.setProperty("group.id", "flink_kafka");// latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费// earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费props.setProperty("auto.offset.reset", "latest");// 会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测props.setProperty("flink.partition-discovery.interval-millis", "5000");// 自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)props.setProperty("enable.auto.commit", "true");// 自动提交的时间间隔props.setProperty("auto.commit.interval.ms", "2000");// 使用连接参数创建FlinkKafkaConsumer/kafkaSourceFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("alan_kafkasource", new SimpleStringSchema(), props);// 使用kafkaSourceDataStream<String> kafkaDS = env.addSource(kafkaSource);// 3、 transformation// 4、 sinkkafkaDS.print();// 5、executeenv.execute();}
}

3、验证

1、创建kafka主题alan_kafkasource,kafka命令发送数据

[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic alan_kafkasource --partitions 1 --replication-factor 1[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_kafkasource
>alan,18
>alanchan,19
>alanchan,20

2、启动应用程序,并观察控制台输出
在这里插入图片描述

三、自定义数据源kafka(Flink 1.17.0 版本)

Kafka Source 提供了构建类来创建 KafkaSource 的实例。

以下代码片段展示了如何构建 KafkaSource 来消费 “alan_kafkasource” 最早位点的数据, 使用消费组 “flink_kafka”,并且将 Kafka 消息体反序列化为字符串

1、maven依赖

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- flink连接器 --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency></dependencies>

2、实现

为了避免误解,1.13.6版本与1.17.0版本实现不同的地方KafkaSource和FlinkKafkaConsumer的不同,相关属性值是一样的,只是本例中没有将1.13.5的中的所有属性都列出来。

package org.datastreamapi.source.custom.kafka;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;/*** @author alanchan**/
public class TestCustomKafkaSourceDemo {public static void main(String[] args) throws Exception {// 1、envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2、 sourceKafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092").setTopics("alan_kafkasource").setGroupId("flink_kafka").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 3、 transformation// 4、 sinkkafkaDS.print();// 5、executeenv.execute();}
}

3、验证

1、创建kafka主题alan_kafkasource,kafka命令发送数据

[alanchan@server2 bin]$ kafka-topics.sh --create --bootstrap-server server1:9092 --topic alan_kafkasource --partitions 1 --replication-factor 1[alanchan@server2 bin]$ kafka-console-producer.sh --broker-list 192.168.10.41:9092 --topic alan_kafkasource
>alan,18
>alanchan,19
>alanchan,20

2、启动应用程序,并观察控制台输出
在这里插入图片描述

以上,本文主要介绍Flink 的kafka作为数据源的使用,并给出了flink不同版本对kafka作为数据源的不同实现示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为以下几篇文章:
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(1) - File、Socket、Collection
【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(3)- kafka
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(4)- redis -异步读取
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(5)- clickhouse
【flink番外篇】3、flink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例 - 完整版

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/268279.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql,树形结构表中,查询所有末节点数据(叶子结点)

需求&#xff1a;在一个可以存放多级目录的表中&#xff0c;查询出某个课程目录下所有末节点&#xff08;因为只有末节点可以挂载资源&#xff09; 例如下图&#xff1a; 其中 1.11.2.12.1 都是末节点&#xff0c;因为他们已经没有下一级了 catalog表中重要字段有&#xff1a;c…

学了4年C++后,我转向了Python

C 已经学不动了&#xff0c;现在换 Python 还来得及吗&#xff1f;一位四年工作经验的 C 程序员亲述转型历程&#xff0c;这不仅仅是语言上的转变&#xff0c;而是代码思维甚至工作环境的转变。 通常&#xff0c;程序员会认为 Python 编码比较简单&#xff0c;即便是在校学生也…

面试 JVM 八股文五问五答第二期

面试 JVM 八股文五问五答第二期 作者&#xff1a;程序员小白条&#xff0c;个人博客 相信看了本文后&#xff0c;对你的面试是有一定帮助的&#xff01; ⭐点赞⭐收藏⭐不迷路&#xff01;⭐ 1.JVM运行时数据区有几部分?&#xff08;JVM内存布局&#xff09;虚拟机栈和本地方…

MySQL数据恢复之binlog2sql的安装和使用,很详细

MySQL数据恢复之binlog2sql的安装和使用&#xff0c;很详细 一、前言二、binlog2sql的介绍三、安装binlog2sql1、安装git&#xff08;已安装可以跳过&#xff09;&#xff08;1&#xff09;、正常安装&#xff08;2&#xff09;、编译安装报错①、安装libcurl &#xff08;2&am…

Makefile语法

一、Makefile规则格式 Makefile 里面是由一系列的规则组成的&#xff0c;这些规则格式如下&#xff1a; 目标…... : 依赖文件集合…… 命令 1 命令 2 ……参考上一节gcc编译器与Makefile入门参考这条规则 1 main: main.o input.o calcu.o2 gcc -o main main.o input.o c…

19-数据结构-查找-散列查找

目录 一、散列查找结构思路图 二、哈希函数 三、解决冲突 1.开放地址法 1.1.线性探测法&#xff08;线性探测再散列法&#xff09; 1.2.平方探测法&#xff08;二次探测再散列&#xff09; 1.3.再散列法&#xff08;双散列法&#xff09; 2.拉链法 2.1简介 四、散列查…

蓝海彤翔元宇宙技术赋能非遗苏绣文化传播

12月8日&#xff0c;第十五届中国刺绣文化艺术节在苏州高新区正式开幕。本次大会旨在激活非遗传承高质量发展内生动力&#xff0c;以民间工艺发展助力乡村振兴&#xff0c;以传统文化创造性转化、创新性发展引领刺绣工艺文化推陈出新。 在项目启动环节&#xff0c;蓝海彤翔“元…

【hcie-cloud】【8】华为云Stack_LLD设计【部署设计、资源设计、服务设计、学习推荐、缩略语】【下】

设计概览、整体架构设计、网络设计 看下面-这篇文章 【hcie-cloud】【7】华为云Stack_LLD设计【设计概览、整体架构设计、网络设计、部署设计、资源设计、服务设计】【上】 部署设计 云平台整体部署架构 图中在Region下每个灰底都代表一个数据中心&#xff0c;AZ1可以跨数据…

LED 底层原理 和 GPIO引脚、寄存器操作

目录 LED 原理 LED 的驱动方式 普适的 GPIO 引脚操作方法 GPIO 寄存器操作 LED 原理 当我们学习 C 语言的时候&#xff0c;我们会写个 Hello 程序。 那当我们写 ARM 程序&#xff0c;也该有一个简单的程序引领我们入门&#xff0c;这个程序就是点亮 LED。 我们怎样去点亮…

书-选择排序法P156

#include<stdio.h> int main(){int b[5]{8,2,6,3,7};int i , j ,k ;for(i0;i<4;i){for(ji1;j<5;j)if(b[i]<b[j]){kb[i];b[i]b[j];b[j]k;} }for(i0;i<5;i)printf("%d ",b[i]); return 0; }选择排序&#xff1a;就是自己跟下一个比较&#xff0c;然后…

BH1750光照传感器——STM32驱动

———————实验效果——————— &#x1f384;硬件外观 &#x1f384; 接线 &#x1f388; VCC接 3.3V &#x1f388; GND接 GND &#x1f388; SCL接 PB2 &#x1f388; SDA接PB3 &#x1f388; ADDR 悬空不接 &#x1f384; 代码获取 &#x1f388; 查看下方 —…

gdb本地调试版本移植至ARM-Linux系统

移植ncurses库 本文使用的ncurses版本为ncurses-5.9.tar.gz 下载地址&#xff1a;https://ftp.gnu.org/gnu/ncurses/ncurses-5.9.tar.gz 1. 将ncurses压缩包拷贝至Linux主机或使用wget命令下载并解压 tar-zxvf ncurses-5.9.tar.gz 2. 解压后进入到ncurses-5.9目录…