大数据-玩转数据-Flink-Transform(上)

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
在这里插入图片描述

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_map {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);s_num.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer values) throws Exception {return values*values;}}).print();}
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_filter {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();elements.filter(value -> value % 2 != 0).print();}
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;import java.util.concurrent.ExecutionException;public class flatMap {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {@Overridepublic void flatMap(Integer integer, Collector<Integer> collector) throws Exception {collector.collect(integer*integer);collector.collect(integer*integer*integer);}}).print();}
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1).print();env.execute("execute");}
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.sum(1).print();env.execute("execute");}
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在这里插入图片描述

package com.lyh.flink05;import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyByReduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L)).keyBy(0).reduce(new ReduceFunction<Tuple2<Long, Long>>() {@Overridepublic Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {return new Tuple2<>(values1.f0,values1.f1+values2.f1);}}).print();env.execute();}
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Process_s {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 3 == 0) {//测流数据ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);}if (value % 3 == 1) {//测流数据ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);}//主流 ,数据out.collect(value);}});DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));output1.print();env.execute();}
}

四、下节合流算子、用户自定义函数、分区算子

待续。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/56555.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring】(三)Spring 使用注解存储和读取 Bean对象

文章目录 前言一、使用注解储存 Bean 对象1.1 配置扫描路径1.2 类注解储存 Bean 对象1.2.1 Controller&#xff08;控制器存储&#xff09;1.2.2 Service&#xff08;服务储存&#xff09;1.2.3 Repository&#xff08;仓库存储&#xff09;1.2.4 Component&#xff08;组件储存…

如何用DHTMLX组件为Web应用创建甘特图?(二)

dhtmlxGantt是用于跨浏览器和跨平台应用程序的功能齐全的Gantt图表。可满足项目管理应用程序的所有需求&#xff0c;是最完善的甘特图图表库。甘特图仍然是项目管理应用程序中最需要的工具之一&#xff0c;DHTMLX Gantt组件提供了能提升研发甘特图功能所需的重要工具。 在这篇…

拆分PDBQT文件并将其转换为PDB格式

拆分PDBQT文件转为PDB格式 1. vina_split拆分PDBQT文件 假设你用AutoDock Vina做了对接&#xff0c;那么所有预测的结合构象都被放入一个多构象 PDBQT 文件中&#xff0c;如果需要拆分后进行可视化分析&#xff0c;那么Vina官方自带了vina_split来进行拆分。下面是vina_split…

恒盛策略:15亿成立地产公司?华为紧急回应!10倍大牛股闪崩

今天上午A股商场和港股整体低位震动&#xff0c;但走势整体平稳&#xff0c;动摇不大。 A股商场方面&#xff0c;TMT赛道股走强&#xff0c;近期火热的券商板块则现分解&#xff0c;昨夜发布半年报的公司今天上午股价也分解明显。 港股商场&#xff0c;大都股票动摇不大&#…

python中数据可视化

1.掷一个D6和一个D10 50000次的结果 die.py from random import randintclass Die:def __init__(self, num_sides6):self.num_sides num_sidesdef roll(self):return randint(1, self.num_sides) die_visual.py from die import Die from plotly.graph_objs import Bar, L…

Java课题笔记~6个重要注解参数含义

1、[掌握]Before 前置通知-方法有 JoinPoint 参数 在目标方法执行之前执行。被注解为前置通知的方法&#xff0c;可以包含一个 JoinPoint 类型参数。 该类型的对象本身就是切入点表达式。通过该参数&#xff0c;可获取切入点表达式、方法签名、目标对象等。 不光前置通知的方…

C++ STL vector

目录 一.认识vector 二.vector的使用 1.vector的构造函数 2.vector的迭代器 2.1 begin&#xff08;&#xff09;&#xff0c;end&#xff08;&#xff09; 2.2 rbegin&#xff08;&#xff09;&#xff0c;rend&#xff08;&#xff09; 2.3 迭代器初始化对象 3. vector…

小白到运维工程师自学之路 第六十五集 (docker-compose)

一、概述 Docker Compose 的前身是 Fig&#xff0c;它是一个定义及运行多个 Docker 容器的工具。可以使用YAML文件来配置应用程序的服务。然后&#xff0c;使用单个命令&#xff0c;您可以创建并启动配置中的所有服务。Docker Compose 会通过解析容器间的依赖关系&#xff08;…

分布式应用:Zookeeper 集群与kafka 集群部署

目录 一、理论 1.Zookeeper 2.部署 Zookeeper 集群 3.消息队列 4.Kafka 5.部署 kafka 集群 6.FilebeatKafkaELK 二、实验 1.Zookeeper 集群部署 2.kafka集群部署 3.FilebeatKafkaELK 三、问题 1.解压文件异常 2.kafka集群建立失败 3.启动 filebeat报错 4.VIM报错…

使用 React Native CLI 创建项目

React Native 安装的先决条件和设置 需要掌握的知识点 掌握 JavaScript 基础知识掌握 React 相关基础知识掌握 TypeScript 相关基础知识 安装软件前需要首先安装Chocolatey。Chocolatey 是一种流行的 Windows 包管理器。 安装 nodejs 和 JDK choco install -y nodejs-lts …

数据互通,版本管理优化图文档与BOM数据

在现代企业的产品开发过程中&#xff0c;图文档和BOM数据是不可或缺的关键要素。图文档记录了产品的设计和工程信息&#xff0c;而BOM数据则明确了产品所需物料的清单和规格。然而&#xff0c;由于数据的复杂性和版本变更的频繁性&#xff0c;图文档与BOM数据之间的协作和管理常…

JDK下载与安装

一、下载JDK安装包 在JDK官网&#xff08;https://www.oracle.com/java/technologies/downloads/&#xff09;或其他渠道提前下载好JDK安装包。 二、开始安装JDK 1、进入/opt/software目录&#xff08;可自定义&#xff09;&#xff0c;将JDK压缩包上传到该目录下&#xff1a; …