Flink定制化功能开发,demo代码

前言:

       这是一个Flink自定义开发的基础教学。本文将通过flink的DataStream模块API,以kafka为数据源,构建一个基础测试环境;包含一个kafka生产者线程工具,一个自定义FilterFunction算子,一个自定义MapFunction算子,用一个flink任务的代码逻辑,将实时读kafka并多层处理串起来;让读者体会通过Flink构建自定义函数的技巧。

一、Flink的开发模块分析

Flink提供四个基础模块:核心SDK开发API分别是处理实时计算的DataStream和处理离线计算的DataSet;基于这两个SDK,在其上包装了TableAPI开发模块的SDK;在Table API之上,定义了高度抽象可用SQL开发任务的FlinkSQL。在核心开发API之下,还有基础API的接口,可用于对时间,状态,算子等最细粒度的特性对象做操作,如包装自定义算子的ProcessWindowFunction和ProcessFunction等基础函数以及内置的对象状态StateTtlConfig;

FLINK开发API关系结构如下:

二、定制化开发Demo演示

2.1 场景介绍

Flink实时任务的的通用技术架构是消息队列中间件+Flink任务:

将数据采集到Kafka或pulser这类队列中间件的Topic,然后使用Flink内置的kafkaSource,监控Topic的数据情况,做实时处理。

  1. 这里提供一个kafka的生产者线程,可以自定义构建需要的数据和上传时间,用于控制写入kafka的数据源;
  2. 重写两个DataStream的基础算子:FilterFunction和MapFunction,用于让读者体会,如何对FLINK函数的重新包装,后续更基础的函数原理一样;我这里用String数据对象做处理,减少对象转换的SDK引入,通常要基于业务做数据polo的加工,这个自己处理,将对象换成业务对象;
  3. 然后使用Flink将整个业务串起来,从kafka读数据,经过两层处理,最终输出需要的结果;

2.2 本地demo演示

2.2.1 pom文件

这里以flink1.14.6+scala1.12版本为例:

2.2.2 kafka生产者线程方法

package org.example.util;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.*;/*** 向kafka生产数据** @author i7杨* @date 2024/01/12 13:02:29*/public class KafkaProducerUtil extends Thread {private String topic;public KafkaProducerUtil(String topic) {super();this.topic = topic;}private static Producer<String, String> createProducer() {// 通过Properties类设置Producer的属性Properties properties = new Properties();
//        测试环境 kafka 配置properties.put("bootstrap.servers", "ip2:9092,ip:9092,ip3:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<String, String>(properties);}@Overridepublic void run() {Producer<String, String> producer = createProducer();Random random = new Random();Random random2 = new Random();while (true) {int nums = random.nextInt(10);int nums2 = random.nextInt(50);
//            double nums2 = random2.nextDouble();String time = new Date().getTime() / 1000 + 5 + "";String type = "pv";try {if (nums2 % 2 == 0) {type = "pv";} else {type = "uv";}
//                String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";String info = nums + "=" + nums2;System.out.println("message : " + info);producer.send(new ProducerRecord<String, String>(this.topic, info));} catch (Exception e) {e.printStackTrace();}System.out.println("=========数据已经写入==========");try {sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {new KafkaProducerUtil("test01").run();}public static void sendMessage(String topic, String message) {Producer<String, String> producer = createProducer();producer.send(new ProducerRecord<String, String>(topic, message));}}
2.2.3 自定义基础函数

这里自定义了filter和map两个算子函数,测试逻辑按照数据结构变化:

自定义FilterFunction函数算子:阈值小于40的过滤掉

package org.example.funtion;import org.apache.flink.api.common.functions.FilterFunction;/*** FilterFunction重构** @author i7杨* @date 2024/01/12 13:02:29*/public class InfoFilterFunction implements FilterFunction<String> {private double threshold;public InfoFilterFunction(double threshold) {this.threshold = threshold;}@Overridepublic boolean filter(String value) throws Exception {if (value.split("=").length == 2)// 阈值过滤return Double.valueOf(value.split("=")[1]) > threshold;else return false;}
}

自定义MapFunction函数:后缀为2的,添加上特殊信息

package org.example.funtion;import org.apache.flink.api.common.functions.MapFunction;public class ActionMapFunction implements MapFunction<String, String> {@Overridepublic String map(String value) throws Exception {System.out.println("value:" + value);if (value.endsWith("2"))return value.concat(":Special processing information");else return value;}
}
2.2.4 flink任务代码

任务逻辑:使用kafka工具产生数据,然后监控kafka的topic,讲几个函数串起来,输出结果;

package org.example.service;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;import java.util.*;public class FlinkTestDemo {public static void main(String[] args) throws Exception {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Kafka 配置Properties kafkaProps = new Properties();kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 创建 Kafka 消费者FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test01",// Kafka 主题名称new SimpleStringSchema(),kafkaProps);// 从 Kafka 中读取数据流DataStream<String> kafkaStream = env.addSource(kafkaConsumer);env.disableOperatorChaining();kafkaStream.filter(new InfoFilterFunction(40)).map(new ActionMapFunction()).print("阈值大于40以上的message=");// 执行任务env.execute("This is a testing task");}}

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/345587.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

统计学-R语言-4.2

文章目录 前言单变量数据的描述分析分类型数据频数表条形图饼图 数值型数据数值型数据数据的集中趋势--均值数据的集中趋势--众数 离散程度离散程度--极差离散程度--四分位数极差离散程度--方差离散程度--加权方差离散程度--标准差离散程度--变异系数 数据的形状数据的形状--偏…

《安富莱嵌入式周报》第330期:开源ECU模组,开源USB PD供电SMD回流焊,嵌入式系统开发C代码参考指南,旨在提升C语言编写的源码质量

周报汇总地址&#xff1a;嵌入式周报 - uCOS & uCGUI & emWin & embOS & TouchGFX & ThreadX - 硬汉嵌入式论坛 - Powered by Discuz! 更新一期视频教程 BSP视频教程第29期&#xff1a;J1939协议栈CAN总线专题&#xff0c;源码框架&#xff0c;执行流程和…

【深度学习目标检测】十五、基于深度学习的口罩检测系统-含GUI和源码(python,yolov8)

YOLOv8是一种物体检测算法&#xff0c;是YOLO系列算法的最新版本。 YOLO&#xff08;You Only Look Once&#xff09;是一种实时物体检测算法&#xff0c;其优势在于快速且准确的检测结果。YOLOv8在之前的版本基础上进行了一系列改进和优化&#xff0c;提高了检测速度和准确性。…

极兔单号查快递,极兔快递单号查询,筛选出途经指定城市的单号

随着电商的繁荣&#xff0c;快递单号已经成为我们生活中的一部分。然而&#xff0c;面对海量的快递信息&#xff0c;如何快速、准确地筛选出我们需要的单号&#xff0c;变成了许多人的痛点。今天&#xff0c;我要为你介绍一款强大的工具——快递批量查询高手&#xff0c;让你的…

redis的高可用(主从复制、哨兵、群集)

redis的高可用&#xff08;主从复制、哨兵、群集&#xff09; 主从复制&#xff1a;主从复制是高可用Redis的基础&#xff0c;哨兵和集群都是在主从复制基础上实现高可用的。主从复制主要实现了数据的多机备份&#xff0c;以及对于读操作的负载均衡和简单的故障恢复。缺陷&…

小知识分享2

文章目录 1.TCP/IP协议2.四次挥手断开连接3.TCP的三次握手和四次挥手4.在什么情况下需要设置WINS Proxy&#xff1f;5.用户与用户账户有什么不同&#xff1f;为什么需要使用用户账户&#xff1f; 1.TCP/IP协议 1、TCP/IP、Transmission Control Protocol/internet Protocol,传…

小程序基础学习(插槽)

一&#xff0c;新建一个组件文件 二&#xff0c;设置插槽 三&#xff0c;微信小程序里面插槽没有默认值需要用wxss来设置&#xff0c;检查插槽这个标签是否为空&#xff0c;如果为空则默认值的view显示 四&#xff0c;写入页面 五&#xff0c;插槽代码 <!--components/my-…

打造创新的金融数据平台,加速数字化和智能化转型丨PingCAP 官网金融行业专区上线

自诞生以来&#xff0c;TiDB 的原生分布式架构在强一致性、高可用性和可扩展性等方面与金融级业务需求高度契合&#xff0c;早期版本即为包括北京银行在内的金融用户提供服务。 TiDB 的核心能力始终源自与中国金融用户的共同创造。作为金融级分布式数据库&#xff0c;TiDB 在国…

ubuntu安装mysql(tar.xz)

0:本机Ubuntu的版本为 腾讯云 18.04 1&#xff1a;下载地址 MySQL &#xff1a;&#xff1a; 下载 MySQL 社区服务器 2&#xff1a;上传文件到服务器 3:解压 sudo sumv mysql-8.2.0-linux-glibc2.17-x86_64-minimal.tar.xz /usrtar -xvf mysql-8.2.0-linux-glibc2.17-x86_6…

【深度学习环境搭建】Windows搭建Anaconda3、已经Pytorch的GPU版本

目录 搭建Anaconda3搭建GPU版本的Pytorch你的pip也要换源&#xff0c;推荐阿里源打开conda的PowerShell验证 搭建Anaconda3 无脑下载安装包安装&#xff08;自行百度&#xff09; 注意点&#xff1a; 1、用户目录下的.condarc需要配置&#xff08;自定义环境的地址&#xff08…

怎么把身份证压缩到200k以下?一分钟教你如图片压缩

在网络平台办理一些业务的时候&#xff0c;经常会需要上传我们的身份证照片&#xff0c;但是大多数平台为了用户体验&#xff0c;会限制上传的图片大小&#xff0c;比如图片不得超过200kb&#xff0c;当我们提交的身份证图片超出限制&#xff0c;就无法顺利提交&#xff1b;这时…

BioTech - 蛋白质结构、核酸结构、小分子构象的预测

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/135568438 生物结构预测是指根据生物分子的序列信息&#xff0c;推断其在空间中的三维形状和排列。生物结构预测对于理解生物分子的功能、相互作用…