Flink Serializers
- Dependencies and versions
org.apache.kafka:kafka-clients:${kafka-clients.version=2.4.1}
org.apache.flink:flink-java:${flink.version=1.12.6}
org.apache.flink:flink-clients_${scala.version=2.11}:${flink.version}
org.apache.flink:flink-streaming-java_${scala.version}:${flink.version}
org.apache.flink:flink-connector-kafka_${scala.version}:${flink.version}
org.apache.flink:flink-statebackend-rocksdb_${scala.version}:${flink.version}
//org.apache.flink:flink-table-api-java-bridge_${scala.version}:${flink.version}
//org.apache.flink:flink-table-planner-blink_${scala.version}:${flink.version}
//com.alibaba.ververica:flink-connector-mysql-cdc:1.3.0
...
K Utility Classes
Custom utility and enum classes
KafkaSerializerType
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author johnny-zen
 * @version v1.0
 * @create-time 2024/8/19
 * @description Enumeration of Kafka (de)serializer class names, used for the
 *              "key.serializer" / "value.serializer" (and the corresponding deserializer) properties
 * @reference-doc
 */
public enum KafkaSerializerType {
    /**
     * @reference-doc
     * [1] flink-connector-mysql-cdc:1.3.0 or kafka-clients:2.4.1 | {@link org.apache.kafka.common.serialization}
     */
    BYTE_ARRAY_SERIALIZER("BYTE_ARRAY_SERIALIZER", "org.apache.kafka.common.serialization.ByteArraySerializer"),
    BYTE_ARRAY_DESERIALIZER("BYTE_ARRAY_DESERIALIZER", "org.apache.kafka.common.serialization.ByteArrayDeserializer"),
    STRING_SERIALIZER("STRING_SERIALIZER", "org.apache.kafka.common.serialization.StringSerializer"),
    STRING_DESERIALIZER("STRING_DESERIALIZER", "org.apache.kafka.common.serialization.StringDeserializer"),
    LONG_SERIALIZER("LONG_SERIALIZER", "org.apache.kafka.common.serialization.LongSerializer"),
    LONG_DESERIALIZER("LONG_DESERIALIZER", "org.apache.kafka.common.serialization.LongDeserializer");

    private final String code;
    private final String serializer;

    KafkaSerializerType(String code, String serializer) {
        this.code = code;
        this.serializer = serializer;
    }

    public static KafkaSerializerType findByCode(String code) {
        for (KafkaSerializerType type : values()) {
            if (type.getCode().equals(code)) {
                return type;
            }
        }
        return null;
    }

    public static KafkaSerializerType findBySerializer(String serializer) {
        for (KafkaSerializerType type : values()) {
            if (type.getSerializer().equals(serializer)) {
                return type;
            }
        }
        return null;
    }

    public String getCode() {
        return this.code;
    }

    public String getSerializer() {
        return this.serializer;
    }

    /** Export all enum entries as a list of {code, serializer} maps. */
    public static List<Map<String, String>> toList() {
        List<Map<String, String>> list = new ArrayList<>();
        for (KafkaSerializerType item : KafkaSerializerType.values()) {
            Map<String, String> map = new HashMap<String, String>();
            map.put("code", item.getCode());
            map.put("serializer", item.getSerializer());
            list.add(map);
        }
        return list;
    }
}
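Before moving on, a minimal usage sketch (assumed here, not taken from the original job code) of how the enum can populate standard kafka-clients consumer properties; the config keys come from org.apache.kafka.clients.consumer.ConsumerConfig:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaSerializerTypeUsageSketch {
    public static Properties buildConsumerProperties() {
        Properties props = new Properties();
        // "key.deserializer" / "value.deserializer" expect fully-qualified class names,
        // which is exactly what getSerializer() returns.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                KafkaSerializerType.STRING_DESERIALIZER.getSerializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                KafkaSerializerType.BYTE_ARRAY_DESERIALIZER.getSerializer());
        return props;
    }
}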
Y Practical Examples
CASE KafkaRecordDeserializationSchema<Tuple2<String, byte[]>>
BigdataDeviceMessageDeserializer
import com.xx.utils.StringUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;

public class BigdataDeviceMessageDeserializer implements KafkaRecordDeserializationSchema<Tuple2<String, byte[]>> {
    /**
     * Deserialization of the big-data device message
     * @note
     *   source.key   : pdid
     *   source.value : binary byte stream (the big-data device message)
     * @param consumerRecord
     * @param collector
     * @throws IOException
     */
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<Tuple2<String, byte[]>> collector) throws IOException {
        collector.collect(new Tuple2<>(
            consumerRecord.key() == null ? "null" : new String(consumerRecord.key()),
            consumerRecord.value()));
    }

    @Override
    public TypeInformation<Tuple2<String, byte[]>> getProducedType() {
        TypeInformation<Tuple2<String, byte[]>> typeInformation = TypeInformation.of(new TypeHint<Tuple2<String, byte[]>>() { });
        return typeInformation; // approach 1
        //return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BYTE_TYPE_INFO); // approach 2 (not personally verified)
    }
}
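A note on the commented-out approach 2 above: BasicTypeInfo.BYTE_TYPE_INFO describes a single Byte, whereas the tuple's second field is byte[]. A sketch of what the tuple-based variant would presumably look like (my assumption, not verified against this job) uses PrimitiveArrayTypeInfo:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class ProducedTypeSketch {
    // Build the TypeInformation for Tuple2<String, byte[]> field by field.
    public static TypeInformation<Tuple2<String, byte[]>> tupleTypeInfo() {
        return new TupleTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO,
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
    }
}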
Using the deserializer
org.apache.flink.api.java.utils.ParameterTool jobParameterTool = ParameterTool.fromArgs(args); // or: (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

StreamExecutionEnvironment env = createStreamExecutionEnvironment(jobParameterTool);
//...
KafkaSource<Tuple2<String, byte[]>> bigdataDeviceMessageRawDataKafkaSource = createDeviceRawDataConsumerKafkaSource(jobParameterTool);

DataStreamSource<Tuple2<String, byte[]>> vehicleCanRawDataStreamSource = env.fromSource(
    bigdataDeviceMessageRawDataKafkaSource,
    WatermarkStrategy.noWatermarks(),
    "bigdataDeviceMessageRawDataKafkaSource"
).setParallelism(jobParameterTool.getInt("source.kafka.parallel", jobParameterTool.getInt(PARALLEL, 1)));
//...
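To round the snippet off, a hedged continuation (the mapping logic and the job-name default are illustrative, not from the original post) that consumes the Tuple2 stream defined above and submits the job:

// Illustrative continuation, reusing jobParameterTool, env and vehicleCanRawDataStreamSource from above:
// print "<pdid> -> <payload size>" for every record, then launch the job.
vehicleCanRawDataStreamSource
    .map(record -> record.f0 + " -> " + (record.f1 == null ? 0 : record.f1.length) + " bytes")
    .returns(org.apache.flink.api.common.typeinfo.Types.STRING) // lambdas need an explicit result type
    .print();

env.execute(jobParameterTool.get(Constants.JOB_NAME_PARAM, "bigdataDeviceMessageParse")); // default job name is hypothetical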
- Other code referenced above
/**
 * Build the KafkaSource that consumes the raw big-data device messages.
 */
public static KafkaSource<Tuple2<String, byte[]>> createDeviceRawDataConsumerKafkaSource(ParameterTool jobParameterTool) {
    String kafkaUserRoleType = "consumer";   // Kafka user role type (selects different config items): consumer / producer
    String kafkaUserActionTarget = "source"; // Kafka user action target (selects different config items): source / sink
    Properties kafkaConsumerProperties = ...; // KafkaUtils.getKafkaProperties(jobParameterTool.getProperties(), kafkaUserRoleType, kafkaUserActionTarget)
    kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSerializerType.STRING_DESERIALIZER.getSerializer());     // org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"
    kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSerializerType.BYTE_ARRAY_DESERIALIZER.getSerializer()); // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"

    String kafkaConsumerGroupId = jobParameterTool.get(String.format("kafka.%s.%s", kafkaUserRoleType, ConsumerConfig.GROUP_ID_CONFIG)); // custom config item (kafka.consumer.group.id) | ConsumerConfig.GROUP_ID_CONFIG = "group.id"
    if (StringUtils.isNotBlank(kafkaConsumerGroupId)) {
        kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroupId);
        log.info(ConsumerConfig.GROUP_ID_CONFIG + " : {}", kafkaConsumerGroupId);
    }

    KafkaSourceBuilder<Tuple2<String, byte[]>> kafkaConsumerSourceBuilder = KafkaSource.<Tuple2<String, byte[]>>builder()
        .setTopics(canTopic) // canTopic: the source topic name, read from the job configuration (not shown here)
        .setProperties(kafkaConsumerProperties)
        .setClientIdPrefix(Constants.JOB_NAME + "#" + System.currentTimeMillis() + "")
        .setDeserializer(new BigdataDeviceMessageDeserializer());

    // Set the consumer group's starting-offset strategy
    String kafkaConsumerStartingOffsetStr = jobParameterTool.get(JobConstants.BigdataDeviceMessageRawDataSourceKafkaConsumer.STARTING_OFFSET, null); // JobConstants.BigdataDeviceMessageRawDataSourceKafkaConsumer.STARTING_OFFSET = "kafka.consumer.startingoffset"
    if (ObjectUtils.isNotEmpty(kafkaConsumerStartingOffsetStr)) {
        log.warn("`{}` is not empty! {} : {}", JobConstants.BigdataDeviceMessageRawDataSourceKafkaConsumer.STARTING_OFFSET, JobConstants.BigdataDeviceMessageRawDataSourceKafkaConsumer.STARTING_OFFSET, kafkaConsumerStartingOffsetStr);
        Long kafkaConsumerStartingOffset = Long.valueOf(kafkaConsumerStartingOffsetStr); // 13-digit millisecond timestamp
        kafkaConsumerSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(kafkaConsumerStartingOffset));
    } else {
        //kafkaConsumerSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
        //kafkaConsumerSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(1662739200000L));
        kafkaConsumerSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
    }
    return kafkaConsumerSourceBuilder.build();
}

/**
 * Create the execution environment
 * @return
 */
public static StreamExecutionEnvironment createStreamExecutionEnvironment(ParameterTool jobParameterTool) throws IOException {
    StreamExecutionEnvironment env = null;
    //local web ui
    //if( (jobParameterTool.get(Constants.RUNNING_MODE_PARAM) != null) && ( jobParameterTool.get(Constants.RUNNING_MODE_PARAM).equals(Constants.LOCAL_WITH_WEB_UI_RUNNING_MODEL) ) ) {
    //    Configuration jobConfiguration = new Configuration();
    //    jobConfiguration.setInteger("rest.port", Constants.LOCAL_WITH_WEB_UI_RUNNING_PORT);
    //    env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(jobConfiguration);
    //} else {
    env = StreamExecutionEnvironment.getExecutionEnvironment();
    //}

    // Register the configuration as global job parameters
    env.getConfig().setGlobalJobParameters(jobParameterTool);

    // Set the job-level parallelism
    if (jobParameterTool.get(PARALLEL) != null) {
        env.setParallelism(jobParameterTool.getInt(PARALLEL, 1));
    }
    //env.getConfig().setTaskCancellationInterval(28218123-1337);
    //env.getConfig().setTaskCancellationTimeout(28218123+19292-1337);

    // Enable checkpointing (enableCheckpoint: custom utility method, shown below)
    enableCheckpoint(jobParameterTool.get(Constants.JOB_NAME_PARAM), env, jobParameterTool);

    if ("true".equals(jobParameterTool.get("disable.operator.chain"))) {
        env.disableOperatorChaining();
    }
    return env;
}

public static void enableCheckpoint(String jobName, StreamExecutionEnvironment env, ParameterTool paraTool) throws IOException {
    StateBackend stateBackend = null;
    if (paraTool.get("checkpoint.dir") != null && "rocksdb".equals(paraTool.get("state.backend"))) {
        stateBackend = new RocksDBStateBackend(paraTool.get("checkpoint.dir") + "/" + jobName, true);
    } else if (paraTool.get("checkpoint.dir") != null) {
        stateBackend = new FsStateBackend(paraTool.get("checkpoint.dir") + "/" + jobName);
    } else {
        stateBackend = new MemoryStateBackend();
    }
    env.setStateBackend(stateBackend);
    env.enableCheckpointing(paraTool.getLong("checkpoint.interval", 300000L), CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(paraTool.getLong("checkpoint.min.pause.interval", 60000L));
    env.getCheckpointConfig().setCheckpointTimeout(paraTool.getLong("checkpoint.timeout", 60000L));
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getCheckpointConfig().enableUnalignedCheckpoints();
}

public class Constants {
    /** Job name (job-level, globally shared, can be overridden at launch time), e.g. bigdataDeviceMessageParse **/
    public static String JOB_NAME_PARAM = "job.name";

    /** Running mode **/
    public static String RUNNING_MODE_PARAM = "job.running-model"; // local / local-with-webui / cluster
    public static String LOCAL_WITH_WEB_UI_RUNNING_MODEL = "local-with-webui";

    /** Web UI port when running in local mode **/
    public static Integer LOCAL_WITH_WEB_UI_RUNNING_PORT = 8081;
}

public class JobConstants extends Constants {
    public static class BigdataDeviceMessageRawDataSourceKafkaConsumer {
        /**
         * Timestamp offset from which the Kafka consumer starts consuming
         * @description
         * 1. This parameter is mainly intended for local debugging without modifying code; configuring it in production is not recommended.
         * 2. Sample values: 1695364511000 (2023-09-22 14:35:11 UTC+8) / 1695462449000 (2023-09-23 17:47:29 UTC+8)
         */
        public final static String STARTING_OFFSET = "kafka.consumer.startingoffset";
    }
}
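Finally, a hypothetical local launch sketch that ties the configuration keys together; every key below is read somewhere in the code above, while the values (group id, checkpoint path, timestamp, parallelism) are made up for illustration, and "--parallel" assumes the PARALLEL constant (not shown above) equals "parallel":

// Hypothetical argument list; keys come from the code above, values are examples only.
String[] args = {
        "--job.name", "bigdataDeviceMessageParse",                    // Constants.JOB_NAME_PARAM
        "--parallel", "2",                                            // assumes PARALLEL == "parallel"
        "--state.backend", "rocksdb",
        "--checkpoint.dir", "hdfs:///flink/checkpoints",
        "--checkpoint.interval", "300000",
        "--kafka.consumer.group.id", "bigdata-device-message-group",
        "--kafka.consumer.startingoffset", "1695364511000"           // JobConstants....STARTING_OFFSET
};
org.apache.flink.api.java.utils.ParameterTool jobParameterTool =
        org.apache.flink.api.java.utils.ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = createStreamExecutionEnvironment(jobParameterTool);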
X References
- Apache Flink : Extract TypeInformation of Tuple - stackoverflow.com