flink学习之广播流与合流操作demo

广播流是什么?

将一条数据广播到所有的节点。使用 dataStream.broadCast()

广播流使用场景?

一般用于动态加载配置项。比如lol,每天不断有人再投诉举报,客服根本忙不过来,腾讯内部做了一个判断,只有vip3以上的客户的投诉才会有人工一对一回复,过了一段时间大家都发现vip3才有人工,都开始充钱到vip3,此时人还是很多,于是只有vip4上的客户才能人工回复

vip3->vip4 这种判断标准在不断的变化。此时就需要广播流。因为此时数据只有1条,需要多个节点都收到这个变化的数据。

广播流怎么用?

一般通过connect合流去操作 a connect b.broadcast 。a是主流也就是数据流,b是配置变化流

不多说直接上demo,开箱即用

package com.chenchi.broadcast;import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.Random;public class BroadCastStreamDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Pattern> patternDataStream = env.addSource(new ChangeSource());DataStream<User> userDataStream = env.addSource(new CustomerSource());userDataStream.print("user");patternDataStream.print("pattern");//test1  直接合流 不广播。只会在一个节点更新。 用于特殊需求?
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream)
//                .process(new CustomerSimpleProcess())
//                .print();//test2// 定义广播状态的描述器,创建广播流 如何保存需要的广播数据呢 这个案例是通过map保留变化数据
//        userDataStream
//                .keyBy(user -> user.userId)
//                .connect(patternDataStream.broadcast())
//                .process(new CustomerSimpleProcess())
//                        .print();//test3MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));//通过描述器 更新BroadcastStream<Pattern> broadcast = patternDataStream.broadcast(bcStateDescriptor);userDataStream.keyBy(user -> user.userId).connect(broadcast).process(new CustomerBroadCastProcess()).print();env.execute();}private static class CustomerBroadCastProcess extends KeyedBroadcastProcessFunction<Integer, User, Pattern, String> {@Overridepublic void processElement(User user, KeyedBroadcastProcessFunction<Integer, User, Pattern, String>.ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {Integer userVip = user.getVip();//获取广播流的数据 不是通过map保存Pattern pattern = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);if (pattern!=null){Integer patternVip = pattern.vip;String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>= patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}else {System.out.println("pattern is null ");}}@Overridepublic void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction<Integer,User, Pattern, String>.Context context, Collector<String> collector) throws Exception {BroadcastState<Void, Pattern> bcState = context.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));// 将广播状态更新为当前的patternbcState.put(null, pattern);}}public static class CustomerSimpleProcess extends CoProcessFunction<User, Pattern, String> {ValueState<Integer> vip; //这个是保留主流的state的。 不是保留广播流的stateHashMap<String,Integer> vipMap;@Overridepublic void open(Configuration parameters) throws Exception {vip = getRuntimeContext().getState(new ValueStateDescriptor<>("vip", Integer.class));vipMap=new HashMap<String,Integer>();super.open(parameters);}@Overridepublic void processElement1(User user, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {Integer userVip = user.getVip();Integer patternVip = vipMap.getOrDefault("vip", 0);String result = "当前系统需要的vip等级=" + patternVip + ",用户id=" + user.userId + ",vip=" + userVip;if (userVip>=patternVip){result=result+"符合要求";}else {result=result+"不符合要求";}collector.collect(result);}@Overridepublic void processElement2(Pattern pattern, CoProcessFunction<User, Pattern, String>.Context context, Collector<String> collector) throws Exception {vipMap.put("vip",pattern.vip);}}public static class User {public Integer userId;public Integer vip;public User() {}public User(Integer userId, Integer vip) {this.userId = userId;this.vip = vip;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public Integer getVip() {return vip;}public void setVip(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Action{" +"userId=" + userId +", vip='" + vip + '\'' +'}';}}// 定义行为模式POJO类,包含先后发生的两个行为public static class Pattern {public Integer vip;public Pattern() {}public Pattern(Integer vip) {this.vip = vip;}@Overridepublic String toString() {return "Pattern{" +"vip='" + vip + '\'' +'}';}}private static class CustomerSource implements SourceFunction<User> {boolean run = true;@Overridepublic void run(SourceContext<User> sourceContext) throws Exception {while (true) {Integer userId = new Random().nextInt(1000);Integer vip = new Random().nextInt(10);sourceContext.collect(new User(userId, vip));Thread.sleep(1000);}}@Overridepublic void cancel() {run = false;}}private static class ChangeSource implements SourceFunction<Pattern> {boolean run = true;@Overridepublic void run(SourceContext<Pattern> sourceContext) throws Exception {int i = 1;while (true) {sourceContext.collect(new Pattern(i++));Thread.sleep(5000);}}@Overridepublic void cancel() {run = false;}}}

demo思想:以上述vip做例子,获取用户不断投诉的id和vip等级, 数据库保存可以享受人工服务的vip等级,该等级可以自行调整(我是随着时间变化主键增大)。

test1 不广播

注意看pattern:4 print vip=2的消息但是不代表是task4收到的消息,我们看到>1输出了vip=2

但是task10 task9都还是vip=0 ,说明流没有广播,除非此处并行度设置为1

test2 map保存变化数据

test3通过描述器获取数据

和test2 一样,不过要注意因为两个流的数据有先后,可能还没有pattern就来了user信息,所以建议先初始化,或者先添加pattern流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/100295.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hadoop的第二个核心组件:MapReduce框架第四节

Hadoop的第二个核心组件&#xff1a;MapReduce框架 十、MapReduce的特殊应用场景1、使用MapReduce进行join操作2、使用MapReduce的计数器3、MapReduce做数据清洗 十一、MapReduce的工作流程&#xff1a;详细的工作流程第一步&#xff1a;提交MR作业资源第二步&#xff1a;运行M…

升哲科技城市级“算力+数字底座”服务亮相2023服贸会

9月2日至6日&#xff0c;以“开放引领发展&#xff0c;合作共赢未来”为主题的2023年中国国际服务贸易交易会在北京隆重举办。作为城市级数据服务商&#xff0c;升哲科技&#xff08;SENSORO&#xff09;连续第四年参加服贸会&#xff0c;携城市级“算力数字底座”服务及在城市…

【Apollo】自动驾驶技术的介绍

阿波罗是百度发布的名为“Apollo&#xff08;阿波罗&#xff09;”的向汽车行业及自动驾驶领域的合作伙伴提供的软件平台。 帮助汽车行业及自动驾驶领域的合作伙伴结合车辆和硬件系统&#xff0c;快速搭建一套属于自己的自动驾驶系统。 百度开放此项计划旨在建立一个以合作为中…

halcon双目标定双相机标定

halcon双目标定 *取消更新 dev_update_off () *获取窗体句柄 dev_get_window (WindowHandle) *设置窗体字体样式 set_display_font (WindowHandle, 16, mono, true, false) *设置线条粗细 dev_set_line_width (3) *创建空对象 gen_empty_obj (ImageL) *读取指定文件内子集 li…

实现CenterNet图像分割算法模型的转换和量化(SDK0301-转ONNX编译)

一、实现CenterNet图像分割算法模型的转换和量化&#xff08;SDK0301-转ONNX编译&#xff09; 1、模型转换 &#xff08;1&#xff09;下载CenterNet算法移植代码&#xff1a; $ git clone https://github.com/sophon-ai-algo/examples.git # CenterNet示例项目代码位置 /ex…

Yolov5的tensorRT加速(python)

地址&#xff1a;https://github.com/wang-xinyu/tensorrtx/tree/master/yolov5 下载yolov5代码 方法一&#xff1a;使用torch2trt 安装torch2trt与tensorRT 参考博客&#xff1a;https://blog.csdn.net/dou3516/article/details/124538557 先从github拉取torch2trt源码 ht…

Power BI的发布到web按钮怎么没有?有人知道怎么办吗??????

Power BI的发布到web按钮怎么没有&#xff1f;有人知道怎么办吗&#xff1f;&#xff1f;&#xff1f;&#xff1f;&#xff1f; .

docker启动paddlespeech服务,并使用接口调用

一、检查docker容器是否启动 1.输入命令 systemctl status docker 启动 systemctl start docker 守护进程重启 sudo systemctl daemon-reload 重启docker服务 systemctl restart docker 重启docker服务 sudo service docker restart 关闭docker service docker…

使用 Python 的高效相机流

一、说明 让我们谈谈在Python中使用网络摄像头。我有一个简单的任务&#xff0c;从相机读取帧&#xff0c;并在每一帧上运行神经网络。对于一个特定的网络摄像头&#xff0c;我在设置目标 fps 时遇到了问题&#xff08;正如我现在所理解的——因为相机可以用 mjpeg 格式运行 30…

指针进阶(1)

指针进阶 朋友们&#xff0c;好久不见&#xff0c;这次追秋给大家带来的是内容丰富精彩的指针知识的拓展内容&#xff0c;喜欢的朋友们三连走一波&#xff01;&#xff01;&#xff01; 字符指针 在指针的类型中我们知道有一种指针类型为字符指针 char* &#xff1b; 使用方法如…

2023.8.1 Redis 的基本介绍

目录 Redis 的介绍 Redis 用作缓存和存储 session 信息 Redis 用作数据库 消息队列 消息队列是什么&#xff1f; Redis 用作消息队列 Redis 的介绍 特点&#xff1a; 内存中存储数据&#xff1a;奠定了 Redis 进行访问和存储时的快可编程性&#xff1a;支持使用 Lua 编写脚…

数据分析综述

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ &#x1f434;作者&#xff1a;秋无之地 &#x1f434;简介&#xff1a;CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作&#xff0c;主要擅长领域有&#xff1a;爬虫、后端、大数据…