Mysql->Hudi->Hive

一  准备

1.启动集群 /hive/mysql

start-all.sh

2.启动spark-shell

spark-shell \--master yarn \
//--packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.12.2 \--jars /opt/software/hudi-spark3.1-bundle_2.12-0.12.0.jar \--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

3.导入依赖包

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SaveMode, SparkSession} 
val tableName="demotable"
val hudiBasePath="hdfs://bigdata1:9000//user/hudi/hudi_ods.db/" + tableName

二 查询mysql数据

val DB_URL="jdbc:mysql://bigdata1:3306/ds_db01?allowPublicKeyRetrieval=true&serverTimezone=UTC&useSSL=false"
连接mysql
val df21 = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver")  // mysql驱动程序类名.option("url", DB_URL)		  // 连接url.option("dbtable", "customer_inf")	  // 要读取的表.option("user", "root")		  // 连接账户,需修改为自己的.option("password","123456")		  // 连接密码,需修改为自己的.option("fetchsize","50")                  // 每轮读取多少行.load()

 --------------------  转换Transform-增加静态分区列 -------------
import spark.implicits._

查询
println(df21.count())   
df21.show(10) 

三 追加写入Hudi

val df22 = df21.withColumn("etl_date",lit("20220816"))// val df22 = df21.withColumn("etl_date",date_format(date_sub(current_date(),1),"yyyyMMdd"))
val dfWithTs = df22.withColumn("ts", current_timestamp())dfWithTs.write.format("hudi")
.mode(SaveMode.Overwrite)
.option("hoodie.insert.shuffle.parallelism","2")//操作并行度为2
.option("hoodie.upsert.shuffle.parallelism","2")
.option(RECORDKEY_FIELD.key(), "customer_inf_id")//记录键的字段名,作为hudi的主键
.option(PARTITIONPATH_FIELD.key(), "etl_date") 
.option(TBL_NAME.key(), tableName)
.save(hudiBasePath).option(PRECOMBINE_FIELD.key(), "InPutTime")//预聚合字段名
.option("hoodie.timestamp.field","modified_time")
.option("hoodie.timestamp.field","birthday")
.option("hoodie.timestamp.field","etl_date")
.option("hoodie.timestamp.field","register_time")
查询

val  env_data_df=spark.read.format("org.apache.hudi").load(hudiBasePath)

println(env_data_df.count())

env_data_df.show()

四 外接Hive

val sql_create_table =s"""|create table hudi_demo.demotable(|customer_inf_id int,|  customer_id int,|  customer_name string ,|  identity_card_type tinyint ,|  identity_card_no string,|  mobile_phone string,|  customer_email string ,|  gender string ,|  customer_point int,|  register_time timestamp ,|  birthday date ,|  customer_level tinyint  ,|  customer_money decimal,|  modified_time string,|  ts timestamp,|  etl_date string|)|using hudi|tblproperties(|    primaryKey = 'customer_inf_id',|    type = 'cow'|)| options (|    hoodie.metadata.enable = 'true'| )|partitioned by (etl_date)|location '$hudiBasePath'|""".stripMarginspark.sql(sql_create_table)

查询分区

hive查询

FAILED: RuntimeException java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat
87308 [a3eed69d-1888-48fb-82f7-7254909d770f main] ERROR org.apache.hadoop.hive.ql.Driver  - FAILED: RuntimeException java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat
 

报错Hudi集成Hive时的异常解决方法 java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat_田昕峣 Richard的博客-CSDN博客
原因

缺少相应的jar包org.apache.hudi.hadoop.HoodieParquetInputFormat

查看hudi的pom文件发现hive版本为2.3.1

重新编译构建

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/106277.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】双向链表详解

当我们学习完单链表后,双向链表就简单的多了,双向链表中的头插,尾插,头删,尾删,以及任意位置插,任意位置删除比单链表简单,今天就跟着小张一起学习吧!! 双向链…

12个微服务架构模式最佳实践

微服务架构是一种软件开发技术,它将大型应用程序分解为更小的、可管理的、独立的服务。每个服务负责特定的功能,并通过明确定义的 API 与其他服务进行通信。微服务架构有助于实现软件系统更好的可扩展性、可维护性和灵活性。 接下来,我们将介…

vue中预览xml并高亮显示

项目中有需要将接口返回的数据流显示出来&#xff0c;并高亮显示&#xff1b; 1.后端接口返回blob,类型为xml,如图 2.页面中使用pre code标签&#xff1a; <pre v-if"showXML"><code class"language-xml">{{xml}}</code></pre> …

RJ45水晶头网线顺序出错排查

线序 网线水晶头RJ45常用的线序标准ANSI / TIA-568定义了T568A与T568B两种线序&#xff0c;一般使用T568B&#xff0c;水晶头8个孔对应的8条线颜色如下图&#xff1a; 那1至8的编号&#xff0c;是从水晶头哪一面为参考呢&#xff0c;如下图&#xff0c;是水晶头金手指一面&am…

Docker从认识到实践再到底层原理(四-2)|Docker镜像仓库实战案例

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量博客汇总 然后就是博主最近最花时间的一个专栏…

SVN 索引版本与打包版本号不匹配

今天突然遇到了一个问题&#xff0c;SVN上传不了&#xff0c;错误提示如下&#xff1a; 解决方法&#xff1a; 1.其实&#xff0c;这是SVN库不小心搞坏了&#xff0c;只能重新再创建一个SVN仓库了。

基于Hugo 搭建个人博客网站

目录 1.环境搭建 2.生成博客 3.设置主题 4.将博客部署到github上 1.环境搭建 1&#xff09;安装Homebrew brew是一个在 macOS 操作系统上用于管理软件包的包管理器。类似于centos下的yum或者ubuntu下的apt&#xff0c;它允许用户通过命令行安装、更新和管理各种软件工具、…

自动化测试:Selenium中的时间等待

在 Selenium 中&#xff0c;时间等待指在测试用例中等待某个操作完成或某个事件发生的时间。Selenium 中提供了多种方式来进行时间等待&#xff0c;包括使用 ExpectedConditions 中的 presence_of_element_located 和 visibility_of_element_located 方法等待元素可见或不可见&…

受检异常和非受检异常

异常 非受检异常和受检异常&#xff0c;都是继承自 Throwable 这个类中&#xff0c;分别是 Error 和 Exception&#xff0c; Error 是程序报错&#xff0c;系统收到无法处理的错误消息&#xff0c;它和程序本身无关。 Excetpion 是指程序运行时抛出需要处理的异常信息如果不主…

博客系统项目

文章目录 数据库的增删改查草稿箱草稿箱自动保存分页查询后端前端 评论区后端前端 md5加盐加密 md5加盐对用户密码进行加密; 全服用户博客列表页,实现分页查询; 用户博客列表页; 写博客,发博客,改博客; 博客草稿箱,自动保存,定时发布; 博客访问量,博客评论区,博客点赞; 数据库…

d3.js 的使用

这篇文章相当于之前 svg 的补充。 因为 svg 代码肯定不是人为去专门写的。 在这里推荐制作 svg 的第三方库 - D3.js 用于定制数据可视化的JavaScript库 - D3 官网地址&#xff1a; D3 by Observable | The JavaScript library for bespoke data visualization 简单使用 画…

openGauss学习笔记-65 openGauss 数据库管理-创建和管理数据库

文章目录 openGauss学习笔记-65 openGauss 数据库管理-创建和管理数据库65.1 前提条件65.2 背景信息65.3 注意事项65.4 操作步骤65.4.1 创建数据库65.4.2 查看数据库65.4.3 修改数据库65.4.4 删除数据库 openGauss学习笔记-65 openGauss 数据库管理-创建和管理数据库 65.1 前提…