【极数系列】Flink集成DataSource读取Socket请求数据(09)

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于socket套接字读取数据
    • 3.1 从套接字读取。元素可以由分隔符分隔。
    • 3.2 windows安装netcat工具
      • (1)下载netcat工具
      • (2)安装部署
      • (3)启动socket端口监听
  • 04 源码实战demo
    • 4.1 pom.xm依赖
    • 4.2创建socket数据流作业
    • 4.3实时cmd窗口输入数据

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkSocketSourceJob(socket请求)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于socket套接字读取数据

3.1 从套接字读取。元素可以由分隔符分隔。

3.2 windows安装netcat工具

(1)下载netcat工具

下载地址:https://eternallybored.org/misc/netcat/

(2)安装部署

注意:不是拷贝整个文件夹,而是文件夹里面的全部文件。

将解压后的单个文件全部拷贝到C:\Windows\System32的文件夹下。

(3)启动socket端口监听

注意:该端口需要跟代码中监听的端口一致,否则获取不到数据

nc -l -p 8081

04 源码实战demo

4.1 pom.xm依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.18.0</flink.version><!--scala版本--><scala.binary.version>2.11</scala.binary.version><!--log4j依赖--><log4j.version>2.17.1</log4j.version></properties><!--通用依赖--><dependencies><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--================================集成外部依赖==========================================--><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!--集成日志框架 end--></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build><!--配置Maven项目中需要使用的远程仓库--><repositories><repository><id>aliyun-repos</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></repository></repositories><!--用来配置maven插件的远程仓库--><pluginRepositories><pluginRepository><id>aliyun-plugin</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories></project>

4.2创建socket数据流作业

package com.aurora.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;/*** @description flink的socket请求的source应用* @author 浅夏的猫* @datetime 23:03 2024/1/28
*/
public class FlinkSocketSourceJob {private static final Logger logger = LoggerFactory.getLogger(FlinkSocketSourceJob.class);public static void main(String[] args) throws Exception {//1.创建Flink运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.设置Flink运行模式://STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//3.基于socket请求的source使用DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",8081);//4.输出打印dataStreamSource.print();//5.启动运行env.execute();}}

4.3实时cmd窗口输入数据

注意:先启动cmd窗口监听再启动程序,否则会报端口连接失败

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/442741.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

苍穹外卖项目可以写的简历和如何优化简历

文章目录 重点写中规写添加自己个性的项目面试会问道的问题 我是一名双非大二计算机本科生&#xff0c;希望我的分享对你有帮助&#xff0c;点赞关注不迷路。 简历编写一直是很多人求职人的心病&#xff0c;我自己上学期有一门课程是去校内企业面试&#xff0c;当时我就感受出…

服装行业ERP系统解决方案

我国的服装企业大多属于劳动密集型&#xff0c;主要有三种类型&#xff1a;自有品牌服装生产销售企业、接订单生产型企业及处于产业链下游的零售分销企业。在经营过程中&#xff0c;服装行业面临诸多挑战&#xff0c;如流行周期短、季节性强&#xff0c;市场变化快&#xff1b;…

福禄克万用表内置保险丝的测量技巧

所需设备&#xff1a; 1、Fluke ADPT连接器&#xff1b; 2、Fluke 15B / 17B / 18B&#xff1b; 3、Fluke 15B Max / 17B Max&#xff1b; 在不打开万用表后盖的情况下&#xff0c;判断内部的保险丝是否完好&#xff1b; 将万用表档位调整到 “电阻” 档&#xff1b;如下…

java基于springboot的美妆化妆品商城购物网站ssm+vue

美妆购物网站分为管理员&#xff0c;商家&#xff0c;用户三种权限。 用户可以注册&#xff0c;可以登录&#xff0c;用户进入到首页可以看到热门化妆品和新品化妆品&#xff0c;可以选购化妆品&#xff08;可以通过搜索查询&#xff09;加入购物车&#xff0c;查看化妆品详细情…

Android渗透测试工具之Drozer-安装

一、介绍&#xff1a; drozer&#xff08;以前称为 Mercury&#xff09;是领先的 Android 安全测试框架。 drozer 允许您通过扮演应用程序的角色并与 Dalvik VM、其他应用程序的 IPC 端点和底层操作系统交互来搜索应用程序和设备中的安全漏洞。 drozer 提供的工具可帮助您使…

JAVA处理类似饼状图占比和100%问题,采用最大余额法

前言&#xff1a; 在做数据统计报表的时候&#xff0c;有两种方式解决占比总和达不到100%或者超过100%问题。 第一种方式是前端echart图自带的算分框架。 第二种方式是java后端取处理这个问题。 现存问题&#xff1a; 前端不通过饼状图的方式去展示各个分类的占比累加和为100%问…

Java学习之基础语法

Java学习之基础语法 本文主要是对于有了其他语言基础的人总结的资料&#xff0c;因此本文只写出了Java与C语言&#xff0c;C等语言的区别之处与部分重点。 1.基础语法&#xff1a; 1.1.包与类&#xff1a; 1.1.1.包&#xff1a; 在Java中&#xff0c;包&#xff08;packag…

【C++】类和对象_1_定义和定义域

1.类的定义&#xff1a; class className {//类体&#xff1a;由成员函数和成员变量组成 };//注意后面的分号class为定义类的关键字&#xff0c;ClassName为类的名字&#xff0c;{}中为类的主体&#xff0c;注意类定义结束时后面分号不能省略。 类体中内容称为类的成员&#x…

docker安全与https协议

一、docker存在的安全问题 1、docker 自身漏洞 docker 应用本身实现上会有代码缺陷&#xff0c;docker 历史版本共有超过 20 项漏洞 2、docker公有仓库安全问题 docker 提供了 docker hub&#xff0c;可以让用户上传创建的镜像&#xff0c;以便其他用户下载&#xff0c;快速…

05 MyBatis之表关系的声明+事务+SqlSession三件套的作用域

MyBatis 支持一对一&#xff0c;一对多&#xff0c;多对多查询。XML 文件和注解都能实现关系的操作。多对多实质就是一对多 1. 表关系的维护 1.1 One一对一 一对一查询和多表(两表)查询很相似, 都能查询两表的全部属性 区别是一对一可以在对象中嵌套对象, 呈现包含关系; 多表…

elementUI之用label传值给v-model的el-radio

如图 这个控件比较特别&#xff0c;是用label作为传给v-model的值&#xff0c;其他控件大多都是用value来传值。 el-radio-group的作用是将其中的单选控件作为一组。

1 计算机网络概述(一):概述

目录 目标1 计算机网络概述1.1 计算机网络的定义和演变1、计算机网络的定义2、计算机网络的演变3、网络发展的里程碑4、我国的网络发展 1.2 计算机网络的分类、组成和网络性能1、计算机网络的分类2、计算机网络的组成3、网络主要性能 目标 了解计算机网络的产生和发展过程了解…