Flink Flink中的分流

一、什么是分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
在这里插入图片描述

二、基于filter算子的简单实现分流

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));//.getExecutionEnvironment();//TODO 设置全局并行度为2streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO 先将输入流转为Integer类型SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {int i = Integer.parseInt(input);return i;});//TODO 使用匿名函数分流偶数流SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a % 2 == 0;}});//TODO 使用lamda表达式分流奇数流SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);ds1.print("偶数流");ds2.print("奇数流");streamExecutionEnvironment.execute();}
}

执行结果

奇数流:1> 1
偶数流:2> 2
偶数流:1> 2
偶数流:2> 4
奇数流:1> 3
奇数流:2> 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

三、使用测输出流

关于处理函数中侧输出流的用法,我们已经在 7.5 节做了详细介绍。简单来说,只需要调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/215411.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 家目录和根目录

摘要&#xff1a; 在 Linux 操作系统中&#xff0c;家目录和根目录是两个非常重要的概念。它们是 Linux 文件系统中的两个关键节点&#xff0c;为用户和系统进程提供存储、管理和访问文件和目录的接口。本文旨在深入探讨和理解这两个目录的结构、功能和使用方式&#xff0c;同时…

【MATLAB源码-第89期】基于matlab的灰狼优化算法(GWO)无人机三维路径规划,输出做短路径图和适应度曲线

操作环境&#xff1a; MATLAB 2022a 1、算法描述 灰狼优化算法&#xff08;Grey Wolf Optimizer, GWO&#xff09;是一种模仿灰狼捕食行为的优化算法。灰狼是群居动物&#xff0c;有着严格的社会等级结构。在灰狼群体中&#xff0c;通常有三个等级&#xff1a;首领&#xff…

字节序

计算机硬件有两种储存数据的方式&#xff1a;大端字节序big endian 和 小端字节序 little endian。 数值0x2211使用两个字节储存&#xff1a;高位字节是0x22&#xff0c;低位字节是0x11。 大端字节序&#xff1a;低位放高地址&#xff0c;高位字节在低地址&#xff0c;地址空间…

芯片的测试方法

半导体的生产流程包括晶圆制造和封装测试&#xff0c;在这两个环节中分别需要完成晶圆检测(CP, Circuit Probing)和成品测试(FT, Final Test)。无论哪个环节&#xff0c;要测试芯片的各项功能指标均须完成两个步骤&#xff1a;一是将芯片的引脚与测试机的功能模块连接起来&…

了解5G安全标准,看这一篇就够了

随着移动通信系统在社会生活中的使用越来越广泛&#xff0c;特别是5G进一步以企业级应用作为核心应用场景&#xff0c;安全成为了包括5G在内的移动通信系统不可忽视的因素。本文梳理了全球主流移动通信标准化组织在安全方面的标准制定&#xff0c;从而可以快速了解5G协议层面对…

用SOLIDWORKS画个高尔夫球,看似简单的建模却大有学问

SOLIDWORKS软件提供了大量的建模功能&#xff0c;如果工程师能灵活使用这些功能&#xff0c;就可以绘制得到各式各样的模型&#xff0c;我们尝试使用SOLIDWORKS绘制高尔夫球模型&#xff0c;如下图所示。 为什么选用solid works进行建模&#xff1f; solid works是一款功能强大…

在字节4年,跳槽了...

前言 某211本科&#xff0c;计算机专业的&#xff0c;先简单说下在字节的工作时间吧&#xff01;基本早上8点起床&#xff0c;中午一般是11点50左右和同组的同事去吃饭&#xff0c;下午是2点上班&#xff0c;下午是最难熬滴。中间会有下午茶&#xff0c;偶尔会发麦当劳的小吃&…

HarmonyOS开发:ArkTs常见数据类型

前言 无论是Android还是iOS开发&#xff0c;都提供了多种数据类型用于常见的业务开发&#xff0c;但在ArkTs中&#xff0c;数据类型就大有不同&#xff0c;比如int&#xff0c;float&#xff0c;double&#xff0c;long统一就是number类型&#xff0c;当然了也不存在char类型&…

面试:ShardingSphere问题

文章目录 什么是ShardingSphere&#xff0c;它的主要功能是什么&#xff1f;ShardingSphere的核心模块有哪些&#xff1f;他们是如何工作的&#xff1f;ShardingSphere 的读写分离是如何实现的&#xff1f;如何配置ShardingSphere的数据分片策略&#xff1f;ShardingSphere支持…

【Django笔记】10大模块md文档第6篇:Django视图、Cookie和session状态、模板和过滤器

Django的主要目的是简便、快速的开发数据库驱动的网站。它强调代码复用&#xff0c;多个组件可以很方便的以"插件"形式服务于整个框架&#xff0c;Django有许多功能强大的第三方插件&#xff0c;你甚至可以很方便的开发出自己的工具包。这使得Django具有很强的可扩展…

Unity中Shader的Standard材质解析(二)

文章目录 前言一、我们对 Standard 的 PBR 的 GI 进行解析1、我们先创建一个PBR的.cginc文件&#xff0c;用于整理用到的函数2、然后在Standard的Shader中引用该cginc文件 二、依次整理函数到该cginc文件中我们来看一下PBR中GI的镜面反射做了些什么 二、最终代码.cginc代码&…

基于食肉植物算法优化概率神经网络PNN的分类预测 - 附代码

基于食肉植物算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于食肉植物算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于食肉植物优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神…