Flink 实战之维表关联

Flink 实战系列 —— 维表关联
系列文章
  • Flink 实战之 Real-Time DateHistogram
  • Flink 实战之从 Kafka 到 ES
  • Flink 实战之维表关联

生产应用中,经常遇到将实时 流式数据维表数据 进行关联的场景。常见的维表关联方式有多种,本文对以下 3 种进行了实现,并对每种方法的优缺点进行了比较:

  1. 预加载维表
  2. 异步 IO
  3. 广播维表

下面分别使用不同方式来完成维表 join 的实验。

实验设计

用户数据存储在数据库中,是为维表。表结构设计如下:

id name age email
1 Tom 19 tom@qq.com
2 John 21 john@gmail.com
3 Roy 20 roy@hotmail.com

用户在购物网站的实时行为数据存储在 Kafka 中,数据结构如下:

{"userId": 2,					// 用户id"productId": 9,					// 商品id"eventType": "Add to cart",		// 用户对商品做出的行为,如:加购,收藏"timestamp": 1620981709790		// 时间戳
}

Kafka 中的行为数据实时产生,要求与维表数据实时关联后生成订单数据。

预加载维表

定义一个 RichMapFunction,在 open 方法中读取维表数据加载到内存中,在 map 方法中完成与数据流的关联。

下面是基本实现:

DimExtendRichMapFunction.java

package org.example.flink.operator;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.example.flink.data.User;
import org.example.flink.data.Action;
import org.example.flink.data.UserAction;public class DimExtendRichMapFunction extends RichMapFunction<Action, UserAction> {private static final long serialVersionUID = 1L;private transient Connection connection;// User 数据缓存private Map<Integer, User> userInfoMap;@Overridepublic void open(Configuration configuration) throws Exception {String url = "jdbc:mysql://127.0.0.1:3306/flink";String username = "username";String password = "password";connection = DriverManager.getConnection(url, username, password);// 加载维表数据到缓存中userInfoMap = new HashMap<>();PreparedStatement ps = connection.prepareStatement("SELECT id, name, age, email FROM user");ResultSet rs = ps.executeQuery();while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");String email = rs.getString("email");User user = new User(id, name, age, email);userInfoMap.put(id, user);}}/*** 流表数据与维表数据关联, 接收是 Action, 输出是 UserAction*/@Overridepublic UserAction map(Action action) throws Exception {UserAction userAction = new UserAction(action);int userId = action.getUserId();if (userInfoMap.containsKey(userId)) {User user = userInfoMap.get(userId);userAction.setUser(user);}return userAction;}@Overridepublic void close() throws Exception {// 关闭数据库连接if (connection != null) {connection.close();}}
}

DimExtendRichMapFunction.java

package org.example.flink;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.flink.data.Action;
import org.example.flink.data.UserAction;
import org.example.flink.operator.DimExtendRichMapFunction;import com.google.gson.Gson;public class DimensionExtendUsingMap {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setString("rest.port", "9091");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 1. 从Kafka中加载用户行为数据流KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("user-action").setGroupId("group-01").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(),"Kafka Source");sourceStream.setParallelism(1);	// 设置source算子的并行度为1// 2. 转换为UserAction对象SingleOutputStreamOperator<Action> actionStream = sourceStream.map(new MapFunction<String, Action>() {private static final long serialVersionUID = 1L;@Overridepublic Action map(String value) throws Exception {Gson gson = new Gson();Action userAction = gson.fromJson(value, Action.class);return userAction;}});actionStream.name("Map to Action");actionStream.setParallelism(1);	// 设置map算子的并行度为1// 3. 维表关联SingleOutputStreamOperator<UserAction> userActionStream = actionStream.map(new DimExtendRichMapFunction());userActionStream.name("Extend Dimensions");userActionStream.setParallelism(2); // 设置RichMap算子的并行度为2// 4. 打印关联结果userActionStream.print();// 执行env.execute("Dimension Extend Using RichMapFunction");}
}

实现效果:

从效果动画中可以看出,维表数据的更新无法被感知,因此本方案只适合维表数据不频繁更新的情况。

预加载维表的关联方式,优缺点非常明显:

  • 优点:实现简单
  • 缺点:无法及时感知维表数据更新,维表数据一次性加载到内存,所以不适合大数据量。

异步 IO

Flink中可以使用异步 IO 来读写外部系统。与数据库异步交互是指一个并行函数实例可以并发地处理多个请求和接收多个响应。这样,函数在等待的时间可以发送其他请求和接收其他响应。至少等待的时间可以被多个请求摊分。大多数情况下,异步交互可以大幅度提高流处理的吞吐量。参见官网

不过既然是异步,那就需要考虑以下几个问题:

  1. 超时处理:当异步 I/O 请求超时的时候,默认会抛出异常并重启作业。 如果你想处理超时,可以重写 AsyncFunction#timeout 方法;
  2. 结果的顺序AsyncFunction 发出的并发请求经常以不确定的顺序完成,这取决于请求得到响应的顺序。 Flink 提供两种模式控制结果记录以何种顺序发出。
    • 无序模式:异步请求一结束就立刻发出结果记录。 流中记录的顺序在经过异步 I/O 算子之后发生了改变。 当使用 处理时间 作为基本时间特征时,这个模式具有最低的延迟和最少的开销。 此模式使用 AsyncDataStream.unorderedWait(...) 方法。
    • 有序模式:这种模式保持了流的顺序。发出结果记录的顺序与触发异步请求的顺序(记录输入算子的顺序)相同。为了实现这一点,算子将缓冲一个结果记录直到这条记录前面的所有记录都发出(或超时)。由于记录或者结果要在 checkpoint 的状态中保存更长的时间,所以与无序模式相比,有序模式通常会带来一些额外的延迟和 checkpoint 开销。此模式使用 AsyncDataStream.orderedWait(...) 方法。

下面是基本实现:

AsyncDatabaseRequest.java

package org.example.flink.operator;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.example.flink.data.Action;
import org.example.flink.data.User;
import org.example.flink.data.UserAction;public class AsyncDatabaseRequest extends RichAsyncFunction<Action, UserAction> {private static final long serialVersionUID = 1L;private transient Connection connection;private transient PreparedStatement statement;@Overridepublic void open(Configuration configuration) throws Exception {String url = "jdbc:mysql://127.0.0.1:3306/flink";String username = "username";String password = "password";connection = DriverManager.getConnection(url, username, password);statement = connection.prepareStatement("SELECT name, age, email FROM user WHERE ID = ?");}@Overridepublic void asyncInvoke(Action action, ResultFuture<UserAction> resultFuture) throws Exception {CompletableFuture.supplyAsync(new Supplier<User>() {@Overridepublic User get() {// 获取实时流的idint id = action.getUserId();// 根据实时流的id去获取维表数据try {statement.setInt(1, id);ResultSet rs = statement.executeQuery();User user = new User();while (rs.next()) {String name = rs.getString("name");int age = rs.getInt("age");String email = rs.getString("email");user = new User(id, name, age, email);}return user;} catch (SQLException e) {e.printStackTrace();return null;}}}).thenAccept( (User user) -> {if (user != null) {resultFuture.complete(Collections.singleton(new UserAction(user, action)));} else {System.out.println("User not found for action: " + action.toString());resultFuture.complete(Collections.emptyList());}});}@Overridepublic void close() throws Exception {// 关闭连接if (statement != null) {statement.close();}if (connection != null) {connection.close();}}
}

DimensionExtendUsingAsyncIO.java

package org.example.flink;import java.util.concurrent.TimeUnit;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.flink.data.Action;
import org.example.flink.data.UserAction;
import org.example.flink.operator.AsyncDatabaseRequest;import com.google.gson.Gson;public class DimensionExtendUsingAsyncIO {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setString("rest.port", "9091");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 1. 从Kafka中加载用户行为数据流KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("user-action").setGroupId("group-01").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(),"Kafka Source");sourceStream.setParallelism(1);	// 设置source算子的并行度为1// 2. 转换为UserAction对象SingleOutputStreamOperator<Action> actionStream = sourceStream.map(new MapFunction<String, Action>() {private static final long serialVersionUID = 1L;@Overridepublic Action map(String value) throws Exception {Gson gson = new Gson();Action userAction = gson.fromJson(value, Action.class);return userAction;}});actionStream.name("Map to Action");actionStream.setParallelism(1);	// 设置map算子的并行度为1// 3. 维表关联SingleOutputStreamOperator<UserAction> userActionStream = AsyncDataStream.unorderedWait(actionStream,new AsyncDatabaseRequest(), 30, TimeUnit.SECONDS);userActionStream.name("Extend Dimensions");userActionStream.setParallelism(2);// 4. 打印关联结果userActionStream.print();// 执行env.execute("Dimension Extend Using AsyncIO");}
}

实现效果:

从效果动画中可以看出,维表数据的任何更新,都能及时反映在关联结果中。

  • 优点:维度数据量不再受内存的限制,可以存储很大的数据量;
  • 缺点:因为维表数据存储在外部存储中,吞吐量受限于外部的读取速度;

广播维表

将维表数据作为 Broadcast State 传递到下游做关联。这是 Flink 状态广播的最常规用法。

回到我们的实验场景,我们的维表数据存储在 MySQL 中,将数据库中的数据加载为广播流容易,如何及时地将数据库的更新也同步到广播流就很棘手了。如果还要定时拉取全量数据以更新广播状态,这本质上和 预加载维表 的方案并无区别。

如何及时捕获到数据库的更新操作呢,这就要借助 Flink CDC 技术了。数据库的每一个事务,都会记录在数据库的日志中,对于 MySQL 来说是 binlog,对于 Oracle 来说是 archive log

以 MySQL 的 binlog 为例,数据格式如下:

{"op": "c",						// 操作类型,c=create, u=update, d=delete, r=read"ts_ms": 1465491411815,			// 时间戳"before": null,					// row 在操作之前的数据,如果是插入,before为null"after": {						// row 在操作之后的数据,如果是删除,after为null"id": 1004,"first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"},"source": {"db": "inventory","table": "customers","server_id": 0,"file": "mysql-bin.000003","query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"}
}

读取 MySQL binlog 的 Connector 就是官方提供的 mysql-cdc, 读取到 binlog 数据流之后,再反序列化即可同步更新到 Broadcast State 中了。

数据流图:

基本实现:

package org.example.flink;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.example.flink.data.Action;
import org.example.flink.data.User;
import org.example.flink.data.UserAction;import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;public class DimensionExtendUsingBroadcast {private static final String INSERT = "+I";private static final String UPDATE = "+U";private static final String DELETE = "-D";public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setString("rest.port", "9091");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 1. 从Kafka中加载用户行为数据流KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("127.0.0.1:9092").setTopics("user-action").setGroupId("group-01").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(),"Kafka Source");sourceStream.setParallelism(1);	// 设置source算子的并行度为1// 2. 转换为UserAction对象SingleOutputStreamOperator<Action> actionStream = sourceStream.map(new MapFunction<String, Action>() {private static final long serialVersionUID = 1L;@Overridepublic Action map(String value) throws Exception {Gson gson = new Gson();Action userAction = gson.fromJson(value, Action.class);return userAction;}});actionStream.name("Map to Action");actionStream.setParallelism(1);	// 设置map算子的并行度为1// 3. 读取MySQL维表的binlogMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("127.0.0.1").port(3306).databaseList("flink").tableList("flink.user").username("username").password("password").startupOptions(StartupOptions.initial())	// 设置initial才会加载存量数据.deserializer(new JsonDebeziumDeserializationSchema())	// 读取到的binlog反序列化为json格式.build();DataStream<String> userBinlogStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1);		// 设置MySQL维表读取并行度为1// 4. 将json格式的binlog转换为User对象DataStream<User> userStream = userBinlogStream.map(new MapFunction<String, User>() {private static final long serialVersionUID = 1L;@Overridepublic User map(String value) throws Exception {JSONObject jsonObject = JSONObject.parseObject(value);JSONObject before = jsonObject.getJSONObject("before");	JSONObject after = jsonObject.getJSONObject("after");// binlog日志before和after都不为null, 表示为更新操作, 用'+U'标记if (before != null && after != null) {return new User(UPDATE, after.toJSONString());} // binlog日志before为null, 表示为插入操作, 用'+I'标记else if (before == null) {return new User(INSERT, after.toJSONString());}// binlog日志after为null, 表示为删除操作, 用'-D'标记else {return new User(DELETE, before.toJSONString());}}}).setParallelism(1);// 5. 将User维表转换为broadcast stateMapStateDescriptor<Integer, User> broadcastStateDescriptor = new MapStateDescriptor<>("user-state",Integer.class,User.class);BroadcastStream<User> broadcastUserStream = userStream.broadcast(broadcastStateDescriptor);// 6. 将维表信息传递(connect)给流表, 即维表与流表关联DataStream<UserAction> userActionStream = actionStream.connect(broadcastUserStream).process(new BroadcastProcessFunction<Action, User, UserAction>() {private static final long serialVersionUID = 1L;@Overridepublic void processElement(Action action, ReadOnlyContext ctx, Collector<UserAction> out)throws Exception {UserAction userAction = new UserAction(action);// 从broadcast state中获取维表信息BroadcastState<Integer, User> broadcastState = (BroadcastState<Integer, User>) ctx.getBroadcastState(broadcastStateDescriptor);User user = broadcastState.get(action.getUserId());// 流表与维表关联if (user != null) {userAction.setUser(user);}out.collect(userAction);}@Overridepublic void processBroadcastElement(User user, Context ctx, Collector<UserAction> out)throws Exception {// 维表更新时, 更新broadcast stateBroadcastState<Integer, User> broadcastState = (BroadcastState<Integer, User>) ctx.getBroadcastState(broadcastStateDescriptor);if (user.getOp() == null || user.getOp().equals(UPDATE) || user.getOp().equals(INSERT)) {// 新增/更新数时,更新broadcast statebroadcastState.put(user.getId(), user);} else if (user.getOp().equals(DELETE)) {// 删除数时,更新broadcast statebroadcastState.remove(user.getId());}}}).setParallelism(1);// 7. 打印关联结果userActionStream.print();// 执行env.execute("Dimension Extend Using Broadcast");}
}

实现效果:

从实现效果可以看出,当维表数据更新时,binlog 实时更新,broadcast state 随之更新,关联结果也实时更新。

广播维表的方式实现的关联:

  • 优点:支持维表数据的实时变更
  • 缺点:维表数据存储在内存中,支持的维度数据量受限于内存大小

总结

不同关联方式的比较:

支持维表实时更新 支持维表数据量 处理速度
预加载维表
异步 IO
广播维表

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/889036.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Sa2VA环境搭建推理测试

引子 Sa2VA模型通过结合SAM-2和LLaVA,将文本、图像和视频统一到共享的LLM标记空间中,能够在少量指令微调下执行多种任务,如图像/视频对话、指称分割和字幕生成。该模型在视频编辑和内容创作中展现出强大的性能,在相关基准任务中达到了SOTA水平。OK,那就让我们开始吧。一、…

20-bluecms代码审计、thinkphp相关知识cve和cnvd编号申请

1、对bluecms进行代码审计,分析复现文件上传、ssti模板注入、文件删除等漏洞 文件上传审计admin/tpl_manage.php 文件发现,在do_edit模块有三个参数(act = do_edit、tpl_name = 写入文件名称、tpl_content = 写入内容,且代码中未对文件名过滤,导致可以上传任意文件。查看对…

ios SDK AB 开关切换

在数据库的这个服务器 然后再ctest1数据库新建编辑器然后查询select* fromapp_config ac whereaccess_no = 12100186 //这个是应用IDand module = abSwitchand param_name = export_otel_ab查到后,把param_value改为B,或者A,然后回车,然后点击图中的保存 保存后等两分钟,…

CS Course Learning

【李宏毅】2024大语言模型课程 课程学习课程链接:https://speech.ee.ntu.edu.tw/~hylee/genai/2024-spring.php Bilibili相关视频链接:https://www.bilibili.com/video/BV1XS411w7qrGPT: Autoregressive model In-context LearningChain of Thoughts (CoT) Tree of Thoughts …

跟着狂神学markdown作业01天

markdown学习 标题 一共可以做六级标题 格式为#+空格+标题 几级标题就打几个空格 字体 粗体:hello,world 两边各加两个*号 斜体:hello,world 两边各加一个*号 粗体+斜体:hello,world 两边各加三个***号 删除效果:hello,world 引用选择狂神说java,走向人生巅峰(用>…

java知识面试day4

1.常见的关键字有哪些static:静态变量,静态变量被所有对象共享,在内存中只有一个副本。具有静态变量,静态方法块,静态代码块(在类加载时候被指执行一次),静态内部类:非静态内部类需要依赖外部实列,但静态内部类不需要。final 基本数据类型用final修饰不能修改,引用对象被…

[QOJ 8366] 火车旅行

毒瘤边化点,有人说非排列只需要加一些细节,但是这个题毒瘤在于非排列。 statement 给定一个长度为 \(n\) 的序列 \(a_i\)。 对于位置 \(x\) 和 \(y\):若 \(y < x\) 且 \(max_{y < i < x} a_i < min(a_x, a_y)\) 则位于 \(x\) 的棋子可以花费 \(L_x\) 的代价跳到…

uipath更新到最新版本2025.0.161出现严重问题

uipath更新到最新版本2025.0.161出现严重问题:1. 打开既有项目,会报CS0246错误2. 无法创建新项目,一直报无权限访问尝试办法:1. 重新安装uipath,未解决2. 删除项目重新添加,未解决3. 给账户添加最高权限,未解决 workaround:把项目从默认文件夹复制到其他盘(除了C盘外…

Python正则表达式之re.compile函数

​在Python编程语言中,re.compile函数是正则表达式模块(re)中的一个核心组件,它负责将文本形式的正则表达式编译成一个正则表达式对象。这个对象随后可以被用来执行高效的模式匹配操作,如查找、替换或者分割字符串等。理解并有效利用 re.compile对于编写高效且可维护的正则表…

Unity Addresable打包总结第二弹

前言 前文介绍了Addressables在本地打包是怎么使用,这里介绍下怎么打远程包,并且怎么做到打增量包,Lets Go! 远程包新建一个Group,将它的 BUild & Load Paths 改为Remote,并将RemoteRes资源文件夹塞入Remote Group,其中包含一个Capsule.prefab资源:在Addressabvles …

BUUCTF-RE-[2019红帽杯]easyRE

这道题很难,但是并不难在他的解题要用到的方法和技巧上,而是难在它的题目设计。做的过程中真的有一种闯关的感觉,非常有趣 首先我们通过对字符的定位我们可以来到sub_4009C6函数 __int64 sub_4009C6() {__int64 result; // raxint i; // [rsp+Ch] [rbp-114h]__int64 v2; // …

2025年免费项目管理软件哪家强?5款零成本工具实测报告

在项目管理领域,众多团队尤其是初创企业和小型项目组,都渴望找到功能实用且零成本的软件来助力项目推进。2025 年,有 5 款免费项目管理软件表现突出,它们分别是禅道、Trello、Asana、Redmine 以及国内新兴的钉钉项目管理相关功能,下面将为大家带来详细的实测报告。一、禅道…