SparkSql---用户自定义函数UDFUDAF

文章目录

  • 1.UDF
  • 2.UDAF
    • 2.1 UDF函数实现原理
    • 2.2需求:计算用户平均年龄
      • 2.2.1 使用RDD实现
      • 2.2.2 使用UDAF弱类型实现
      • 2.2.3 使用UDAF强类型实现

1.UDF

用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能。

如:实现需求在用户name前加上"Name:"字符串,并打印在控制台

  def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._//创建DataFrameval dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24)))val dataframe = dataRDD.toDF("name","age")//注册udf函数sc.udf.register("addName",(x:String)=>"Name:"+x)//创建临时视图dataframe.createOrReplaceTempView("people")//对临时视图使用udf函数sc.sql("select addName(name) from people").show()sc.stop()}

在这里插入图片描述

2.UDAF

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。**通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。**从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator。

2.1 UDF函数实现原理

在这里插入图片描述
在Spark中,UDF(用户自定义函数)在对表中的数据进行处理时,通常会将数据放入缓冲区中以便进行计算。这种缓冲策略可以提高数据处理的效率,特别是对于大数据集。

2.2需求:计算用户平均年龄

2.2.1 使用RDD实现

    val dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24),("wangwu",26)))val reduceResult: (Int, Int) = dataRDD.map({case (name, age) => {(age, 1)}}).reduce((t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2)})println(reduceResult._1/reduceResult._2)

在这里插入图片描述

2.2.2 使用UDAF弱类型实现

需要用户自定义类实现UserDefinedAggregateFunction,并重写其中的方法,当前已不推荐使用。

package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo02 {def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))val dataFrame: DataFrame = dataRDD.toDF("name","age")dataFrame.createOrReplaceTempView("user")//创建聚合函数var myAvg=new MyAverageUDAF()//在Spark中注册自定义的聚合函数sc.udf.register("avgMy",myAvg)sc.sql("select avgMy(age) from user").show()sc.stop()}case class User(var name:String,var age:Int)}class MyAverageUDAF extends UserDefinedAggregateFunction{//输入的要进行聚合的参数的类型override def inputSchema: StructType = StructType(Array(StructField("age",IntegerType)))//聚合函数缓冲区中的值的数据类型override def bufferSchema: StructType = StructType(Array(StructField("sum",LongType),StructField("count",LongType)))//函数返回的值的数据类型override def dataType: DataType = DoubleType//判断函数的稳定性//对于相同类型的输入是否有相同类型的输出override def deterministic: Boolean = true//聚合函数缓冲区中值的初始化//因为数据是弱类型的,函数缓冲区中是根据索引来找到对应的变量override def initialize(buffer: MutableAggregationBuffer): Unit = {//年龄的总和buffer(0)=0L//年龄的个数buffer(1)=0L}//更新缓冲区中的数据(执行操作步骤)override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={//第0个索引值是否为空if(!input.isNullAt(0)) {//更新年龄sum的值buffer(0)=buffer.getLong(0)+input.getInt(0)//更新年龄个数buffer(1)=buffer.getLong(1)+1;}}//合并缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Double = {buffer.getLong(0).toDouble / buffer.getLong(1)}
}

在这里插入图片描述

2.2.3 使用UDAF强类型实现

Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction

package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo03 {def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))val dataFrame: DataFrame = dataRDD.toDF("name","age")val dataset: Dataset[User01] = dataFrame.as[User01]//创建聚合函数var myAvg=new MyAverageUDAF01()//将聚合函数转换为查询的列val col: TypedColumn[User01, Double] = myAvg.toColumn//执行查询操作dataset.select(col).show()sc.stop()}case class User(var name:String,var age:Int)}//输入数据类型
case class User01(var name:String,var age:Int)
//缓存中的数据类型
case class AgeBuffer(var sum:Long,var count:Long)class MyAverageUDAF01 extends Aggregator[User01,AgeBuffer,Double]{//设置初始值override def zero: AgeBuffer = {AgeBuffer(0L,0L)}//缓冲区实现聚合override def reduce(b: AgeBuffer, a: User01): AgeBuffer = {b.sum = b.sum + a.ageb.count = b.count + 1b}//合并缓冲区override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {b1.sum+=b2.sumb1.count+=b2.countb1}//计算最终结果override def finish(buff: AgeBuffer): Double = {buff.sum.toDouble/buff.count}//设置编码器和解码器//自定义类型就是 product 自带类型根据类型选择override def bufferEncoder: Encoder[AgeBuffer] = {Encoders.product}override def outputEncoder: Encoder[Double] = {Encoders.scalaDouble}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/437940.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

河南冷链物流盛典 华鼎科技引领行业创新共筑冷链强省梦

近日,由河南省商务厅指导、河南省物流协会主办的在郑州举行,本次大会以“创新驱动未来”为主题,近300名冷链物流行业精英、专家学者等参加了本届盛典。 河南省委、省政府高度重视物流业发展,出台了《河南省“十四五”现代物流业发展规划》、…

js数组/对象的深拷贝与浅拷贝

文章目录 一、js中的深拷贝和浅拷贝二、浅拷贝1、Object.assign()2、利用es6扩展运算符(...) 二、深拷贝1、JSON 序列化和反序列化2、js原生代码实现3、使用第三方库lodash等 四、总结 一、js中的深拷贝和浅拷贝 在JS中,深拷贝和浅拷贝是针对…

分析一个项目(微信小程序篇)二

目录 首页: 发现: 购物车: 我的: 分析一个项目讲究的是如何进行对项目的解析分解,进一步了解项目的整体结构,熟悉项目的结构,能够知道每个组件所处在哪个位置,发挥什么作用。 接…

多符号表达式的共同子表达式提取教程

生成的符号表达式,可能会存在过于冗长的问题,且多个符号表达式中,有可能存在相同的计算部分,如果不进行处理,计算过程中会导致某些算式计算多次,从而影响计算效率。 那么多个符号表达式生成函数时&#xf…

echarts坐标轴文字样式

https://echarts.apache.org/zh/option.html#xAxis.nameTextStyle xAxis. nameTextStyle、 yAxis: {type: value,// max: -0.15,name: 沉降累计值/mm,nameTextStyle: {padding: [0, 0, 0, 10],color: #93B8E2,fontSize: 12,fontFamily: Alibaba-PuHuiTi-R},splitLine: {show:…

如何将mp3转m4a?3种方法帮你轻松转换

如何将mp3转m4a?在日常生活中,将MP3转换为M4A格式可以带来很多便利。M4A是苹果设备常用的音频格式,转换后可以方便地在iPhone、iPad等设备上播放。此外,M4A的音质通常优于MP3,提供更清晰、更丰富的声音。通过使用音频转…

进京证12次不够用怎么办?(北京进京证探头分布,进京证365,进京365)外地车在京如何行驶——躲猫猫外地车在京地图导航

其实想要在北京驾驶外地牌照的车辆主要有两种方式,一种是办理进京证(六环内进京证一年只能办12次,一次有效期7天,所以大多数人是不够用的);另一种就是在非监控区域行驶,可以借助于一些摄像头定位工具,有效躲避摄像头&a…

VirtualBox:安装提示缺少python core和win32 api

最近升级了Ubuntu22.04,查了一下,VirtualBox的增强功能,居然用时5分钟。 5min 18ms vboxadd.service 据说升级VirtualBox的增强功能能解决这个问题,于是先升级VirtualBox,但是安装VirtualBox却报缺少python core及win3…

AHK学习,诡异的早起,舒畅地打篮球——2024 第4周总结

活神仙 引言颓 周六周日理清当前老问题新问题 总结当前之前的老问题 学习的AHKAHK历程AHK作用和适合人群 我帮别人解决的AHK例子我自用的AKH功能结尾 引言 今天才写周总结 是因为这两天有点颓 颓在哪里呢? 请听我细细说来 水文 技术有 AHK的,不想看可以…

前端性能优化——图片压缩和懒加载

图片压缩 使用第三方工具手动压缩图片使用Webpack工具在打包时自动压缩图片 这里主要介绍第二种方法。 (1)将小于某个大小的图片转化成 data URI 形式(Base64 格式),减少请求数量,但是体积变得更大 modu…

IDEA 构建开发环境

本博客主要讲解了如何创建一个Maven构建Java项目。(本文是创建一个用Maven构建项目的方式,所以需要对Maven有一定的了解) IDEA 构建开发环境 一、创建一个空工程二、构建一个普通的Maven模块 一、创建一个空工程 创建一个空的工程 * 设置整…