创建第一个 Flink 项目

一、运行环境介绍

Flink执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarnk8sMesos等不同的资源管理器部署自己的应用。

环境依赖:
【1】JDK环境:Flink核心模块均使用 Java开发,所以运行环境需要依赖JDKJDK版本需要保证在1.8以上。
【2】Maven编译环境:Flink的源代码目前仅支持通过 Maven进行编译,所以如果需要对源代码进行编译,或通过IDE开发Flink Application,则建议使用Maven作为项目工程编译方式。需要注意的是,Flink程序需要Maven的版本在3.0.4及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
【3】IDEA:需要安装scala插件以及scala环境等;

二、Flink项目 Scala版 DataSet 有界流

需求:同进文件文件中的单词出现的次数;

【1】创建Maven项目,pom.xml文件中配置如下依赖

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.10.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.10.0</version></dependency>
</dependencies><build><plugins><!-- 该插件用于将Scala代码编译成class文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><!--声明绑定到 maven 的compile阶段--><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

【2】resource目录中添加需要进行统计的文件文件及内容
[点击并拖拽以移动] ​

【3】WordCount.java文件内容如下,需要注意隐私转换问题,需要引入scala._

 import org.apache.flink.api.scala._/**
* @Description 批处理 word count
* @Author zhengzhaoxiang
* @Date 2020/7/12 18:55
* @Param
* @Return
*/
object WordCount {def main(args: Array[String]): Unit = {//创建一个批处理的执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//从文件中读取数据var inputDateSet: DataSet[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//基于Dataset 做转换,首先按空格打散,然后按照 word作为key做group byval resultDataSet: DataSet[(String,Int)] = inputDateSet.flatMap(_.split(" "))//分词得到所有 word构成的数据集.map((_,1))//_表示当前 word 转换成一个二元组(word,count).groupBy(0)//以二元组中第一个元素作为key.sum(1) //1表示聚合二元组的第二个元素的值//打印输出resultDataSet.print()}
}

【4】统计结果展示:
[点击并拖拽以移动] ​

三、Flink项目 Scala版 DataStream 无界流

【1】StreamWordCount.java文件内容如下

package com.zzx.flinkimport org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 接受 socket 文本流val inputDataStream: DataStream[String] = env.socketTextStream("hadoop1",6666);//定义转换操作 word countval resultDataStream: DataStream[(String,Int)] = inputDataStream.flatMap(_.split(" "))//以空格分词,得到所有的 word.filter(_.nonEmpty).map((_,1))//转换成 word count 二元组.keyBy(0)//按照第一个元素分组.sum(1)//按照第二个元素求和resultDataStream.print()//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束env.execute("stream word count word")}
}

【2】我这里在Hadoop1中通过nc -lk xxx打开一个socket通信
点击并拖拽以移动​

【3】查看IDEA输出统计内容如下:输出word的顺序不是按照输入的顺序,是因为它有并行度(多线程)是并行执行的。最前面的数字是并行子任务的编号类似线程号。最大的数字其实跟你cpu核数是息息相关的。这个并行度也可以通过env.setParallelism进行设置。我们也可以给每一个任务(算子)设置不同的并行度;
[点击并拖拽以移动] ​

【4】当我们需要将Java文件打包上传到Flink的时候,这里的hostport可以从参数中进行获取,代码修改如下:

package com.zzx.flinkimport org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 接受 socket 文本流  hostname:prot 从程序运行参数中读取val params: ParameterTool = ParameterTool.fromArgs(args);val hostname: String = params.get("host");val port: Int = params.getInt("port");val inputDataStream: DataStream[String] = env.socketTextStream(hostname,port);//定义转换操作 word countval resultDataStream: DataStream[(String,Int)] = inputDataStream.flatMap(_.split(" "))//以空格分词,得到所有的 word.filter(_.nonEmpty).map((_,1))//转换成 word count 二元组.keyBy(0)//按照第一个元素分组.sum(1)//按照第二个元素求和resultDataStream.print()//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束env.execute("stream word count word")}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/256070.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

11.7QT界面制作

#include "widget.h"Widget::Widget(QWidget *parent): QWidget(parent) {this->resize(881,550);this->setStyleSheet("backgroud-color:rgb(33,35,40)");this->setWindowFlag(Qt::FramelessWindowHint);//标签类QLabel *l1 new QLabel(this);/…

多表操作、其他字段和字段参数、django与ajax(回顾)

多表操作 1 基于对象的跨表查 子查询----》执行了两句sql&#xff0c;没有连表操作 2 基于双下滑线的连表查 一次查询&#xff0c;连表操作 3 正向和反向 放在ForeignKey,OneToOneField,ManyToManyField的-related_namebooks&#xff1a;双下滑线连表查询&#xff0c;反向…

Python函数默认参数设置

在某些情况下&#xff0c;程序需要在定义函数时为一个或多个形参指定默认值&#xff0c;这样在调用函数时就可以省略为该形参传入参数值&#xff0c;而是直接使用该形参的默认值。 为形参指定默认值的语法格式如下&#xff1a; 形参名 默认值 从上面的语法格式可以看出&…

JVM 类的加载器的基本特征和作用

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 1、作用 类加载器是 JVM 执行类加载机制的前提 ClassLoader的作用&#xff1a; ClassLoader是Java的核心组件&#xff0c;所有的Class都是由ClassLoader进行加载的&#xff0c;ClassLoader负责通过各种方式将Class信…

探索Scrapy-spider:构建高效网络爬虫

Spider简介 Scrapy中的Spider是用于定义和执行数据抓取逻辑的核心组件。Spider负责从指定的网站抓取数据&#xff0c;并定义了如何跟踪链接、解析内容以及提取数据的规则。它允许您定制化地指定要抓取的网站、页面和所需的信息。Spider的作用是按照预定的规则爬取网页&#xf…

数据结构 图的广度优先搜索和深度优先搜索

一、广度优先搜索 广度优先搜索等价于树的层次遍历&#xff0c;将起点的每一层进行遍历 当这一层结点全部被遍历完时&#xff0c;再遍历下一层次&#xff0c;从图中可以根据距离遍历起点的长度进行层次选择 例&#xff1a; 以a结点作为开始结点 a的下一层次有b c e三个结点 所以…

添加新公司代码的配置步骤-Part4

原文地址&#xff1a;配置公司代码 概述 这是一系列讨论和列出向系统添加新公司代码时必须完成的事务的四篇博客中的最​​后一篇。以下是这四个文档涵盖的主题列表&#xff1a; 企业结构 - 第 1 部分 FI 配置 – 第 2 部分 SD 配置 – 第 3 部分 物流 – 概述 – 第 3 部分…

静态网站生成器与服务器端渲染有啥区别

在将网站部署到服务器之前&#xff0c;在构建阶段生成HTML页面被称为“静态网站生成&#xff08;Static Site Generation&#xff09;”。这种方法涉及使用网站模板创建预构建页面&#xff0c;并在用户请求时立即交付给他们。以下是静态生成网站的一些好处&#xff1a; 更快的页…

【从零开始学习JVM | 第五篇】快速了解运行时数据区

前言&#xff1a; 当谈论 Java 程序的运行机制时&#xff0c;JVM&#xff08;Java 虚拟机&#xff09;的运行时数据区是一个必不可少的话题。JVM 运行时数据区是 Java 程序在运行过程中分配内存和管理数据的重要区域&#xff0c;它包括了方法区、堆、虚拟机栈、程序计数器和本地…

【Spring 源码】 贯穿 Bean 生命周期的核心类之 AbstractAutowireCapableBeanFactory

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

小白学习java理解栈手写栈——第四关(青铜挑战)

内容1.理解栈的基本特征2.理解如何使用数组来构造栈3.理解如何使用链表来构造栈 1.栈的基础知识 1.1栈的特征 栈和队列是比较特殊的线性表&#xff0c;又称为访问受限的线性表。栈是很多表达式、符号等运算的基础&#xff0c;也是递归的底层实现&#xff0c;理论上递归能做的…

linux学习之详解文件

目录 1.先认识文件 2.c语言中常用文件接口 fopen&#xff08;打开文件&#xff09; 3.系统接口操作文件 open write 文件的返回值以及打开文件的本质 理解struct_file内核对象 了解文件描述符&#xff08;fd&#xff09;分配规则 重定向 dup接口 标准错误流 文件缓冲…