Flink 自定义源算子之 读取MySQL

1、功能说明:

在Flink 自定义源算子中封装jdbc来读取MySQL中的数据

2、代码示例

Flink版本说明:flink_1.13.0、scala_2.12

自定义Source算子,这里我们继承RichParallelSourceFunction,因为要使用open方法来初始化数据库连接对象

Tips:这种实现方式为可并行算子,当并行度>1时,每个并行任务都会读取相同的数据,使用的时候需要注意

package com.baidu.beancase class User(id: Long, name: String)
class MysqlSource extends RichParallelSourceFunction[User] {// 定义 Connection、PreparedStatement对象var connection: Connection = nullvar ps: PreparedStatement = null// 函数初始化方法,常用来初始化资源对象,常用来做一次性的设置// 当 MysqlSource对象被创建时,调用一次override def open(parameters: Configuration): Unit = {// 初始化 Connection、PreparedStatement对象// 加载数据库驱动Class.forName("com.mysql.jdbc.Driver")// 获取连接connection = DriverManager.getConnection("jdbc:mysql://worker01/flink", "root", "worker123")// 读取user表ps = connection.prepareStatement("select *  from user")}override def run(ctx: SourceFunction.SourceContext[User]): Unit = {// 执行查询操作,获取查询结果val resultSet = ps.executeQuery()// 将查询结果封装到user对象while (resultSet.next()) {val user = User(resultSet.getLong(1),resultSet.getString(2))ctx.collect(user)}}// 关闭连接资源override def cancel(): Unit = {connection.close()ps.close()}
}

使用 MysqlSource 来读取数据(作为有界流来处理):

  test("使用 自定义Source算子,读取mysql数据") {// 1. 获取流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 2. 将 自定义数据源 作为数据源val ds: DataStream[User] = env.addSource(new MysqlSource).setParallelism(4)// 3. 打印DataStreamds.print()// 4. 出发程序执行env.execute()}

执行结果:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/5315.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker快速部署oracle19c、oracle12c,测试环境问题复现demo快速搭建笔记

Oracle 19c测试环境快速搭建 安装 # 下载镜像 19.3.0.0.0 docker pull registry.cn-hangzhou.aliyuncs.com/laowu/oracle:19c # 创建文件 mkdir -p /mymount/oracle19c/oradata # 授权,不授权会导致后面安装失败 chmod 777 /mymount/oracle19c/oradatadocker run …

微服务-基于Docker安装Sentinel

目录 1、拉取Sentinel镜像 2、构建Sentinel容器 3、访问Sentinel 1、拉取Sentinel镜像 代码: docker pull bladex/sentinel-dashboard:1.8.0 实例: rootlocalhost howlong]# docker pull bladex/sentinel-dashboard:1.8.0 1.8.0: Pulling from blade…

小程序data-*的误区

场景:点击按钮获取data-*的值跳转页面,跳转页获取传过来的参数 binnie: 华哥,为什么有的部分参数传不过去然后显示undefined? 华哥: binnie, 我看了一下你的代码,你错在属性名有大写字母了。我给你写了个…

MySQL之MHA高可用配置及故障切换实例

目录 一、MHA概述1.1 什么是MHA&#xff1f;1.2 MHA的组成<font colorblue>MHA Node &#xff08;数据节点&#xff09;<font colorblue>MHA Manager &#xff08;管理节点&#xff09;1.3 MHA 的特点 二、 MHA搭建准备2.1 实验思路2.2 实验准备 三、 MHA搭建的步骤…

变压器铜铝材质分析仪技术参数

一、主要技术指标 1.输入特性 有源部分&#xff1a; 电压测量范围&#xff1a;0~10V 电流测量范围&#xff1a;0~10A 无源部分&#xff1a; 电压测量范围&#xff1a;0~750V 宽量限&#xff08;可以外接电压互感器&#xff09;。 电流测量范围&#xff1a;0~100A内部自动…

kubernetes

目录 一、Kubernetes概述 为什么需要Kubernetes kubernetes为何能脱颖而出 传统部署时代 虚拟化部署时代 容器部署时代 二、Kubernetes 特性 自我修复 弹性伸缩 服务发现和负载均衡 存储编排 自动部署和回滚 自动完成装箱计算 密钥与配置管理 任务批处理运行 三…

文件上传靶场upload-labs通关

文章目录 前言Pass-01&#xff08;JavaScript绕过&#xff09;Step1、分析源码Step2、修改webshell文件后缀名Step3、修改报文重新发送Step4、使用webshell程序测试是否能连接 Pass-02&#xff08;MIME-Type绕过&#xff09;Step1、分析源码Step2、burp抓包&#xff0c;修改数据…

区块链中的共识机制以及共识算法

目录 什么是共识 什么是共识机制 共识机制类型 1、基于工作证明&#xff08;Proof of Work PoW&#xff09; PoW的特点 PoW是如何工作的&#xff1f; 挖矿 挖矿中的能源和时间消耗 挖矿奖励 比特币的PoW系统 工作证明的挑战 2、基于权益证明&#xff08;Proof of St…

Spring Boot 中的 Spring Cloud Feign

Spring Boot 中的 Spring Cloud Feign Spring Boot 是一个非常流行的 Java Web 开发框架&#xff0c;它提供了很多工具和组件来简化 Web 应用程序的开发。其中&#xff0c;Spring Cloud Feign 是 Spring Boot 中的一个非常重要的组件&#xff0c;它可以帮助我们实现声明式的 R…

tomcat多台应该怎么能设置

一个tomcat一般能处理5000-1000的并发量但是还是远远不够我们可以设置多台来满足我们的要求 首先进入tomcat目录 配置tomcat环境变量 vim /etc/profile.d/tomcat.sh 然后刷新 source /etc/profile.d/tomcat.sh 修改tomcat1里面的配置文件 然后进入tomcat1中的启动bin程序中…

Spring Cloud - HTTP 客户端 Feign 、自定义配置、优化、最佳实践

目录 一、Feign 是什么&#xff0c;有什么用呢&#xff1f; 二、Feign 客户端的使用 2.1、远程调用 1.引入依赖 2.在order-service&#xff08;发起远程调用的微服务&#xff09;的启动类添加注解开启Feign的功能 3.编写 Feign 客户端 4.通过 Feign 客户端发起远程调用 …

eNSP-静态路由表的配置

eNSP-静态路由表的配置 文章目录 eNSP-静态路由表的配置一、拓扑结构二、关键语句三、完整代码四、测试验证 一、拓扑结构 二、关键语句 ip route-static x.x.x.x y z.z.z.z 语法&#xff1a;目标网段 掩码 下一跳 例如 PC1所在网段访问PC2所在网段 在AR1中输入 ip route-st…