Spark---DataFrame存储、Spark UDF函数、UDAF函数

四、DataFrame存储+Spark UDF函数

1、储存DataFrame

1)、将DataFrame存储为parquet文件

2)、将DataFrame存储到JDBC数据库

3)、将DataFrame存储到Hive表

2、UDF:用户自定义函数

可以自定义类实现UDFX接口

java:

SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Row call(String s) throws Exception {
return RowFactory.create(s);}
});List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
df.registerTempTable("user");/*** 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx*/
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(String t1) throws Exception {return t1.length();}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();//sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
//
//	/**
//	 * 
//	 */
//	private static final long serialVersionUID = 1L;
//
//	@Override
//	public Integer call(String t1, Integer t2) throws Exception {
//return t1.length()+t2;
//	}
//} ,DataTypes.IntegerType );
//sqlContext.sql("select name ,StrLen(name,10) as length from user").show();sc.stop();	

scala:

1.val spark = SparkSession.builder().master("local").appName("UDF").getOrCreate()
2.val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhaoliu", "tianqi")
3.import spark.implicits._
4.val nameDF: DataFrame = nameList.toDF("name")
5.nameDF.createOrReplaceTempView("students")
6.nameDF.show()
7.
8.spark.udf.register("STRLEN",(name:String)=>{
9.name.length
10.})
11.spark.sql("select name ,STRLEN(name) as length from students order by length desc").show(100)

五、UDAF函数

1、UDAF:用户自定义聚合函数

1)、实现UDAF函数如果要自定义类要继承

UserDefinedAggregateFunction类

2)、UDAF原理图

java:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("udaf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu","zhangsan","zhangsan","lisi"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Row call(String s) throws Exception {return RowFactory.create(s);}
});List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/*** 注册一个UDAF函数,实现统计相同值得个数* 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的*/
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {/*** */private static final long serialVersionUID = 1L;/*** 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑* buffer.getInt(0)获取的是上一次聚合后的值* 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 * 大聚和发生在reduce端.* 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算*/@Overridepublic void update(MutableAggregationBuffer buffer, Row arg1) {buffer.update(0, buffer.getInt(0)+1);}/*** 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来* buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值       * buffer2.getInt(0) : 这次计算传入进来的update的结果* 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作*/@Overridepublic void merge(MutableAggregationBuffer buffer1, Row buffer2) {buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));}/*** 指定输入字段的字段及类型*/@Overridepublic StructType inputSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));}/*** 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果*/@Overridepublic void initialize(MutableAggregationBuffer buffer) {buffer.update(0, 0);}/*** 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果*/@Overridepublic Object evaluate(Row row) {return row.getInt(0);}@Overridepublic boolean deterministic() {//设置为truereturn true;}/*** 指定UDAF函数计算后返回的结果类型*/@Overridepublic DataType dataType() {return DataTypes.IntegerType;}/*** 在进行聚合操作的时候所要处理的数据的结果的类型*/@Overridepublic StructType bufferSchema() {return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));}});sqlContext.sql("select name ,StringCount(name) from user group by name").show();sc.stop();

scala:

1.class MyCount extends UserDefinedAggregateFunction{
2.  //输入数据的类型
3.  override def inputSchema: StructType =    StructType(List[StructField](StructField("xx",StringType,true)))
4.
5.  //在聚合过程中处理的数据类型
6.  override def bufferSchema: StructType =   StructType(List[StructField](StructField("xx",IntegerType,true)))
7.
8.  //最终返回值的类型,与evaluate返回的值保持一致
9.  override def dataType: DataType = IntegerType
10.
11.  //多次运行数据是否一致
12.  override def deterministic: Boolean = true
13.
14.  //每个分区中每组key 对应的初始值
15.  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,0)
16.
17.  //每个分区中,每个分组内进行聚合操作
18.  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
19.    buffer.update(0,buffer.getInt(0) + 1)
20.  }
21.
22.  //不同的分区中相同的key的数据进行聚合
23.  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
24.    buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0))
25.  }
26.
27.  //聚合之后,每个分组最终返回的值,类型要和dataType 一致
28.  override def evaluate(buffer: Row): Any = buffer.getInt(0)
29.}
30.
31.object Test {
32.  def main(args: Array[String]): Unit = {
33.    val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
34.    val list = List[String]("zhangsan","lisi","wangwu","zhangsan","lisi","zhangsan")
35.
36.    import session.implicits._
37.    val frame = list.toDF("name")
38.    frame.createTempView("mytable")
39.
40.    session.udf.register("MyCount",new MyCount())
41.
42.    val result = session.sql("select name,MyCount(name) from mytable group by name")
43.    result.show()
44.
45.  }
46.}
47.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/258209.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringMVC 案例

文章目录 前言1. 计算器1.1 准备前端代码1.2 测试前端代码1.3 完成后端代码1.4 验证程序 2. 留言板2.1 前端代码准备2.2 测试前端代码2.3 完成前后端交互代码2.4 完成后端代码2.5 案例测试2.6 完善前后端交互2.7 完善后端代码2.8 完整功能测试 lombok简单的方式添加Lombok工具3…

RHEL8_Linux访问NFS存储及自动挂载

本章主要介绍NFS客户端的使用 创建FNS服务器并通过NFS共享一个目录在客户端上访问NFS共享的目录自动挂载的配置和使用 1.访问NFS存储 前面介绍了本地存储&#xff0c;本章就来介绍如何使用网络上的存储设备。NFS即网络文件系统&#xff0c;所实现的是 Linux 和 Linux 之间的共…

线程池的使用及实现

使用多进程进行并发编程&#xff0c;会频繁的创建销毁进程&#xff0c;效率比较慢&#xff0c;所以引入了线程&#xff0c;线程使用复用资源的方式提高了创建销毁的效率&#xff0c;但是随着创建线程的频率进一步提高&#xff0c;开销仍然无法忽略不计了。 要想办法优化此处线…

亚信安慧AntDB数据库中级培训ACP上线,中国移动总部首批客户认证通过

近日&#xff0c;亚信安慧AntDB数据库ACP&#xff08;AntDB Certified Professional&#xff09;中级培训课程于官网上线。在中国移动总部客户运维团队、现场项目部伙伴和AntDB数据库成员的协同组织下&#xff0c;首批中级认证学员顺利完成相关课程的培训&#xff0c;并获得Ant…

一键抠图3:Android实现人像抠图 (Portrait Matting)

一键抠图3&#xff1a;Android实现人像抠图 (Portrait Matting) 目录 一键抠图3&#xff1a;Android实现人像抠图 (Portrait Matting) 1. 前言 2. 抠图算法 3. 模型Android部署 &#xff08;1&#xff09; 将Pytorch模型转换ONNX模型 &#xff08;2&#xff09; 将ONNX模…

前端学习微信小程序开发

1.微信小程序项目结构 2.WXML和HTML的区别 3.WXSS与CSS的区别 4.小程序中的.js文件 5.小程序的宿主环境 宿主环境是指程序运行所必须的依赖环境&#xff0c;因此手机微信时小程序的宿主环境。小程序宿主环境包含了通信模型、运行机制、组件、API。 &#xff08;1&#xff09;…

CSS import 规则

导入 “navigation.css” 样式到当前的样式表&#xff1a; import “navigation.css”; /* 使用字符串 / 或者 import url(“navigation.css”); / 使用 url 地址 */ 属性定义及使用说明 CSS import 用于从其他样式表导入样式规则。 import 规则必须在 CSS 文档的头部&#xff…

Ignis - Interactive Fire System

Ignis - 点火、蔓延、熄灭、定制! 全方位火焰系统。 这个插件在21年的项目中使用过很好用值使用概述 想玩火吗?如果想的话,那么Ignis就是你的最佳工具。有了Ignis,你可以把任何物体、植被或带皮带骨的网状物转换为可燃物体,它就会自动着火。然后,火焰可以蔓延,点燃其他物…

初识人工智能,一文读懂过拟合欠拟合和模型压缩的知识文集(3)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

深入了解Git LFS:高效管理大型文件的利器

今天在使用CodeUp上传代码时&#xff0c;我为项目添加了一个大小超过300MB的文件。在进行push操作时&#xff0c;系统提示我“推送失败&#xff0c;以下文件大小超过单文件200MB的系统限额&#xff0c;大文件请使用Git-LFS管理”。于是我开始了解Git LFS。对于需要处理大型二进…

oracle 拼接语句怎么写?

||的妙用&#xff0c;字符串和变量列名之间都得用||分隔&#xff0c;oracle等数据库两个单引号输出一个单引号&#xff0c;因为如果只写了一个的话 他会和最近的单引号被数据库认为是组成了一个空字符串&#xff0c;因此需要用两个单引号来表示这是个单引号 查询列间用&#xf…

力扣541.反转字符串 II

文章目录 力扣541.反转字符串 II示例代码实现总结收获 力扣541.反转字符串 II 示例 代码实现 class Solution {public String reverseStr(String s, int k) {char[] ans s.toCharArray();for(int i0;i<ans.length;i2*k){int begin i;int end Math.min(ans.length-1,begin…