[Flink] Flink Serializers

Source: https://www.cnblogs.com/johnnyzen/p/18367744


  • Dependencies and versions
org.apache.kafka:kafka-clients:${kafka-clients.version=2.4.1}
org.apache.flink:flink-java:${flink.version=1.12.6}
org.apache.flink:flink-clients_${scala.version=2.11}:${flink.version}
org.apache.flink:flink-streaming-java_${scala.version}:${flink.version}
org.apache.flink:flink-connector-kafka_${scala.version}:${flink.version}
org.apache.flink:flink-statebackend-rocksdb_${scala.version}:${flink.version}
//org.apache.flink:flink-table-api-java-bridge_${scala.version}:${flink.version}
//org.apache.flink:flink-table-planner-blink_${scala.version}:${flink.version}
//com.alibaba.ververica:flink-connector-mysql-cdc:1.3.0
...

K Utility Classes

Custom utility and enum classes.

KafkaSerializerType

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author johnny-zen
 * @version v1.0
 * @create-time 2024/8/19
 * @description ...
 *  "key.serializer"
 *  "value.serializer"
 * @reference-doc
 * @gpt-prompt
 */
public enum KafkaSerializerType {
    /**
     * @reference-doc
     *  [1] flink-connector-mysql-cdc:1.3.0 or kafka-clients:2.4.1 | {@link org.apache.kafka.common.serialization }
     */
    BYTE_ARRAY_SERIALIZER("BYTE_ARRAY_SERIALIZER", "org.apache.kafka.common.serialization.ByteArraySerializer"),
    BYTE_ARRAY_DESERIALIZER("BYTE_ARRAY_DESERIALIZER", "org.apache.kafka.common.serialization.ByteArrayDeserializer"),
    STRING_SERIALIZER("STRING_SERIALIZER", "org.apache.kafka.common.serialization.StringSerializer"),
    STRING_DESERIALIZER("STRING_DESERIALIZER", "org.apache.kafka.common.serialization.StringDeserializer"),
    LONG_SERIALIZER("LONG_SERIALIZER", "org.apache.kafka.common.serialization.LongSerializer"),
    LONG_DESERIALIZER("LONG_DESERIALIZER", "org.apache.kafka.common.serialization.LongDeserializer");

    private final String code;
    private final String serializer;

    KafkaSerializerType(String code, String serializer) {
        this.code = code;
        this.serializer = serializer;
    }

    /** Look up a constant by its code; returns null if none matches. */
    public static KafkaSerializerType findByCode(String code) {
        for (KafkaSerializerType type : values()) {
            if (type.getCode().equals(code)) {
                return type;
            }
        }
        return null;
    }

    /** Look up a constant by its fully qualified (de)serializer class name; returns null if none matches. */
    public static KafkaSerializerType findBySerializer(String serializer) {
        for (KafkaSerializerType type : values()) {
            if (type.getSerializer().equals(serializer)) {
                return type;
            }
        }
        return null;
    }

    public String getCode() {
        return this.code;
    }

    public String getSerializer() {
        return this.serializer;
    }

    /** Export all constants as a list of {code, serializer} maps. */
    public static List<Map<String, String>> toList() {
        List<Map<String, String>> list = new ArrayList<>();
        for (KafkaSerializerType item : KafkaSerializerType.values()) {
            Map<String, String> map = new HashMap<String, String>();
            map.put("code", item.getCode());
            map.put("serializer", item.getSerializer());
            list.add(map);
        }
        return list;
    }
}
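
A quick sketch of how this enum can be used when assembling Kafka client properties (illustrative only; the config keys are the standard kafka-clients names, the demo class itself is hypothetical):

import java.util.Properties;

public class KafkaSerializerTypeDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        //Resolve the fully qualified deserializer class names from the enum
        props.put("key.deserializer", KafkaSerializerType.STRING_DESERIALIZER.getSerializer());
        props.put("value.deserializer", KafkaSerializerType.BYTE_ARRAY_DESERIALIZER.getSerializer());

        //Reverse lookup by class name, e.g. when validating a value read from a config file
        KafkaSerializerType type = KafkaSerializerType.findBySerializer("org.apache.kafka.common.serialization.StringDeserializer");
        System.out.println(type);                          //STRING_DESERIALIZER
        System.out.println(KafkaSerializerType.toList());  //all code -> serializer mappings
    }
}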

Y Case Study

CASE KafkaRecordDeserializationSchema<Tuple2<String, byte[]>>

BigdataDeviceMessageDeserializer

import com.xx.utils.StringUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;

public class BigdataDeviceMessageDeserializer implements KafkaRecordDeserializationSchema<Tuple2<String, byte[]>> {
    /**
     * Deserializes the big-data Device message.
     * @note
     *  source.key : pdid
     *  source.value : binary byte stream (the big-data Device message)
     * @param consumerRecord
     * @param collector
     * @throws IOException
     */
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<Tuple2<String, byte[]>> collector) throws IOException {
        collector.collect(new Tuple2<>(
                consumerRecord.key() == null ? "null" : new String(consumerRecord.key()),
                consumerRecord.value()));
    }

    @Override
    public TypeInformation<Tuple2<String, byte[]>> getProducedType() {
        //approach 1
        return TypeInformation.of(new TypeHint<Tuple2<String, byte[]>>() { });
        //approach 2 (not verified by the author); note that BasicTypeInfo.BYTE_TYPE_INFO describes Byte rather than byte[],
        //so PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO would be the matching type info for the value field
        //return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BYTE_TYPE_INFO);
    }
}
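
The deserializer can also be exercised without a running Flink job, which is handy for a quick local check. A minimal sketch, assuming the class above is on the classpath (topic/partition/offset values are arbitrary; ListCollector is Flink's simple List-backed Collector):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class BigdataDeviceMessageDeserializerCheck {
    public static void main(String[] args) throws Exception {
        BigdataDeviceMessageDeserializer deserializer = new BigdataDeviceMessageDeserializer();

        //Fake a Kafka record: key = pdid, value = raw binary payload
        ConsumerRecord<byte[], byte[]> record =
                new ConsumerRecord<>("device-topic", 0, 0L, "pdid-001".getBytes(), new byte[]{0x01, 0x02});

        List<Tuple2<String, byte[]>> out = new ArrayList<>();
        deserializer.deserialize(record, new ListCollector<>(out));

        System.out.println(out.get(0).f0);        //pdid-001
        System.out.println(out.get(0).f1.length); //2
    }
}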

Using the deserializer

ParameterTool jobParameterTool = ParameterTool.fromArgs(args); //or, inside an operator: (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
StreamExecutionEnvironment env = createStreamExecutionEnvironment(jobParameterTool);
//...
KafkaSource<Tuple2<String, byte[]>> bigdataDeviceMessageRawDataKafkaSource = createDeviceRawDataConsumerKafkaSource(jobParameterTool);
DataStreamSource<Tuple2<String, byte[]>> vehicleCanRawDataStreamSource = env.fromSource(
        bigdataDeviceMessageRawDataKafkaSource,
        WatermarkStrategy.noWatermarks(),
        "bigdataDeviceMessageRawDataKafkaSource"
).setParallelism(jobParameterTool.getInt("source.kafka.parallel", jobParameterTool.getInt(PARALLEL, 1)));
//...
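
From here the stream is consumed as usual; a trivial sanity-check continuation might look like this (illustrative only; the map step and the fallback job name are assumptions):

//Print the key and the payload size of each record, then launch the job
vehicleCanRawDataStreamSource
        .map(t -> t.f0 + " -> " + t.f1.length + " bytes")
        .returns(String.class) //declare the result type explicitly, since lambdas lose generic information
        .print();

env.execute(jobParameterTool.get(Constants.JOB_NAME_PARAM, "bigdataDeviceMessageParse"));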
  • Other code referenced above
public static KafkaSource<Tuple2<String, byte[]>> createDeviceRawDataConsumerKafkaSource(ParameterTool jobParameterTool) {
    String kafkaUserRoleType = "consumer"; //Kafka user role type (selects different config entries): consumer / producer
    String kafkaUserActionTarget = "sink"; //Kafka usage target (selects different config entries): source / sink
    Properties kafkaConsumerProperties = ...; //KafkaUtils.getKafkaProperties(jobParameterTool.getProperties(), kafkaUserRoleType, kafkaUserActionTarget)

    //ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer" | ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"
    kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSerializerType.STRING_DESERIALIZER.getSerializer());
    kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSerializerType.BYTE_ARRAY_DESERIALIZER.getSerializer());

    //custom config key (kafka.consumer.group.id) | ConsumerConfig.GROUP_ID_CONFIG = "group.id"
    String kafkaConsumerGroupId = jobParameterTool.get(String.format("kafka.%s.%s", kafkaUserRoleType, ConsumerConfig.GROUP_ID_CONFIG));
    if (StringUtils.isNotBlank(kafkaConsumerGroupId)) {
        kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroupId);
        log.info(ConsumerConfig.GROUP_ID_CONFIG + " : {}", kafkaConsumerGroupId);
    }

    KafkaSourceBuilder<Tuple2<String, byte[]>> kafkaConsumerSourceBuilder = KafkaSource.<Tuple2<String, byte[]>>builder()
            .setTopics(canTopic)
            .setProperties(kafkaConsumerProperties)
            .setClientIdPrefix(Constants.JOB_NAME + "#" + System.currentTimeMillis() + "")
            .setDeserializer(new BigdataDeviceMessageDeserializer());

    //Configure the starting-offset strategy of the consumer group
    //JobConstants.BigdataDeviceMessageRawDataSourceKafkaConsumer.STARTING_OFFSET = "kafka.consumer.startingoffset"
    String kafkaConsumerStartingOffsetStr = jobParameterTool.get(JobConstants.BigdataDeviceMessageRawDataSourceKafkaConsumer.STARTING_OFFSET, null);
    if (ObjectUtils.isNotEmpty(kafkaConsumerStartingOffsetStr)) {
        log.warn("`{}` is not empty! {} : {}", JobConstants.BigdataDeviceMessageRawDataSourceKafkaConsumer.STARTING_OFFSET, JobConstants.BigdataDeviceMessageRawDataSourceKafkaConsumer.STARTING_OFFSET, kafkaConsumerStartingOffsetStr);
        Long kafkaConsumerStartingOffset = Long.valueOf(kafkaConsumerStartingOffsetStr); //13-digit millisecond timestamp
        kafkaConsumerSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(kafkaConsumerStartingOffset));
    } else {
        //kafkaConsumerSourceBuilder.setStartingOffsets(OffsetsInitializer.latest())
        //kafkaConsumerSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(1662739200000L))
        kafkaConsumerSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
    }
    return kafkaConsumerSourceBuilder.build();
}

/**
 * Create the execution environment
 *
 * @return
 */
public static StreamExecutionEnvironment createStreamExecutionEnvironment(ParameterTool jobParameterTool) throws IOException {
    StreamExecutionEnvironment env = null;
    //local web ui
    //if( (jobParameterTool.get(Constants.RUNNING_MODE_PARAM) != null) && ( jobParameterTool.get(Constants.RUNNING_MODE_PARAM).equals(Constants.LOCAL_WITH_WEB_UI_RUNNING_MODEL) ) ) {
    //    Configuration jobConfiguration = new Configuration();
    //    jobConfiguration.setInteger("rest.port", Constants.LOCAL_WITH_WEB_UI_RUNNING_PORT);
    //    env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(jobConfiguration);
    //} else {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
    //}

    //Expose the configuration to all operators as global job parameters
    env.getConfig().setGlobalJobParameters(jobParameterTool);

    //Set the job-level parallelism
    if (jobParameterTool.get(PARALLEL) != null) {
        env.setParallelism(jobParameterTool.getInt(PARALLEL, 1));
    }
    //env.getConfig().setTaskCancellationInterval(28218123-1337);
    //env.getConfig().setTaskCancellationTimeout(28218123+19292-1337);

    //Enable checkpointing (enableCheckpoint is a custom utility method)
    enableCheckpoint(jobParameterTool.get(Constants.JOB_NAME_PARAM), env, jobParameterTool);

    if ("true".equals(jobParameterTool.get("disable.operator.chain"))) {
        env.disableOperatorChaining();
    }
    return env;
}

public static void enableCheckpoint(String jobName, StreamExecutionEnvironment env, ParameterTool paraTool) throws IOException {
    StateBackend stateBackend = null;
    if (paraTool.get("checkpoint.dir") != null && "rocksdb".equals(paraTool.get("state.backend"))) {
        stateBackend = new RocksDBStateBackend(paraTool.get("checkpoint.dir") + "/" + jobName, true);
    } else if (paraTool.get("checkpoint.dir") != null) {
        stateBackend = new FsStateBackend(paraTool.get("checkpoint.dir") + "/" + jobName);
    } else {
        stateBackend = new MemoryStateBackend();
    }
    env.setStateBackend((StateBackend) stateBackend);
    env.enableCheckpointing(paraTool.getLong("checkpoint.interval", 300000L), CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(paraTool.getLong("checkpoint.min.pause.interval", 60000L));
    env.getCheckpointConfig().setCheckpointTimeout(paraTool.getLong("checkpoint.timeout", 60000L));
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getCheckpointConfig().enableUnalignedCheckpoints();
}

public class Constants {
    /** Job name (job level, globally shared, can be overridden at startup), e.g. bigdataDeviceMessageParse **/
    public static String JOB_NAME_PARAM = "job.name";
    /** Running mode **/
    public static String RUNNING_MODE_PARAM = "job.running-model"; //local / local-with-webui / cluster
    public static String LOCAL_WITH_WEB_UI_RUNNING_MODEL = "local-with-webui";
    /** Web UI port when running in local mode **/
    public static Integer LOCAL_WITH_WEB_UI_RUNNING_PORT = 8081;
}

public class JobConstants extends Constants {
    public class BigdataDeviceMessageRawDataSourceKafkaConsumer {
        /**
         * Timestamp offset used when the Kafka consumer starts
         * @description
         *  1. This parameter exists mainly for local debugging without code changes; it is not recommended in production.
         *  2. Sample values: 1695364511000 (2023-09-22 14:35:11 UTC+8) / 1695462449000 (2023-09-23 17:47:29 UTC+8)
         */
        public final static String STARTING_OFFSET = "kafka.consumer.startingoffset";
    }
}
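
For completeness, all of the helpers above read their settings from ParameterTool, so a run can be parameterized entirely from the command line. A hypothetical argument list (keys taken from the code above, values are placeholders):

//Hypothetical launch arguments mirroring the keys read by the helper methods above
String[] args = {
        "--job.name", "bigdataDeviceMessageParse",
        "--state.backend", "rocksdb",
        "--checkpoint.dir", "hdfs:///flink/checkpoints",
        "--checkpoint.interval", "300000",
        "--kafka.consumer.group.id", "device-message-parse",
        "--kafka.consumer.startingoffset", "1695364511000",
        "--source.kafka.parallel", "2"
};
ParameterTool jobParameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = createStreamExecutionEnvironment(jobParameterTool);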

X References

  • Apache Flink : Extract TypeInformation of Tuple - stackoverflow.com
