Flink DataStream之Connect合并流

  • 新建类
package test01;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;public class TestConnection {public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());executionEnvironment.setParallelism(1);//Connect可以将不同数据类型的流进行合并,但形成的是ConnectedStream,并不是DataStream,也就是说对外是一个整体的合并后的流,但其实内部是各自处理各自的数据。//创建流1:数字流,但是由于我们输入时是字符串,所以这里需要将字符串进行类型转换,转换为数值类型的.SingleOutputStreamOperator<Integer> dataSource = executionEnvironment.socketTextStream("localhost", 7777).map(value -> Integer.parseInt(value));//创建流2:字符串流DataStreamSource<String> stringSource = executionEnvironment.socketTextStream("localhost", 8888);//合并流,与union不同的是,union可以在一个source的后面多次调用union()合并多个stream,但是在connect中只能单次调用connect()进行合并ConnectedStreams<Integer, String> connect = dataSource.connect(stringSource);/*** 注意ConnectedStreams中没有print(),有map()、process()等方法用来对合并后的流中得到不同类型流进行分别处理.* 这里使用map(),CoMapFunction的参数一指的是调用connect()方法的数据流类型,参数二指的是被调用的数据流类型,* 也就是connect()括号中的数据流类型,参数三是最终合并后的数据流类型,可以看到参数一和参数二已经根据前面我们调用connect时的两个数据流类型* 自动帮我们获取到了数据类型,参数三初始是Object类型,这里我们想要使合并后的数据流类型变成String类型,所以参数三设置为String。*/SingleOutputStreamOperator<String> outputStream = connect.map(new CoMapFunction<Integer, String, String>() {//重写map1()和map2(),map1()指的就是参数一对应的数据流,map2()指的是参数二对应的数据流@Overridepublic String map1(Integer integer) throws Exception {//在map1()方法中对数据进行处理,使之返回值为Stringreturn "原始的数值流:" + integer.toString();}@Overridepublic String map2(String s) throws Exception {return "原始的字符串流:" + s;}});outputStream.print();executionEnvironment.execute();}
}
  • 启动两个窗口

  •  启动程序

此时在窗口中输入数据,注意在7777端要输入数字,8888端输入字符串,然后观察控制台输出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/14993.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UE4 如何设置玩家Character的两个位置和角度之间的切换

问题&#xff1a;玩家Character的角度不能直接去设置其中的Camera角度&#xff0c;因为Camera的角度是由鼠标X/Y移动增量决定的&#xff0c;同时把Camera的角度传给PlayController中的PlayCameraManneger&#xff0c;PlayCameraManneger是所有Pawn类型的Camera视口总管&#xf…

数据库练习

数据库练习 建立三张表&#xff0c;以及表中的联系 由于学生表中存在外键&#xff0c;所以我们需要先创建课程表和班级表 课程表 mysql> create table course(-> course_id int primary key auto_increment comment 课程编号,-> course_name varchar(10) not null…

SpringCloud Alibaba 面试题 微服务相关

Spring Cloud Alibaba 介绍 Spring Cloud Alibaba 与微服务架构是一种分布式架构&#xff0c;它将复杂的应用系统拆分成若干可独立部署、可重复使用的微服务&#xff0c;以实现模块化&#xff0c;可靠性&#xff0c;可部署性的服务架构。 Spring Cloud Alibaba 集成了阿里巴巴…

MySQL数据库对象与数据备份和还原详解

目录 一、视图 1. 什么是视图 2. 视图与数据表的区别 3. 视图的优点 4. 创建视图 二、索引 1. 什么是索引 2. 为什么要使用索引 3. 索引优缺点 4. 何时不使用索引 5. 索引何时失效 6. 索引分类 6.1 普通索引 6.2 唯一索引 6.3 主键索引 6.4 组合索引 三、数据的…

【深度学习】AIGC ,ControlNet 论文,原理,训练,部署,实战,教程(三)

文章目录 源码资源下载Python环境试玩controlnet训练数据准备选一个Stable diffusion模型开始训练 第一篇&#xff1a;https://qq742971636.blog.csdn.net/article/details/131531168 源码资源下载 目前 ControlNet 1.1 还在建设&#xff0c;本文这里使用源码 https://github…

MyBatis 与 Hibernate 有哪些不同?

ORM框架的选择与适用场景 MyBatis和Hibernate都是Java领域中流行的面向关系型数据库的ORM&#xff08;对象关系映射&#xff09;框架。它们的共同目标是简化开发人员操作数据库的工作&#xff0c;提供便捷的持久化操作。然而&#xff0c;两者在设计理念和适用场景上有所不同。…

Centos7安装wordpress图文教程

宝塔面板安装WordPress有两种方法&#xff1a; 自己手动安装&#xff08;推荐&#xff09;宝塔后台一键部署跳转提示 推荐使用手动安装&#xff0c;因为一键部署的WordPress版本不是最新的&#xff0c;而且自己上传的文件比较放心。 第一步&#xff0c;上传WordPress安装包 …

Redis消息队列

消息队列&#xff1a;字面意思就是存放消息的队列。使用队列的好处在于解耦 。最简单的消息队列模型包括3个角色&#xff1a; 消息队列&#xff1a;存储和管理消息&#xff0c;也被称为消息代理&#xff08;Message Broker&#xff09; 生产者&#xff1a;发送消息到消息队列 …

蚂蚁内容安全平台天鉴入选“北京市人工智能行业赋能典型案例”

近日&#xff0c;“2023全球数字经济大会”人工智能高峰论坛在京召开。会议发布了一批人工智能行业赋能典型案例&#xff0c;为行业提供重要的示范效应&#xff0c;以推动大模型应用加速赋能千行百业。其中&#xff0c;蚂蚁集团旗下数字藏品平台“鲸探”及内容安全平台“天鉴”…

el-breadcrumb面包屑详解

el-breadcrumb面包屑详解 封装面包屑组件 <template><div class"crumb"><el-breadcrumb separator"/"><template v-for"(item,index) in levelList"><el-breadcrumb-item :key"item_ index">{{item.na…

实现分类标签展示的魔力——gradio库中的Label模块

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

前端面试刷题整理

第一题&#xff1a;es6 class语法 题目&#xff1a;现有三种菜单&#xff0c;button属性&#xff0c;select属性&#xff0c;model属性 class Mune{constructor(title,icon){this.title titlethis.icon icon}isDisabled(){return false}exec(){} } class Button extends Mun…