[flink 实时流基础]源算子和转换算子

文章目录

    • 1. 源算子 Source
        • 1. 从集合读
        • 2. 从文件读取
        • 3. 从 socket 读取
        • 4. 从 kafka 读取
        • 5. 从数据生成器读取数据
    • 2. 转换算子
        • 基本转换算子(map/ filter/ flatMap)


1. 源算子 Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));// 2. 直接填元素DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);source.print();env.execute();}
2. 从文件读取
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/world.txt")).build();env.fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource").print();env.execute();}
3. 从 socket 读取
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 7777);source.print();env.execute();}

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setTopics("topic_1").setGroupId("atguigu").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
5. 从数据生成器读取数据
		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>
 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}}, 10, // 自动生成的数字序列RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条Types.STRING // 返回类型);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();env.execute();}

2. 转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
image.png

基本转换算子(map/ filter/ flatMap)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
image.png
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
image.png
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
:::info
消费一个元素,可以产生0到多个元素。
:::
flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/578012.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git——分布式版本控制工具详解

一、了解Git基本概念 &#xff08;o&#xff09;git github gitee的区别 &#xff08;一&#xff09;开发中的实际场景 &#xff08;二&#xff09;版本控制器的方式 1、集中式版本控制工具 集中式版本控制工具&#xff0c;版本库是集中存放在中央服务器的&#xff0c;team里…

如何检查电脑的最近历史记录?这里提供详细步骤

如果你怀疑有人在使用你的计算机,并且你想查看他们在做什么,下面是如何查看是否有访问内容的痕迹。 如何检查我的计算机的最近历史记录 要检查计算机的最近历史记录,应该从web浏览器历史记录开始,然后移动到文件。但是,可以修改或删除浏览器历史记录,也可以隐藏Windows…

Qt 图片预览(等比例显示、放大、缩小、平移图像)显示

使用Qt的Painter绘制图像并显示&#xff0c;根据窗口的大小计算图片显示的尺寸&#xff0c;并可以对图片放大和缩小的显示&#xff0c;还可以对已经放大了的图片进行平移预览的操作。 效果如下&#xff1a; 使用函数 painter.translate() 对画布进行平移操作。使用函数 painte…

Linux 系统快速安装 MySQL数据库(新手版)

Linux 系统快速安装 MySQL数据库&#xff08;新手版&#xff09; 1.删除原有的mariadb&#xff0c;不然mysql装不进去 查询MAriaDB命令 rpm -qa|grep mariadb 删除 rpm -e --nodeps mariadb-libs-5.5.60-1.el7_5.x86_64 &#xff08;yum -y remove mysql 如需要…

python爬虫之selenium4使用(万字讲解)

文章目录 一、前言二、selenium的介绍1、优点&#xff1a;2、缺点&#xff1a; 三、selenium环境搭建1、安装python模块2、selenium4新特性3、安装驱动WebDriver驱动选择驱动安装和测试 基础操作1、属性和方法2、单个元素定位通过id定位通过class_name定位一个元素通过xpath定位…

CTK插件框架学习-插件注册调用(03)

CTK插件框架学习-新建插件(02)https://mp.csdn.net/mp_blog/creation/editor/136923735 一、CTK插件组成 接口类&#xff1a;对外暴露的接口&#xff0c;供其他插件调用实现类&#xff1a;实现接口内的方法激活类&#xff1a;负责将插件注册到CTK框架中 二、接口、插件、服务…

spring-boot之接口文档Swagger配置使用

Swagger 前后端分离 Vue SpringBoot 后端时代:前端只用管理静态页面; html> 后端。模板引擎JSP >后端是主力 前后端分离式时代: ●后端:后端控制层&#xff0c;服务层,数据访问层[后端团队] ●前端:前端控制层&#xff0c;视图层[前端团队] 。伪造后端数据&#xff0c;…

List操作add,clear,addall报错UnsupportedOperationException的解决办法

ArrayList和Arrays.ArrayList是两码事 ArrayList 支持 add&#xff0c;clear&#xff0c;addall Arrays.ArrayList不支持add&#xff0c;clear&#xff0c;addall 这个方法的使用时候&#xff0c;传递的数组必须是对象数组&#xff0c;而不是基本数据类型 JDK源码 /** *返回由…

SAMRTFORMS 转换PDF 发送邮件

最终成果&#xff1a; *&---------------------------------------------------------------------**& Report ZLC_FIND_EXIT*&---------------------------------------------------------------------**&根据T-CODE / 程序名查询出口、BADI增强*&-------…

钉钉 AI 升级多种功能;智谱AI PC智能助手发布;百度回应与苹果合作

▶ 钉钉 AI 升级上线多种功能 3 月 28 日&#xff0c;钉钉 AI 助理升级。升级后上线了图片理解、文档速读、工作流等产品能力&#xff0c;率先探索多模态、长文本与 RPA 技术在 AI 应用的落地。 基于阿里通义千问大模型&#xff0c;升级后的钉钉 AI 助理可以做到&#xff1a; …

【昇腾系列产品应用】英码科技EA500I边缘计算盒子接口使用示例和目标检测算法演示(附视频)

EA500I是英码科技联合华为昇腾精心打造的AI边缘计算盒子&#xff0c;其搭载昇腾310系列处理器&#xff0c;可提供20TOPS INT8 的计算能力&#xff0c;并设计了丰富的外围接口&#xff0c;包括Type-C系统调试口、LINE音频接口、USB3.0*2、千兆LAN*8、WAN*1、5G/4G、GNSS天线口、…

每天五分钟深度学习:使用神经网络完成人脸的特征点检测

本文重点 我们上一节课程中学习了如何利用神经网络对图片中的对象进行定位,也就是通过输出四个参数值bx、by、bℎ和bw给出图片中对象的边界框。 本节课程我们学习特征点的检测,神经网络可以通过输出图片中对象的特征点的(x,y)坐标来实现对目标特征的识别,我们看几个例子。…