SpringBoot集成flink

Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。
最大亮点是流处理,最适合的应用场景是低时延的数据处理。
场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

环境搭建:

①、安装flink

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/

②、安装Netcat

Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。
用于测试网络中的端口,发送文件等操作。
进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作

yum install -y nc # 安装nc命令nc -lk 8888 # 启动socket端口

无界流之读取socket文本流

一、依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>flink</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- 添加 Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.17.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>1.17.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"><resource>META-INF/spring.factories</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.et.flink.job.SocketJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>

二、SoketJob

public class SocketJob{public static void main(String[] args)throws Exception{// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 指定并行度,默认电脑线程数env.setParallelism(3);// 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);// 处理数据: 切换、转换、分组、聚合 得到统计结果SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).setParallelism(2)// // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据.returns(new TypeHint<Tuple2<String, Integer>>() {})
//                .returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(value -> value.f0).sum(1);// 输出sum.print();// 执行env.execute();}
}

测试:

启动socket流:

nc -l 8888

本地执行:直接ideal启动main程序,在socket流中输入

abc bcd cde
bcd cde fgh
cde fgh hij

在这里插入图片描述

集群执行:
执行maven打包,将打包的jar上传到集群中
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/513666.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python】新手入门(2):避免将关键字作为标识符

Python新手入门&#xff08;2&#xff09;&#xff1a;避免将关键字作为标识符 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1…

【Redis】Redis持久化模式RDB

目录 引子 RDB RDB的优缺点 小节一下 引子 不论把Redis作为数据库还是缓存来使用&#xff0c;他肯定有数据需要持久化&#xff0c;这里我们就来聊聊两种持久化机制。这两种机制&#xff0c;其实是 快照 与 日志 的形式。快照:就是当前数据的备份&#xff0c;我可以拷贝到磁…

Win UI3开发笔记(四)设置主题续2

本机深色主题下设置的背景颜色可以作用于整个对话框&#xff0c;本机浅色模式下设置的背景颜色只作用与下边的部分。 如果本机选深色&#xff0c;程序选浅色&#xff0c;设置为light只对上部分管用&#xff0c;下部分不管用。如图&#xff0c;左边那个hello按钮要看不见了。。…

GIS软件应用(一)

任务&#xff1a; 1.加载南京市边界数据、查看投影坐标系并完成投影转换 2.加载科教文卫POI数据、查看投影坐标系并完成投影转换 3.出图要求添加完整出图要素 步骤&#xff1a; 选中shp文件&#xff0c;加载南京市边界数据 在ArcToolbox工具箱中选中Projections and Transf…

5G 网络切片VLAN ID配置错误导致业务不可用

【摘要】随着电联5G共建共享工作的开展&#xff0c;无法及时有效观测到单逻辑站点的相关指标&#xff0c;导致单运营商用户业务出现异常。本案例中着重对单运营商用户无法使用网络进行相关参数排查&#xff0c;从KPI性能指标结合故障告警发生时间&#xff0c;从而分析由于网络切…

基于GitBucket的Hook构建ES检索PDF等文档全栈方案

背景 之前已简单使用ES及Kibana和在线转Base64工具实现了检索文档的demo&#xff0c;预期建设方案是使用触发器类型从公共的文档源拉取最新的文件&#xff0c;然后调用Java将文件转Base64后入ES建索引&#xff0c;再提供封装接口给前端做查询之用。 由于全部内容过长&#xff…

web前端之uniApp实现选择时间功能

MENU 1、孙子组件1.1、html部分1.2、JavaScript部分1.3、css部分 2、子组件2.1、html部分2.2、JavaScript部分2.3、css部分 3、父组件3.1、html部分3.2、JavaScript部分 4、效果图 1、孙子组件 1.1、html部分 <template><view><checkbox-group change"ch…

l从0开始学习NEON(2)

1、前言 继上一个例子&#xff0c;本次继续来学习NEON&#xff0c;本次学习NEON中向量拼接的操作&#xff0c;主要应用在图像的padding中。 https://blog.csdn.net/weixin_42108183/article/details/136440707 2、案例 2.1 案例1 在某些情况下&#xff0c;需要取在每个向量…

UD效果广告

1.定义 全称Unidesk&#xff0c;是由阿里旗下大数据运营平台“阿里妈妈”推出的数字营销引流平台。UD投放将其他媒体的流量通过相关的广告创意导入到天猫店铺。 2.UD投放优化技巧 &#xff08;1&#xff09;不起量排查&#xff1a; 可以从账户问题、计划数量不足、计划设置…

1、Ajax、get、post、ajax,随机颜色

一、Ajax初始 1、什么是Ajax&#xff1f; 异步的JavaScript和xml 2、xml是什么&#xff1f; 一种标记语言&#xff0c;传输和存储数据----------现在用JSON传输数据 3、Ajax的作用 局部加载 可以使网页异步更新 4、Ajax的原理或者步骤(6步) 创建Ajax对象 if (window.X…

微信小程序开发系列(十三)·如何使用iconfont、微信小程序中如何使用字体图标

目录 1. 如何使用iconfont 2. 微信小程序中如何使用字体图标 3. 背景图的使用 1. 如何使用iconfont 在项目中使用到的小图标&#xff0c;一般由公司设计师进行设计&#xff0c;设计好以后上传到阿里巴巴矢量图标库&#xff0c;然后方便程序员来进行使用。 小程序中的字体…

spark 实验二 RDD编程初级实践

目录 一. pyspark交互式编程示例&#xff08;学生选课成绩统计&#xff09; 该系总共有多少学生&#xff1b; 该系DataBase课程共有多少人选修&#xff1b; 各门课程的平均分是多少&#xff1b; 使用累加器计算共有多少人选了DataBase这门课。 二.编写独立应用程序实现数…