【Apache Flink】Flink DataStream API的基本使用

Flink DataStream API的基本使用

文章目录

  • 前言
  • 1. 基本使用方法
  • 2. 核心示例代码
  • 3. 完成工程代码
    • pom.xml
    • WordCountExample
    • 测试验证
  • 4. Stream 执行环境
  • 5. 参考文档

前言

Flink DataStream API主要用于处理无界和有界数据流 。
无界数据流是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。

有界数据流是一个具有明确开始和结束点的数据集,例如一个文件或数据库表。这种类型的数据流通常在批处理场景中使用,其中所有数据都已经可用,并可以一次性处理。

Flink的DataStream API提供了一套丰富的操作符,如map、filter、reduce、aggregations、windowing、join等,以支持各种复杂的数据处理和分析需求。此外,DataStream API还提供了容错保证,能确保在发生故障时,应用程序能从最近的检查点(checkpoint)恢复,从而实现精确一次(exactly-once)的处理语义。

1. 基本使用方法

  1. 创建执行环境:

    每一个Flink程序都需要创建一个StreamExecutionEnvironment(执行环境),它可以被用来设置参数和创建从外部系统读取数据的流。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. 创建数据流:

    你可以从各种数据源中创建数据流,如本地集合,文件,socket等。下面的代码是从本地集合创建数据流的示例:

    DataStream<String> dataStream = env.fromElements("hello", "flink");
    
  3. 转换操作:

    Flink提供了丰富的转换操作,如mapfilterreduce等。以下代码首先将每个字符串映射为其长度,然后过滤出长度大于5的元素:

    DataStream<Integer> transformedStream = dataStream.map(s -> s.length()).filter(l -> l > 5);
    
  4. 数据输出:

    Flink支持将数据流输出到各种存储系统,如文件,socket,数据库等。下面的代码将数据流输出到标准输出:

    transformedStream.print();
    
  5. 执行程序:

    将上述所有步骤放在main函数中,并在最后调用env.execute()方法来启动程序。Flink程序是懒加载的,只有在调用execute方法时才会真正开始执行。

    env.execute("Flink Basic API Usage");
    

2. 核心示例代码

使用Flink DataStream API构建一个实时Word Count程序,它会从一个socket端口读取文本数据,统计每个单词的出现次数,并将结果输出到标准输出。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器DataStream<String> textStream = env.socketTextStream("localhost", 9000);// 3. 转换操作DataStream<Tuple2<String, Integer>> wordCountStream = textStream.flatMap(new LineSplitter()) // 将文本行切分为单词.keyBy(0) // 按单词分组.sum(1); // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute("Socket Word Count Example");}// 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}
}

3. 完成工程代码

下面是一个基于Apache Flink的实时单词计数应用程序的完整工程代码,包括Pom.xml文件和所有Java类。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>flink-wordcount-example</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><properties><flink.version>1.13.2</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins></build>
</project>

WordCountExample

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountExample {public static void main(String[] args) throws Exception {// 1. 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器DataStream<String> textStream = env.socketTextStream("localhost", 9000);// 3. 转换操作DataStream<Tuple2<String, Integer>> wordCountStream = textStream.flatMap(new LineSplitter())  // 将文本行切分为单词.keyBy(0)  // 按单词分组.sum(1);  // 对每个单词的计数求和// 4. 数据输出wordCountStream.print();// 5. 执行程序env.execute("Socket Word Count Example");}// 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) {for (String word : line.split(" ")) {out.collect(new Tuple2<>(word, 1));}}}
}

现在,你可以使用Maven编译并运行这个程序。在启动程序之前,你需要在本地启动一个端口为9000的Socket服务器。这可以通过使用Netcat工具 (nc -lk 9000) 或者其他任何能打开端口的工具实现。然后,你可以输入文本行,Flink程序会统计每个单词出现的次数,并实时打印结果。

测试验证

用py在本地启动一个socket服务器,监听9000端口,

python比较简单实现一个socket通信 。写一个Python来验证上面写的例子。

import socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 9000))
server_socket.listen(1)print("Waiting for connection...")
client_socket, client_address = server_socket.accept()
print("Connected to:", client_address)while True:data = input("Enter text: ")client_socket.sendall(data.encode())

运行Flink程序和Python socket服务器,然后在Python程序中输入文本, 会看到Flink程序实时统计每个单词出现的次数并输出到控制台。

4. Stream 执行环境

开发学习过程中,不需要关注。每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。
在这里插入图片描述

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

Flink runtime: client, job manager, task managers
此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。

5. 参考文档

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/158121.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

unraid 安装并设置 zerotier 内网穿透安装 unraid 局域网内其他设备

Read Original 最近看了以下两个文章&#xff0c;感谢发布的各种精彩文章&#xff0c;让我受益匪浅。OPENWRT 的固件在设置了&#xff0c;【自动允许客户端 NAT】后&#xff0c;可以直接访问局域网其他设备&#xff0c;而我 unraid 部署 zerotier 后&#xff0c;只能访问 unra…

API接口安全设计

简介 HTTP接口是互联网各系统之间对接的重要方式之一&#xff0c;使用HTTP接口开发和调用都很方便&#xff0c;也是被大量采用的方式&#xff0c;它可以让不同系统之间实现数据的交换和共享。 由于HTTP接口开放在互联网上&#xff0c;所以我们就需要有一定的安全措施来保证接口…

C++异常处理

C 异常处理涉及到三个关键字&#xff1a;try、catch、throw。 在 c程序中&#xff0c;任何需要检测异常的语句&#xff0c;都必须在 try 语句块中执行&#xff0c;异常必须由紧跟着 try 语句后面的 catch 语句来捕获并处理&#xff0c;因此 try 与 catch 总是结合使用&#xff…

C++并发编程实战——05.内存模型与原子操作

文章目录 内存模型与原子操作内存模型原子操作和原子类型标准原子类型std::atomic_flagstd::atomic\<bool>std::atomic<T\*>std::atomic<user_define_type> 类模板非成员函数 同步操作和强制排序同步发生与先行发生内存序**顺序一致性**(memory_order_seq_cs…

图解系列--路由器和它庞大的功能

03.01 何为路由器 路由器是指主要负责 OSI参考模型中网络层的处理工作&#xff0c;并根据路由表信息在不同的网络 之间转发IP 分组的网络硬件(图3-1)。这里的网络一般是指IP 子网&#xff0c;也可以称为广播域。此外&#xff0c;现在的路由器还会搭载其他各种各样的功能。 0…

【Midjourney入门教程3】写好prompt常用的参数

文章目录 1、图片描述词&#xff08;图片链接&#xff09;文字描述词后缀参数2、权重划分3、后缀参数版本选择&#xff1a;--v版本风格&#xff1a;--style长宽比&#xff1a;--ar多样性: --c二次元化&#xff1a;--niji排除内容&#xff1a;--no--stylize--seed--tile、--q 4、…

openeuler 使用指令查找U盘:输入fdisk -l,内核崩溃 ,系统重启,使用lsblk显示正常,数据传输正常

报错日志&#xff1a; [rootedgenode1 ~]# fdisk -l Disk /dev/ram0: 4 MiB, 4194304 bytes, 8192 sectors Units: sectors of 1 * 512 512 bytes Sector size (logical/physical): 512 bytes / 4096 bytes I/O size (minimum/optimal): 4096 bytes / 4096 bytes Disk /d…

数据结构与算法:使用数组模拟队列Java版

逻辑分析 代码实现 package com.haimeng.queue;import java.util.Scanner;public class ArrayQueueDemo {public static void main(String[] args) {//测试一把//创建一个队列ArrayQueue queue new ArrayQueue(3);char key ; //接收用户输入Scanner scanner new Scanner(S…

web3 在React dapp中全局管理web3当前登录用户/智能合约等信息

上文 Web3 React项目Dapp获取智能合约对象我们在自己的前端dapp项目中链接获取到了 自己的智能合约 我们继续 我们还是先启动ganache环境 终端输入 ganache -d然后发布一下我们的智能合约 打开我们的合约项目 终端输入 truffle migrate --reset这样 我们的智能合约就部署到区…

L1和L2正则化通俗理解

机器学习中&#xff0c;如果参数过多&#xff0c;模型过于复杂&#xff0c;容易造成过拟合&#xff08;overfit&#xff09;。即模型在训练样本数据上表现的很好&#xff0c;但在实际测试样本上表现的较差&#xff0c;不具备良好的泛化能力。为了避免过拟合&#xff0c;最常用的…

腾讯云域名备案后,如何解析到华为云服务器Linux宝塔面板

一、购买域名并且进行备案和解析&#xff0c;正常情况下&#xff0c;购买完域名&#xff0c;如果找不到去哪备案&#xff0c;可以在腾讯云上搜索“备案”关键词就会出现了&#xff0c;所以这里不做详细介绍&#xff0c;直接进行步骤提示&#xff1a; 二、申请ssl证书&#xff0…

LeetCode算法题解|​ 669. 修剪二叉搜索树​、108. 将有序数组转换为二叉搜索树、​538. 把二叉搜索树转换为累加树​

一、LeetCode 669. 修剪二叉搜索树​ 题目链接&#xff1a;669. 修剪二叉搜索树 题目描述&#xff1a; 给你二叉搜索树的根节点 root &#xff0c;同时给定最小边界low 和最大边界 high。通过修剪二叉搜索树&#xff0c;使得所有节点的值在[low, high]中。修剪树 不应该 改变…