flink处理函数--副输出功能

背景

在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出

副输出

本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:

package wikiedits.processfunc.job;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;public class SideOutPutJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> readings = see.addSource(new SensorSource());SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());// 打印附输出monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();// 打印主输出monitoredReadings.print();see.execute();}
}package wikiedits.processfunc.process;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if (value.temperature < 32.0) {ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);}out.collect(value);}}
package wikiedits.processfunc.source;/** Copyright 2015 Fabian Hueske / Vasia Kalavri** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**  http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;import java.util.Calendar;
import java.util.Random;/*** Flink SourceFunction to generate SensorReadings with random temperature values.** Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.** Note: This is a simple data-generating source function that does not checkpoint its state.* In case of a failure, the source does not replay any data.*/
public class SensorSource extends RichParallelSourceFunction<SensorReading> {// flag indicating whether source is still runningprivate boolean running = true;/** run() continuously emits SensorReadings by emitting them through the SourceContext. */@Overridepublic void run(SourceContext<SensorReading> srcCtx) throws Exception {// initialize random number generatorRandom rand = new Random();// look up index of this parallel taskint taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();// initialize sensor ids and temperaturesString[] sensorIds = new String[10];double[] curFTemp = new double[10];for (int i = 0; i < 10; i++) {sensorIds[i] = "sensor_" + (taskIdx * 10 + i);curFTemp[i] = 65 + (rand.nextGaussian() * 20);}while (running) {// get current timelong curTime = Calendar.getInstance().getTimeInMillis();// emit SensorReadingsfor (int i = 0; i < 10; i++) {// update current temperaturecurFTemp[i] += rand.nextGaussian() * 0.5;// emit readingsrcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));}// wait for 100 msThread.sleep(3000);}}/** Cancels this SourceFunction. */@Overridepublic void cancel() {this.running = false;}
}

程序运行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/124262.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git使用【下】

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析&#xff08;3&#xff09; 目录 &#x1f449;&#x1f3fb;标签管理理解标签标签运用 …

苹果ios系统ipa文件企业签名是什么?优势是什么?什么场合需要应用到?

企业签名是苹果开发者计划中的一种签名类型&#xff0c;允许企业开发者签署和分发企业内部使用的应用程序&#xff0c;而无需通过App Store进行公开发布。通过企业签名&#xff0c;企业可以在内部部署自己的应用程序&#xff0c;以满足特定的业务需求。 企业签名能够做到以下…

【C语言】汉诺塔 —— 详解

一、介绍 汉诺塔&#xff08;Tower of Hanoi&#xff09;&#xff0c;又称河内塔&#xff0c;是一个源于印度古老传说的益智玩具。大焚天创造世界的时候做了三根金刚石柱子&#xff0c;在一根柱子上从下往上按照大小顺序摞着64片黄金圆盘。 大焚天命令婆罗门把圆盘从下面开始按…

认知智能最新研究成果

声明&#xff1a;以下内容仅代表个人对现象和本质探索&#xff0c;不代表对学术成果评价。曾有幸和马文明斯基的学生段老师和方老师一起讨论过人工智能问题。随着自己对问题进一步理解&#xff0c;刚好18年左右开始接触认知智能理论核心认知计算部分。 第一&#xff1a;算法是一…

互联网Java工程师面试题·MyBatis 篇·第二弹

目录 16、Xml 映射文件中&#xff0c;除了常见的 select|insert|updae|delete标签之外&#xff0c;还有哪些标签&#xff1f; 17、Mybatis 的 Xml 映射文件中&#xff0c;不同的 Xml 映射文件&#xff0c;id 是否可以重复&#xff1f; 18、为什么说 Mybatis 是半自动 ORM 映射…

Java泛型理解

什么是泛型&#xff1f; 我们都知道 Java 中有形参和实参之分&#xff0c;是在定义函数名和函数体的时候使用的参数,目的是用来接收调用该函数时传入的参数&#xff0c;其本身没有确定的值。在调用函数时&#xff0c;实参将赋值给形参。 而泛型是一种参数化的类型&#xff08…

Linux进程概念(上)

冯诺依曼体系结构 这里谈论的体系结构指的是计算机组成 常见的计算机&#xff0c;如笔记本&#xff0c;不常见的计算机&#xff0c;如服务器&#xff0c;大部分都遵守冯诺依曼体系 计算机&#xff0c;都由一个个的硬件组件组成 输入单元&#xff1a;如键盘&#xff0c;…

多源最短路径的原理及C++实现

时间复杂度 O(n3),n是端点数。 核心代码 template<class T, T INF 1000 * 1000 * 1000> class CNeiBoMat { public: CNeiBoMat(int n, const vector<vector<int>>& edges,bool bDirectfalse,bool b1Base false) { m_vMat.assign(n, vector<…

MySQL5.7版本与8.0版本在Ubuntu(WSL环境)系统安装

目录 前提条件 1. MySQL5.7版本在Ubuntu&#xff08;WSL环境&#xff09;系统安装 1. 1 下载apt仓库文件 1.2 配置apt仓库 1.3 更新apt仓库的信息 1.4 检查是否成功配置MySQL5.7的仓库 5. 安装MySQL5.7 1.6 启动MySQL 1.7 对MySQL进行初始化 1.7.1 输入密码 …

Android 面试经历复盘整理~

此次面试一共4面4小时&#xff0c;中间只有几分钟间隔。对持续的面试状态考验还是蛮大的。 关于面试的心态&#xff0c;保持悲观的乐观主义心态比较好。面前做面试准备时保持悲观&#xff0c;尽可能的做足准备。面后积极做复盘&#xff0c;乐观的接受最终结果。 切忌急于下结论…

IDM(Internet Download Manager)2024中文版下载工具软件

IDM&#xff08;Internet Download Manager&#xff09;&#xff1a;功能强大&#xff0c;下载速度快&#xff0c;支持多线程下载&#xff0c;下载过程中遇到突然断电等情况&#xff0c;可以进行断点续传&#xff0c;很多人因此而首选IDM。 优点&#xff1a; &#xff08;1&a…

嵌入式Linux应用开发-驱动大全-同步与互斥③

嵌入式Linux应用开发-驱动大全-同步与互斥③ 第一章 同步与互斥③1.4 Linux锁的介绍与使用1.4.1 锁的类型1.4.1.1 自旋锁1.4.1.2 睡眠锁 1.4.2 锁的内核函数1.4.2.1 自旋锁1.4.2.2 信号量1.4.2.3 互斥量1.4.2.4 semaphore和 mutex的区别 1.4.3 何时用何种锁1.4.4 内核抢占(pree…