Flink使用总结

本文主要是为Flink的java客户端使用和flink-sql使用的大致介绍,具体使用查看文档页面。

java client使用

文档

Apache Flink Documentation | Apache Flink

数据处理模型

 maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flink_test</artifactId><version>1.0-SNAPSHOT</version><dependencies><!-- Apache Flink --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.15.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.15.4</version></dependency><!-- Apache Flink --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.4</version></dependency><!-- Kafka Client --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.1</version></dependency><!--json--><dependency><groupId>org.json</groupId><artifactId>json</artifactId><version>20210307</version></dependency><!-- 解决 No ExecutorFactory found to execute the application--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.15.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.15.4</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version></dependency></dependencies><!--build fat jar--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><archive><manifest><mainClass>com.KafkaDataProcessor</mainClass></manifest><manifestEntries><Encoding>UTF-8</Encoding></manifestEntries></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target><compilerArgs><arg>-Xlint:unchecked</arg><arg>-Xlint:deprecation</arg></compilerArgs></configuration></plugin></plugins></build>
</project>

代码样例

读取kafka并打印结果

KafkaFlinkExample

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaFlinkExample {public static void main(String[] args) throws Exception {// 设置 Flink 程序的执行环境// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.10.153:9092");props.setProperty("group.id", "test");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);env.addSource(consumer).map(data -> "Received: " + data).print();env.execute("Kafka Flink Example");}
}

处理kafka数据并保存结果入新的topic

KafkaDataProcessor

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.json.JSONObject;import java.util.Properties;public class KafkaDataProcessor {public static void main(String[] args) throws Exception {// 设置 Flink 程序的执行环境
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建一个本地流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// 设置 Kafka 的配置信息Properties properties = new Properties();properties.setProperty("bootstrap.servers", "192.168.10.153:9092");properties.setProperty("group.id", "flink-consumer-group");// 创建 Kafka 消费者,并从指定的 topic 中读取数据FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties);DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);// 将 JSON 数据解析并添加性别字段DataStream<String> processedDataStream = kafkaDataStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {// 解析 JSON 数据JSONObject jsonObject = new JSONObject(value);String name = jsonObject.getString("name");int id = jsonObject.getInt("id");int age = jsonObject.getInt("age");// 根据姓名判断性别String gender;if (name.equals("jack")) {gender = "male_xxx";} else {gender = "female_xxx";}// 构造新的 JSON 数据JSONObject newJsonObject = new JSONObject();newJsonObject.put("name", name);newJsonObject.put("id", id);newJsonObject.put("age", age);newJsonObject.put("gender", gender);return newJsonObject.toString();}});// 创建 Kafka 生产者,并将新的数据写入指定的 topicFlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties);processedDataStream.addSink(kafkaProducer);// 执行程序env.execute("Kafka Data Processor");}
}

设置执行并行度

LocalWebUI 

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class LocalWebUI {public static void main(String[] args) throws Exception {//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Configuration configuration = new Configuration();//创建一个带webUI的本地执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);int parallelism = env.getParallelism();Properties props = new Properties();props.setProperty("bootstrap.servers", "192.168.10.153:9092");props.setProperty("group.id", "test");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);System.out.println("执行环境的并行度:" + parallelism);
//        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);DataStreamSource<String> lines = env.addSource(consumer);int parallelism1 = lines.getParallelism();System.out.println("socketTextStream创建的DataStreamSource的并行度:" + parallelism1);SingleOutputStreamOperator<String> uppered = lines.map(line -> line.toUpperCase());int parallelism2 = uppered.getParallelism();System.out.println("调用完map方法得到的DataStream的并行度:" + parallelism2);DataStreamSink<String> print = uppered.print();int parallelism3 = print.getTransformation().getParallelism();System.out.println("调用完print方法得到的DataStreamSink的并行度:" + parallelism3);env.execute();}
}

本地执行

Flink可以和Spark类似,开发过程中,在本地临时执行,需要两个条件

1.需要flink-client依赖引入,否则会报No ExecutorFactory found to execute the application

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.15.4</version>
</dependency>

2.设置flink的执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

 集群执行

打包

mvn clean package

提交任务

flink run -c com.KafkaDataProcessor /root/flink_test-1.0-SNAPSHOT-jar-with-dependencies.jar 

观察任务状态

Job---->>Running Jobs

 结束任务

Job---->>Running Jobs--->>点击任务---->>Cannel Job

 flink-sql

文档

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/common/

需求案例

汇总kafka数据,将结果保存入mysql中

依赖准备

mysql版本是8.0.25,flink版本是1.15.4,connector的版本一定要和flink版本保持一致所有集群节点的lib一定要保持一致然后重启

依赖下载位置:Central Repository:

mysql-connector-java-8.0.25.jar
flink-connector-jdbc-1.15.4.jar
kafka-clients-2.8.1.jar
flink-connector-kafka-1.15.4.jar

启动客户端 

cd /root/flink-1.15.4/bin

./sql-client.sh

准备结果表

CREATE TABLE sync_test_1 (`day_time` varchar(64) NOT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`day_time`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

sql配置

create table flink_test_1 ( id BIGINT,day_time VARCHAR,amnount BIGINT,proctime AS PROCTIME ()
)with ( 'connector' = 'kafka','topic' = 'flink_test','properties.bootstrap.servers' = '192.168.10.153:9092', 'properties.group.id' = 'flink_gp_test1','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true','properties.zookeeper.connect' = '192.168.10.153:2181/kafka');CREATE TABLE sync_test_1 (day_time string,total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.10.151:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false','table-name' = 'sync_test_1','username' = 'root','password' = '123456');INSERT INTO sync_test_1 
SELECT day_time,SUM(amnount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;

 测试数据

./bin/kafka-console-producer.sh --bootstrap-server 192.168.10.153:9092 --topic flink_test

{"day_time": "20201009","id": 7,"amnount":20}

查看数据结果

 来源:

docs/sql_demo/demo_1.md · 无情(朱慧培)/flink-streaming-platform-web - Gitee.com

flink-sql大量使用案例_flink sql使用_第一片心意的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/20641.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

代价函数(Cost Function)

基本概念 代价函数也被称作平方误差函数&#xff0c;有时也被称为平方误差代价函数。我们之所以要求出误差的平方和&#xff0c;是因为误差平方代价函数&#xff0c;对于大多数问题&#xff0c;特别是回归问题&#xff0c;都是一个合理的选择。还有其他的代价函数也能很好地发挥…

微信小程序做登录密码显示隐藏效果 并解决安卓手机端隐藏密码时小黑点显示过大问题

在编辑器和苹果手机上面显示就是正常的大小&#xff0c;在安卓手机上面黑点就非常大&#xff0c;需要单独调 安卓手机显示比较大 wxml 注意&#xff1a;在html中的input是通过切换type的属性值来实现隐藏显示的 在微信小程序的input里面type没有password属性 是通过password属…

Linux开发工具之vim工具的使用介绍

目录 前言 1.vim的基本概念 命令模式(Normal mode) 插入模式(Insert mode) 末行模式(last line mode) 2.vim的基本操作 命令模式的命令集 移动光标 ​编辑 删除文字 复制 替换 撤销操作 更改 vim末行模式命令集 简单vim配置 总结 前言 大家好呀&#xff0c;许久…

数据结构 | 图的最短路径 Floyd算法

一、数据结构定义 typedef int VertexType; typedef int EdgeType;/*图*/ typedef struct {VertexType Vexs[SIZE]; //结点 EdgeType Edges[SIZE][SIZE]; //权值 int vexnum, arcnum; }MGraph;/*路径*/ typedef struct {int path[SIZE][SIZE];EdgeType length; }Path; 1.二维…

微服务 云原生:微服务相关技术简要概述

后端架构演进 单体架构 所谓单体架构&#xff0c;就是只有一台服务器&#xff0c;所有的系统、程序、服务、应用都安装在这一台服务器上。比如一个 bbs 系统&#xff0c;它用到的数据库&#xff0c;它需要存储的图片和文件等&#xff0c;统统都部署在同一台服务器上。 单体架…

白皮书案例解读|数字孪生与港口的结合会碰撞出什么样的火花呢?

以下案例来自于《数字孪生世界白皮书&#xff08;2023版&#xff09;》 领取方式&#xff1a;公众号「EasyV数字孪生」后台回复「白皮书」即可领取&#xff01; 嗨&#xff0c;我又出现啦&#xff5e;今天想和大家聊聊关于港口场景数字孪生技术的应用&#xff0c;欢迎大家踊跃…

基于JavaSwing+MySQL的仓库商品管理系统

点击以下链接获取源码&#xff1a; https://download.csdn.net/download/qq_64505944/88046204?spm1001.2014.3001.5503 JDK1.8 MySQL5.7 功能&#xff1a;管理员与员工两个角色登录&#xff0c;增删改查用户信息&#xff0c;修改密码&#xff0c;增删改查商品信息&#xff0c…

opencv基础:环境配置

最近人工智能很火&#xff0c;所以蹭个热度&#xff0c;聊一个跨平台计算机视觉库----Opencv。 定义 先看一下其定义&#xff1a; OpenCV是一个基于Apache2.0许可&#xff08;开源&#xff09;发行的跨平台计算机视觉和机器学习软件库&#xff0c;可以运行在Linux、Windows、…

vue3中的excel表导出功能(选中导出或导出所有,也可支持vue2)

1.安装模块 npm install xlsx file-saver -S 2.文件导入 import * as XLSX from "xlsx"; import FileSaver from "file-saver" 3.整体代码(可选中导出或导出所有) <template><div><el-button type"warning" click"down&quo…

5.2 基于ROP漏洞挖掘与利用

通常情况下栈溢出可能造成的后果有两种&#xff0c;一类是本地提权另一类则是远程执行任意命令&#xff0c;通常C/C并没有提供智能化检查用户输入是否合法的功能&#xff0c;同时程序编写人员在编写代码时也很难始终检查栈是否会发生溢出&#xff0c;这就给恶意代码的溢出提供了…

Maven引入Jacoco插件后无法生成jacoco.exec执行文件

目录 jacoco.exec网上常见关于未生成jacoco.exec原因最终解决方案不生效原因解决方案 完整jacoco插件配置 jacoco.exec 执行数据文件&#xff0c;只有生成该文件&#xff0c;才表示引入插件jacoco成功生效 网上常见关于未生成jacoco.exec原因 网上找了一下解决方式基本都是…

React Dva修改路由设置,不要井号

我们Dva项目的路由 他默认是设置了带井号的这种 其实我觉得到还可以 但是有些人会觉得不太美观 如果 你想去除他 那么 你先要在终端执行 npm install --save history将 history 引入进来 装好之后 我们来到src下的 index.js 加上如下代码 import {createBrowserHistory as …