【入门Flink】- 07Flink DataStream API【万字篇】

DataStream API 是 Flink 的核心层 API。一个 Flink 程序,其实就是对DataStream的各种转换。

代码基本上都由以下几部分构成:

image-20231104230137211

执行环境(Execution Environment)

1)创建执行环境StreamExecutionEnvironment

StreamExecutionEnvironment 类的对象,这是所有Flink程序的基础。调用这个类的静态方法,具体有以下三种

(1)getExecutionEnvironment √

最简单的方式,就是直接调用 getExecutionEnvironment 方法。这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()

(2)createLocalEnvironment

返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

(3)createRemoteEnvironment

返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的JAR包
);

在获取到程序执行环境后,还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

2)执行模式(Execution Mode)

从 Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理。不建议使用 DataSet API。

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream API 执行模式包括:流执行模式、批执行模式和自动模式

  • 流执行模式(Streaming)

    DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 Streaming 执行模式

  • 批执行模式(Batch)

    专门用于批处理的执行模式。

  • 自动模式(AutoMatic)

    在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加 execution.runtime-mode 参数,指定值为BATCH。

(2)通过代码配置

image-20231104231309108

实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

3)触发程序执行

Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”“懒执行”

同步执行 or 异步执行(了解)

execute() or executeAsync()

env.execute(); // 同步
env.executeAsync(); // 异步

exexute总结

  1. 默认 env.execute() 触发一个Flink job
    • 一个main方法可以调用多个execute,但是没有意义,指定一个就会阻塞住(同步)
  2. env.executeAsync(),异步,不阻塞
    • 一个main方法里 executeAsync()个数 = 生成Flink job数

源算子(Source)

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source 是整个处理程序的输入端。

在 Flink1.12 以前,旧的添加 source 的方式,是调用执行环境的addSource()方法:

DataStream<String> stream = env.addSource(...);

方法传入的参数是一个“源函数”(source function),需要实现SourceFunction 接口。

从 Flink1.12 开始,主要使用流批统一的新 Source 架构fromSource()方法):

DataStreamSource<String> stream = env.fromSource();

Flink 直接提供了很多预实现的接口,此外还有很多外部连接工具也实现了对应的Source,通常情况下足以应对实际需求。

1)从集合中读取数据

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.fromElements(1, 2, 3, 4, 5)  // 直接填写元素
//                .fromCollection(Arrays.asList(1, 2, 3, 4, 5))  // 从集合读取元素.print();env.execute();

2)从文件读取数据

通常情况,会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

读取文件,需要添加文件连接器依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(),// 文件路径,相对路径不行就用绝对路径Path.fromLocalFile(new File("D:\\workspace\\IdeaProjects\\second-java\\day5-flink\\src\\main\\resources\\input\\words.txt"))).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource").print();env.execute();

说明

  • 参数可以是目录,也可以是文件;还可以从 HDFS 目录下读取,使用路径hdfs://…;
  • 路径可以是相对路径,也可以是绝对路径;
  • 相对路径是从系统属性 user.dir 获取路径:idea 下是project 的根目录,standalone模式下是集群节点根目录。

3)从 Socket 读取数据

读取 socket 文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStreamSource<String> lineStream = env.socketTextStream("localhost", 7777);

4)从 Kafka 读取数据

Flink 官方提供了连接工具 flink-connector-kafka , 直接实现了一个消费者FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction。

引入Kafka 连接器的依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>

代码如下:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("124.222.253.33:9092") // ip:port.setTopics("topic_1") // topic.setGroupId("group1") // 消费者组// latest 将偏移初始化为最新偏移的OffsetInitializer// earliest 偏移初始化为最早可用偏移的OffsetInitializer.setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build(); // 仅Value反序列化env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source").print();env.execute();

5)从数据生成器读取数据

Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。

需要导入依赖:

  		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>

代码如下:

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(// 数据转换生成函数new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},// 从0开始递增至这个数Long.MAX_VALUE,// 数据生产效率,每秒10个RateLimiterStrategy.perSecond(10), Types.STRING);env.fromSource(dataGeneratorSource,WatermarkStrategy.noWatermarks(), "dataGenerator").print();env.execute();

Flink 支持的数据类型

1)Flink 的类型系统

Flink 使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink 中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

2)Flink 支持的数据类型

对于常见的 Java 和 Scala 数据类型,Flink 都是支持的。Flink 在内部,Flink 对支持不同的类型进行了划分,这些类型可以在 Types 工具类中找到:

(1)基本类型

所有 Java 基本类型及其包装类,再加上 Void、String、Date、BigDecimal 和BigInteger。

(2)数组类型

包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。

(3)复合数据类型

  • Java 元组类型(TUPLE):这是 Flink 内置的元组类型,是Java API 的一部分。最多25 个字段,也就是从 Tuple0~Tuple25,不支持空字段。
  • 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
  • POJO:Flink 自定义的类似于 Java bean 模式的类。

Flink 对 POJO 类型的要求如下:

  • 类是公有(public)的
  • 有一个无参的构造方法
  • 所有属性都是公有(public)的
  • 所有属性的类型都是可以序列化的

(4)辅助类型

Option、Either、List、Map 等。

(5)泛型类型(GENERIC)

Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的

3)类型提示(Type Hints)

有时,需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据。

.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

或者,使用TypeHint 类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。仍通过.returns()方法,明确地指定转换之后的DataStream 里元素的类型。

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

转换算子(Transformation)

开始之前,准备 WaterSensor 作为数据模型。

字段分别代表如下含义:

字段名数据类型说明
idString水位传感器类型
tsLong传感器记录时间戳
vcInteger水位记录
public class WaterSensor {public String id;public Long ts;public Integer vc;public WaterSensor() {}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc = vc;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;WaterSensor that = (WaterSensor) o;return Objects.equals(id, that.id) && Objects.equals(ts, that.ts) && Objects.equals(vc, that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}
}

数据源读入数据之后,就可以使用各种转换算子,将一个或多个DataStream转换为新的 DataStream。

转换算子一般有三种写法:

  1. 匿名类
  2. lambda表达式
  3. 实现函数接口

1)基本转换算子(map/ filter/ flatMap)

(1)映射(map)

“一一映射”,消费一个元素就产出一个元素。

需求:提取 WaterSensor 中的id 字段

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1L, 1),new WaterSensor("sensor_2", 2L, 2));stream.map(WaterSensor::getId).print();env.execute();}

(2)过滤(filter)

需求:将数据流中传感器 id 为 sensor_1 的数据过滤出来

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1L, 1),new WaterSensor("sensor_2", 2L, 2));stream.filter(el -> "sensor_1".equals(el.getId())).print();env.execute();

(3)扁平映射(flatMap)

flatMap 一进多出

需求:如果输入的数据是 sensor_1,只打印 vc;如果输入的数据是sensor_2,既打印 ts 又打印 vc。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1L, 1),new WaterSensor("sensor_2", 2L, 2));stream.flatMap(new FlatMapFunction<WaterSensor, String>() {@Overridepublic void flatMap(WaterSensor value, Collector<String> out) throws Exception {if ("sensor_1".equals(value.getId())) {out.collect(value.getVc().toString());} else if ("sensor_2".equals(value.getId())) {out.collect(value.getTs().toString());out.collect(value.getVc().toString());}}}).print();env.execute();

输出结果:

image-20231105170456761

map如何控制一进一出:

​ 使用return

flatmap怎么控制一进多出

​ 通过Collector输出,调用几次就输出几条(向下游输送数据)

2) 聚合算子(Aggregation)

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce 中的reduce操作。

(1 )按键分区(keyBy)

在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。

keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 hashCode()方法。

id 作为 key 做一个分区操作,代码实现如下:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1L, 1),new WaterSensor("sensor_1", 2L, 2),new WaterSensor("sensor_2", 2L, 2),new WaterSensor("sensor_3", 3L, 3));KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(WaterSensor::getId);keyedStream.print();env.execute();

运行结果如下:

image-20231105174334517

keyBy:

  1. 返回的是 KeyedStream ,键控流
  2. 不是转换算子,只是对数据进行重分区,不能设置并行度

keyBy分组 和 分区 的关系:

  1. keyBy对数据分组(相同key的数据被分为一组),同时保证 相同的key的数据 在同一个分区
  2. 分区):一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)

KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce)。

(2)简单聚合(sum/min/max/minBy/maxBy)

有了按键分区的数据流 KeyedStream,就可以基于它进行聚合操作了。

Flink内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

  • sum():在输入流上,对指定的字段做叠加求和的操作。

  • min():在输入流上,对指定的字段求最小值。

  • max():在输入流上,对指定的字段求最大值。

  • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。

  • maxBy():同上

聚合方法调用时,也需要传入参数,聚合指定的字段。指定字段的方式有两种:指定位置,和指定名称。【指定位置索引,适用于 Tuple类型,POJP不行】

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1L, 1),new WaterSensor("sensor_1", 2L, 2),new WaterSensor("sensor_2", 2L, 2),new WaterSensor("sensor_3", 3L, 3));KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(WaterSensor::getId);keyedStream.maxBy("vc") // 指定字段名称.print();env.execute();

keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。

一个聚合算子,会为每一个 key 保存一个聚合的值,在Flink 中把它叫作“状态”(state)

每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的。

使用聚合算子,应该只用在含有有限个 key 的数据流上。

(3)归约聚合(reduce)

案例:使用 reduce 实现 max 和 maxBy 的功能。

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1L, 1),new WaterSensor("sensor_1", 2L, 2),new WaterSensor("sensor_2", 2L, 2),new WaterSensor("sensor_3", 3L, 3));KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(WaterSensor::getId);keyedStream.reduce(new ReduceFunction<WaterSensor>() {// 同组元素规约处理@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("value1: " + value1);System.out.println("value2: " + value2);int maxVc = Math.max(value1.getVc(), value2.getVc());if (value1.getVc() > value2.getVc()) {value1.setVc(maxVc);return value1;} else {value2.setVc(maxVc);return value2;}}}).print();env.execute();

reduce 输入类型 = 输出类型,类型不变

每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出

reduce方法中的两个参数

​ value1:之前计算结果,有状态

value2:现在来的数据

reduce 算子也应该作用在一个有限 key 的流上。

用户自定义函数(UDF)

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。

用户自定义函数分为:函数类、匿名函数、富函数类。

1)函数类

函数类可以传参数更加灵活

public class FilterIdFunction implements FilterFunction<WaterSensor> {private final String id;public FilterIdFunction(String id) {this.id = id;}@Overridepublic boolean filter(WaterSensor value) throws Exception {return id.equals(value.getId());}
}

2)富函数类(Rich Function Classes)

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的Flink 函数类都有其 Rich版本

富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。

富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

  • open()方法,是 Rich Function 的初始化方法,每个子任务启动时,调用一次。
  • close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
    • 如果是Flink程序异常挂掉,不会调用close
    • 正常调用cancel命令,可以close

代码实现:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(1, 2, 3, 4).map(new RichMapFunction<Integer, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println(" 索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic Integer map(Integer integer)throws Exception {return integer + 1;}@Overridepublic void close() throws Exception {super.close();System.out.println(" 索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}}).print();env.execute();

物理分区算子(Physical Partitioning)

常见的物理分区策略有:随机分配(Random)轮询分配(Round-Robin)重缩放(Rescale)广播(Broadcast)

1)随机分区(shuffle)

将数据随机地分配到下游算子的并行任务中去。可以打乱数据。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> stream = env.socketTextStream("124.222.253.33", 7777);stream.shuffle().print();env.execute();

2)轮询分区(Round-Robin)

雨露均沾,下游算子一个一个来(所有算子)

stream.rebalance()

3)重缩放分区(rescale)

重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用Round-Robin 算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。(部分算子)

stream.rescale()

4)广播(broadcast)

将输入数据复制并发送到下游算子的所有并行任务中去。

stream.broadcast()

5)全局分区(global)

一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1。

stream.global()

6)自定义分区(Custom)

Flink 内置所有分区策略都不能满足用户的需求时,可以通过使用partitionCustom()方法来自定义分区策略。

(1)自定义分区器

public class MyPartitioner implements Partitioner<String> {// numPartitions: 子任务数量(分区数量)@Overridepublic int partition(String key, int numPartitions) {return Integer.parseInt(key) % numPartitions;}
}

(2)使用自定义分区

public class PartitionCustomDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> socketDS = env.socketTextStream("124.222.253.33", 7777);DataStream<String> myDS = socketDS.partitionCustom(new MyPartitioner(),value -> value);myDS.print();env.execute();}
}

八种分区器

Flink提供了8种分区器,7种内置,1种自定义

image-20231106001644153

分流

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

1)简单实现

使用filter筛选多次,将原始数据流stream复制多份,然后对每一份分别做筛选;不够高效。

2)侧输出流(process)

process算子

调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个**“输出标签”(OutputTag)**,指定了侧输出流的 id 和类型。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("124.222.253.33", 7777).map(new WaterSensorMapFunction());// 定义输出标签OutputTag<WaterSensor> s1 = new OutputTag<WaterSensor>("s1", Types.POJO(WaterSensor.class)) {};OutputTag<WaterSensor> s2 = new OutputTag<WaterSensor>("s2", Types.POJO(WaterSensor.class)) {};// 返回的都是主流SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>() {@Overridepublic void processElement(WaterSensor value, ProcessFunction<WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {if ("s1".equals(value.getId())) {ctx.output(s1, value);} else if ("s2".equals(value.getId())) {ctx.output(s2, value);} else {// 主流out.collect(value);}}});// 打印主流ds1.print("主流数据");// 通过主流获取侧边流SideOutputDataStream<WaterSensor> sideOutput1 = ds1.getSideOutput(s1);SideOutputDataStream<WaterSensor> sideOutput2 = ds1.getSideOutput(s2);sideOutput1.printToErr("测流s1");sideOutput2.printToErr("测流s2");env.execute();

合流

1)联合(Union)

联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。(可以将多条流联合在一起)

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");// ds3 类型不一致,不能联合ds1.union(ds2).print();env.execute();

image-20231105224515470

2)连接(Connect)

(1)简单使用

连接一次只能连接2条流,流的数据类型可以不一样

代码实现:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);DataStreamSource<String> source2 = env.fromElements("a", "b", "c");ConnectedStreams<Integer, String> connect = source1.connect(source2);/*connect:1、一次只能连接 2 条流2、流的数据类型可以不一样3、连接后可以调用 map、flatmap、process 来处理,但是各处理各的*/connect.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) throws Exception {return "来源于数字流:" + value.toString();}@Overridepublic String map2(String value) throws Exception {return "来源于字母流:" + value;}}).print();env.execute();

(2)CoProcessFunction

需求:连接两条流,输出能根据 id 匹配上的数据(类似inner join 效果)

注意:connectedStreams.keyBy(keySelector1, keySelector2);

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(2);DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(Tuple2.of(1, "a1"),Tuple2.of(1, "a2"),Tuple2.of(2, "b"),Tuple2.of(3, "c"));DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(Tuple3.of(1, "aa1", 1),Tuple3.of(1, "aa2", 2),Tuple3.of(2, "bb", 1),Tuple3.of(3, "cc", 1));// 定义HashMap , 缓存来过的数据,key=id,value=list<数据>Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);// 将合流元素,按key分到同一分区才能得到如下结果 ***ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(v -> v.f0, v1 -> v1.f0);connectKey.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {@Overridepublic void processElement1(Tuple2<Integer, String> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;if (!s1Cache.containsKey(id)) {List<Tuple2<Integer, String>> s1Values = new ArrayList<>();s1Values.add(value);s1Cache.put(id, s1Values);} else {s1Cache.get(id).add(value);}if (s2Cache.containsKey(id)) {for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {out.collect("s1:" + value + "<--------->s2:" + s2Element);}}}@Overridepublic void processElement2(Tuple3<Integer, String, Integer> value, CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>.Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;if (!s2Cache.containsKey(id)) {List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();s2Values.add(value);s2Cache.put(id, s2Values);} else {s2Cache.get(id).add(value);}if (s1Cache.containsKey(id)) {for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {out.collect("s1:" + s1Element + "<--------->s2:" + value);}}}}).print();env.execute();

输出算子(Sink)

1)连接到外部系统

Flink1.12 以前,Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

Flink1.12 开始,同样重构了 Sink 架构,使用 .sinkTo()方法实现。

2)输出到文件

FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用 FileSink 的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

代码实现:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有并行度个数的文件在写入env.setParallelism(2);// 【必须开启】 checkpoint,否则一直都是 .inprogressenv.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE);// 数据生成器DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource,WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("d:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("li-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(newDateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略: 1 分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(newMemorySize(1024 * 1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();

3)输出到 Kafka

添加 Kafka 连接器依赖。

代码实现:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是【精准一次,必须开启】 checkpointenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("124.222.253.33", 7777);/*Kafka Sink:注意:如果要使用 【精准一次】 写入 Kafka,需要满足以下条件,缺一不可1、开启 checkpoint2、设置事务前缀3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的 15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("124.222.253.33:9092")// 指定序列化器:指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到 kafka 的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("li-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15 分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();

自定义序列化器,实现带 key 的 record:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("124.222.253.33", 7777);/*如果要指定写入 kafka 的 key,可以自定义序列化器:1、实现 一个接口,重写 序列化 方法2、指定 key,转成 字节数组3、指定 value,转成 字节数组4、返回一个 ProducerRecord 对象,把 key、value 放进去*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("124.222.253.33:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("li-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();

4)输出到 MySQL(JDBC)

(1)添加 MySQL 驱动

        <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency>

(2)jdbc连接器

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency>

官网案例:

public class JdbcSinkExample {static class Book {public Book(Long id, String title, String authors, Integer year) {this.id = id;this.title = title;this.authors = authors;this.year = year;}final Long id;final String title;final String authors;final Integer year;}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/*写入 mysql1、只能用老的 sink 写法: addSink2、JDBCSink 的 4 个参数:第一个参数: 执行的 sql,一般就是 insert into第二个参数: 预编译 sql, 对占位符填充值第三个参数: 执行选项 ---》 攒批、重试第四个参数: 连接选项 ---》 url、用户名、密码*/env.fromElements(new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)).addSink(JdbcSink.sink("insert into books (id, title, authors, year) values (?, ?, ?, ?)",(statement, book) -> {statement.setLong(1, book.id);statement.setString(2, book.title);statement.setString(3, book.authors);statement.setInt(4, book.year);},JdbcExecutionOptions.builder().withBatchSize(1000) // 批次的大小:条数.withBatchIntervalMs(200) // 批次的时间.withMaxRetries(5) // 重试次数.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF8").withUsername("root").withPassword("root").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build()));env.execute();}
}

5)自定义Sink输出

实现RichSinkDunction 抽象类,自定义逻辑比较麻烦,不建议。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/166852.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【蓝桥杯嵌入式】蓝桥杯嵌入式第十四届省赛程序真题,真题分析与代码讲解

&#x1f38a;【蓝桥杯嵌入式】专题正在持续更新中&#xff0c;原理图解析✨&#xff0c;各模块分析✨以及历年真题讲解✨都已更新完毕&#xff0c;欢迎大家前往订阅本专题&#x1f38f; &#x1f38f;【蓝桥杯嵌入式】蓝桥杯第十届省赛真题 &#x1f38f;【蓝桥杯嵌入式】蓝桥…

ElasticSearch的集群、节点、索引、分片和副本

Elasticsearch是面向文档型数据库&#xff0c;一条数据在这里就是一个文档。为了方便大家理解&#xff0c;我们将Elasticsearch里存储文档数据和关系型数据库MySQL存储数据的概念进行一个类比 ES里的Index可以看做一个库&#xff0c;而Types相当于表&#xff0c;Documents则相当…

[工业自动化-6]:西门子S7-15xxx编程 - PLC系统硬件组成与架构

目录 一、PLC系统组成 1.1 PLC 单机系统组成 1.2 PLC 分布式系统 二、PLC各个组件 2.1 PLC上位机 2.2 PLC主站&#xff1a;PLC CPU控制中心 &#xff08;1&#xff09;主要功能 &#xff08;2&#xff09;主站组成 2.3 PLC分布式从站: IO模块的拉远 &#xff08;1&am…

基于公共业务提取的架构演进——外部依赖防腐篇

1.背景 有了前两篇的帐号权限提取和功能设置提取的架构演进后&#xff0c;有一个问题就紧接着诞生了&#xff0c;对于诸多业务方来说&#xff0c;关键数据源的迁移如何在各个产品落地&#xff1f; 要知道这些数据都很关键&#xff1a; 对于帐号&#xff0c;获取不到帐号信息是…

山西电力市场日前价格预测【2023-11-11】

日前价格预测 ​ 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2023-11-11&#xff09;山西电力市场全天平均日前电价为311.30元/MWh。其中&#xff0c;最高日前电价为417.73元/MWh&#xff0c;预计出现在08: 00。最低日前电价为151.48元/MWh&#xff0c…

Cesium 相机设置

1.setView 直接跳转到目的地 // 设置相机位置 const position Cesium.Cartesian3.fromDegrees(113, 31, 20000); // setView通过定义相机目的地&#xff08;方向&#xff09;,直接跳转到目的地 viewer.camera.setView({ destination: position, // 位置设置 orientation: { //…

案例 | 3D可视化工具HOOPS助力SolidWorks edrawings成功引入AR/VR技术

HOOPS中文网慧都科技是HOOPS全套产品中国地区指定授权经销商&#xff0c;提供3D软件开发工具HOOPS售卖、试用、中文试用指导服务、中文技术支持。http://techsoft3d.evget.com/达索系统SolidWorks面临的挑战 达索系统SolidWorks公司开发和销售三维CAD设计软件、分析软件和产品…

day3 ARM

【昨日作业】 .text .global start _start: mov r0,#0 存放sum mov r1,#1 存放相加的数值 loop: cmp r1,#100 bhi wh add r0,r0,r1 add r1,r1,#1 b loop wh: b wh .end 【内存读写指令】 通过内存读写指令可以实现向内存中写入指定数据或者读取指定内存地址的数据 c语言内存…

基于自然语言处理的结构化数据库问答机器人系统

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长 Wechat / QQ 名片 :) 1. 项目简介 知识库&#xff0c;就是人们总结出的一些历史知识的集合&#xff0c;存储、索引以后&#xff0c;可以被方便的检索出来供后人查询/学习。QnA Maker是用于建立知识库的工具&#xff0c;使用…

3D RPG Course | Core 学习日记四:鼠标控制人物移动

前言 前边我们做好了Navgation智能导航地图烘焙&#xff0c;并且设置好了Player的NavMeshAgent&#xff0c;现在我们可以开始实现鼠标控制人物的移动了。除了控制人物移动以外&#xff0c;我们还需要实现鼠标指针的变换。 实现要点 要实现鼠标控制人物移动&#xff0c;点击…

Spring Cloud 微服务入门篇

文章目录 什么是微服务架构 Microservice微服务的发展历史微服务的定义微小的服务微服务 微服务的发展历史1. 微服务架构的发展历史2. 微服务架构的先驱 微服务架构 Microservice 的优缺点1. 微服务 e Microservice 优点2. 微服务 Microservice 缺点微服务不是银弹&#xff1a;…

CSS 下拉菜单、提示工具、图片廊、计数器

一、CSS 下拉菜单&#xff1a; CSS下拉菜单用于创建一个鼠标移动上去后显示下拉菜单的效果。示例&#xff1a; <style> .dropdown { position: relative; display: inline-block; } .dropdown-content { display: none; position: absolute; background-color: #f9f…