尚硅谷大数据Flink1.17实战教程-笔记03【Flink运行时架构】

  • 尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】
  • 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili
  1. 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】
  2. 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】
  3. 尚硅谷大数据Flink1.17实战教程-笔记03【Flink运行时架构】
  4. 尚硅谷大数据Flink1.17实战教程-笔记04【】
  5. 尚硅谷大数据Flink1.17实战教程-笔记05【】
  6. 尚硅谷大数据Flink1.17实战教程-笔记06【】
  7. 尚硅谷大数据Flink1.17实战教程-笔记07【】
  8. 尚硅谷大数据Flink1.17实战教程-笔记08【】

目录

基础篇

第04章-Flink部署

P023【023_Flink运行时架构_系统架构】07:13

P024【024_Flink运行时架构_核心概念_并行度】06:45

P025【025_Flink运行时架构_核心概念_并行度设置&优先级】18:40

P026【026_Flink运行时架构_核心概念_算子链】08:34

P027【027_Flink运行时架构_核心概念_算子链演示】17:11

P028【028_Flink运行时架构_核心概念_任务槽】09:52

P029【029_Flink运行时架构_核心概念_任务槽的共享组】07:59

P030【030_Flink运行时架构_核心概念_slot与并行度的关系&演示】21:27

P031【031_Flink运行时架构_提交流程_Standalone会话模式&四张图】09:49

P032【032_Flink运行时架构_提交流程_Yarn应用模式】05:18


基础篇

第04章-Flink部署

P023【023_Flink运行时架构_系统架构】07:13

Flink运行时架构——Standalone会话模式为例

P024【024_Flink运行时架构_核心概念_并行度】06:45

  • 一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
  • 例如:如上图所示,当前数据流中有source、map、window、sink四个算子,其中sink算子的并行度为1,其他算子的并行度都为2。所以这段流处理程序的并行度就是2。

P025【025_Flink运行时架构_核心概念_并行度设置&优先级】18:40

4.2.1 并行度(Parallelism)

2)并行度的设置

在Flink中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。

1)代码中设置

我们在代码中,可以很简单地在算子后跟着调用setParallelism()方法,来设置当前算子的并行度:

stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);

这种方式设置的并行度,只针对当前算子有效。

另外,我们也可以直接调用执行环境的setParallelism()方法,全局设定并行度:

env.setParallelism(2);

这样代码中所有算子,默认的并行度就都为2了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。

这里要注意的是,由于keyBy不是算子,所以无法对keyBy设置并行度。

2)提交应用时设置

在使用flink run命令提交应用时,可以增加-p参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:

bin/flink run p 2 c com.atguigu.wc.SocketStreamWordCount

./FlinkTutorial-1.0-SNAPSHOT.jar

如果我们直接在Web UI上提交作业,也可以在对应输入框中直接添加并行度。

package com.atguigu.wc;import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** TODO DataStream实现Wordcount:读socket(无界流)** @author* @version 1.0*/
public class WordCountStreamUnboundedDemo {public static void main(String[] args) throws Exception {// TODO 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// IDEA运行时,也可以看到webui,一般用于本地测试// 需要引入一个依赖 flink-runtime-web// 在idea运行,不指定并行度,默认就是 电脑的 线程数// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(3);// TODO 2.读取数据: socketDataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);// TODO 3.处理数据: 切换、转换、分组、聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).setParallelism(2).returns(Types.TUPLE(Types.STRING,Types.INT))// .returns(new TypeHint<Tuple2<String, Integer>>() {}).keyBy(value -> value.f0).sum(1);// TODO 4.输出sum.print();// TODO 5.执行env.execute();}
}/**并行度的优先级:代码:算子 > 代码:env > 提交时指定 > 配置文件*/

并行度优先级:代码:算子 > 代码:全局env > 提交时指定命令 > 配置文件。

P026【026_Flink运行时架构_核心概念_算子链】08:34

4.2.2 算子链(Operator Chain)

2)合并算子链

在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task,这样原来的算子就成为了真正任务里的一部分,如下图所示。每个task会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。

P027【027_Flink运行时架构_核心概念_算子链演示】17:11

package com.atguigu.wc;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** TODO DataStream实现Wordcount:读socket(无界流)** @author* @version 1.0*/
public class OperatorChainDemo {public static void main(String[] args) throws Exception {// TODO 1.创建执行环境// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// IDEA运行时,也可以看到webui,一般用于本地测试// 需要引入一个依赖 flink-runtime-webStreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());// 在idea运行,不指定并行度,默认就是 电脑的 线程数env.setParallelism(1);// 全局禁用 算子链//env.disableOperatorChaining();// TODO 2.读取数据:socketDataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);// TODO 3.处理数据: 切换、转换、分组、聚合SingleOutputStreamOperator<Tuple2<String,Integer>> sum = socketDS//.disableChaining().flatMap((String value, Collector<String> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(word);}}).startNewChain()//.disableChaining().returns(Types.STRING).map(word -> Tuple2.of(word, 1)).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(value -> value.f0).sum(1);// TODO 4.输出sum.print();// TODO 5.执行env.execute();}
}/**1、算子之间的传输关系:一对一重分区2、算子 串在一起的条件:1) 一对一2) 并行度相同3、关于算子链的api:1)全局禁用算子链:env.disableOperatorChaining();2)某个算子不参与链化:  算子A.disableChaining(),  算子A不会与 前面 和 后面的算子 串在一起3)从某个算子开启新链条:  算子A.startNewChain(), 算子A不与 前面串在一起,从A开始正常链化*/

P028【028_Flink运行时架构_核心概念_任务槽】09:52

4.2.3 任务槽(Task Slots)

P029【029_Flink运行时架构_核心概念_任务槽的共享组】07:59

3)任务对任务槽的共享

默认情况下,Flink是允许子任务共享slot的。如果我们保持sink任务并行度为1不变,而作业提交时设置全局并行度为6,那么前两个任务节点就会各自有6个并行子任务,整个流处理程序则有13个子任务。如上图所示,只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行。所以对于第一个任务节点source→map,它的6个并行子任务必须分到不同的slot上,而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。

package com.atguigu.wc;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** TODO DataStream实现Wordcount:读socket(无界流)** @author* @version 1.0*/
public class SlotSharingGroupDemo {public static void main(String[] args) throws Exception {// TODO 1.创建执行环境// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// IDEA运行时,也可以看到webui,一般用于本地测试// 需要引入一个依赖 flink-runtime-webStreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());// 在idea运行,不指定并行度,默认就是 电脑的 线程数env.setParallelism(1);// TODO 2.读取数据:socketDataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);// TODO 3.处理数据: 切换、转换、分组、聚合SingleOutputStreamOperator<Tuple2<String,Integer>> sum = socketDS.flatMap((String value, Collector<String> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(word);}}).returns(Types.STRING).map(word -> Tuple2.of(word, 1)).slotSharingGroup("aaa").returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(value -> value.f0).sum(1);// TODO 4.输出sum.print();// TODO 5.执行env.execute();}
}/**1、slot特点:1)均分隔离内存,不隔离cpu2)可以共享:同一个job中,不同算子的子任务 才可以共享 同一个slot,同时在运行的前提是,属于同一个 slot共享组,默认都是“default”2、slot数量 与 并行度 的关系1)slot是一种静态的概念,表示最大的并发上限并行度是一种动态的概念,表示 实际运行 占用了 几个2)要求: slot数量 >= job并行度(算子最大并行度),job才能运行TODO 注意:如果是yarn模式,动态申请--> TODO 申请的TM数量 = job并行度 / 每个TM的slot数,向上取整比如session: 一开始 0个TaskManager,0个slot--> 提交一个job,并行度10--> 10/3,向上取整,申请4个tm,--> 使用10个slot,剩余2个slot*/

P030【030_Flink运行时架构_核心概念_slot与并行度的关系&演示】21:27

4.2.4 任务槽和并行度的关系

任务槽和并行度都跟程序的并行执行有关,但两者是完全不同的概念。简单来说任务槽是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;而并行度是动态概念,也就是TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。

slot数量 与 并行度 的关系
    1)slot是一种静态的概念,表示最大的并发上限
       并行度是一种动态的概念,表示 实际运行 占用了 几个

    2)要求: slot数量 >= job并行度(算子最大并行度),job才能运行
       TODO 注意:如果是yarn模式,动态申请
         --> TODO 申请的TM数量 = job并行度 / 每个TM的slot数,向上取整
       比如session: 一开始 0个TaskManager,0个slot
         --> 提交一个job,并行度10
            --> 10/3,向上取整,申请4个tm
            --> 使用10个slot,剩余2个slot

P031【031_Flink运行时架构_提交流程_Standalone会话模式&四张图】09:49

4.3 作业提交流程

4.3.1 Standalone会话模式作业提交流程

4.3.2 逻辑流图/作业图/执行图/物理流图

逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)。

P032【032_Flink运行时架构_提交流程_Yarn应用模式】05:18

4.3.3 Yarn应用模式作业提交流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/28534.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

宋浩线性代数笔记(一)行列式的计算

本帖更新b站宋浩老师的线代网课笔记&#xff0c;内容较为细致详细&#xff0c;参考书用的是科学出版社的第三版&#xff0c;之后会附加同济出版社第六版的教材内容。 &#xff08;字不好看大家将就看吧QAQ&#xff09;

css+js实现点击特效效果

话不多说&#xff0c;先上效果图 实现代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title…

Linux(CentOS7)下源码编译 PostgreSQL13.10 安装手册

Linux&#xff08;CentOS7&#xff09;下PostgreSQL安装手册 文章目录 一、准备PostgreSQL二、安装PostgreSQL2.1解压安装包2.2编译PG2.3查看PG安装目录2.4配置PG环境变量2.5查看PG版本2.6创建postgres用户2.7创建PG数据库数据存放目录2.8授权PG数据库数据存放目录2.9切换postg…

在Windows下安装Anaconda平台

Anaconda介绍 安装Python的方法有很多&#xff0c;其中利用Anaconda来安装&#xff0c;是最为安全和便捷的方法之一。在Python中安装类库&#xff0c;各个类库之间可能存在相互依赖、版本冲突等问题。为了解决这个问题&#xff0c;Python社区提供了方便的软件包管理工具&#…

《数据结构》数据结构概念,顺序表,链表

目录 1. 为什么学习数据结构&#xff1f; 2. 数据结构 2.1. 数据 2.2. 逻辑结构 2.3. 存储结构 2.4. 操作 3. 算法 3.1. 算法与程序 3.2. 算法与数据结构 3.3. 算法的特性 3.4. 如何评价一个算法的好坏 4. 线性表 4.1. 顺序表 4.2. 单向链表 4.3. 单向循环链表&…

如何在Windows中将任务导入任务计划程序

任务计划程序使你能够在选定的计算机上自动执行例行任务。任务调度程序通过监视你选择的启动任务的任何条件&#xff08;称为触发器&#xff09;&#xff0c;然后在满足条件时执行任务来实现这一点。 你可以导入导出的任务&#xff0c;这将把导入的任务添加到任务文件夹中&…

复习java基础

复习一天有点忘了的知识&#xff1a; 结构化编程 结构化程式设计(英语:Structured programming)是1960年代开始发展起来的一种编程典范。它采用子程序、程式码区块、for循环以及while循环等结构来取代传统的goto。 指导思想 自顶向下、逐步求精、模块化 编程过程 流程图是…

SOMEIP协议----第一节(概述)

SOMEIP协议 概述 1.什么是SOME/IP? SOME/IP: 如上图所述,连起来就是基于车载以太网技术的面向服务的可扩展中间件 汽车某ECU软件算法如果需要和其他ECU交互,大部分都通过跨ECU之间的服务来实现,即可以通过车载以太网异步调用其他ECU上的服务,应用开发者只需要关注服务…

Linux下安装Mysql (CentOS 7) 详解

文章目录 前言环境检查查看是否安装MySql查看系统版本 源安装安装mysql的yum源官网下载从windows上传到linuxrz命令 方法2&#xff1a; 安装Mysql常见错误密钥问题安装后查看mysql是否可以工作查看是否安装成功启动服务 登录mysql配置文件方法&#xff08;免密码&#xff09; 使…

【ARM Cortex-M 系列 1 -- Cortex-M0, M3, M4, M7, M33 差异】

文章目录 Cortex-M 系列介绍Cortex-M0/M0 介绍Cortex-M3/M4 介绍Cortex-M7 介绍Cotex-M33 介绍 下篇文章&#xff1a;ARM Cortex-M 系列 2 – CPU 之 Cortex-M7 介绍 Cortex-M 系列介绍 Cortex-M0/M0 介绍 Cortex-M0 是 ARM 公司推出的一款微控制器&#xff08;MCU&#xff0…

http连接处理(下)(四)

1.结合代码分析请求报文响应 下面我们将介绍服务器如何响应请求报文&#xff0c;并将该报文发送给浏览器端。首先介绍一些基础API&#xff0c;然后结合流程图和代码对服务器响应请求报文进行详解。 基础API部分&#xff0c;介绍stat、mmap、iovec、writev。 流程图部分&…

go语言中的string类型简介

在 Go 中&#xff0c;String 是一种不可变的类型&#xff0c;不能被修改。 在 Go 语言中&#xff0c;字符串由 Unicode 字符组成&#xff0c;每个字符都可以用一个或多个字节来表示。我们使用双引号或反引号来定义字符串&#xff0c;使用反引号定义的字符串不会对其内容进行任何…