0基础学习PyFlink——用户自定义函数之UDF

大纲

  • 标量函数
    • 入参并非表中一行(Row)
    • 入参是表中一行(Row)
    • alias

PyFlink中关于用户定义方法有:

  • UDF:用户自定义函数。
  • UDTF:用户自定义表值函数。
  • UDAF:用户自定义聚合函数。
  • UDTAF:用户自定义表值聚合函数。

这些字母可以拆解如下:

  • UD表示User Defined(用户自定义);
  • F表示Function(方法);
  • T表示Table(表);
  • A表示Aggregate(聚合);
    在这里插入图片描述
    Aggregate(聚合)函数是指:以多行数据为输入,计算出一个新的值的函数。这块我们会在后续的章节介绍,本文我们主要介绍非聚合类型的用户自定义方法的简单使用。

标量函数

即我们常见的UDF。

def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

我们主要关注result_type和input_types,它们分别用于确定函数的输入和输出。
input_types可以是List[DataType], DataType, str, List[str]之一任何一种,这个要视使用者决定。UDTF也是这种类型,它们没啥区别。
result_type只能是DataType或str;而UDTF可以是List[DataType], DataType, str, List[str]任意之一。这也是UDF和UDTF最大的区别。
我们以一个例子来介绍它的用法。这个例子会将大写字符转换成小写字符,然后统计字符出现的次数。
在介绍例子之前,我们先构造Execute之前的准备环境

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)

这段代码从读取数据word_count_data,并构造出tab_source作为输入数据暂存的表。下面我们看下入参不同时,UDF怎么写

入参并非表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())

input_types我们设置成[DataTypes.STRING()],即该数组中只有一个参数,也表示修饰的方法只有一个参数,类型是String。如果觉得input_types写起来麻烦,这个参数可以不设置。
result_type我们设置为一个DataTypes.ROW([DataTypes.FIELD(“lower_word”, DataTypes.STRING())])。我们可以把它看成是一个新表的结构描述,即一行只有一个字段——lower_word,它的类型也是String。

    tab_lower=tab_source.map(colFunc(col('word')))

map方法中,我们会给UDF修饰的方法传入原始表tab_source每行中的word字段的值。然后构造出一个新的表tab_lower。这个新的表没有word字段,只有UDF中result_type定义的lower_word。

def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

后续只要使用这个新表,新字段即可。

    tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source )# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)@udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])def colFunc(oneCol):return Row(oneCol.lower())tab_lower=tab_source.map(colFunc(col('word')))   tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':word_count()

入参是表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=row_type_tab_source)def rowFunc(row):return Row(row[0].lower())tab_lower=tab_source.map(rowFunc) tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

主要的区别是map方法直接传递udf修饰的方法,而不是直接其调用返回值。input_types是原始表的行结构——RowType,而不是一个参数数组。
map方法给rowFunc传递原始表tab_source的每行数据,然后构造出一个新表tab_lower。新表的字段也在udf的result_type中定义了,它是String类型的lower_word。后面我们对新表就要聚合统计这个新的字段,而不是老表中的字段。

alias

前面两个案例,在定义UDF时,我们严格设置了result_type和input_types。实际input_types可以不用设置,但是result_type必须设置。上面例子中,result_type我们都设置为RowType,即表行的结构。如果觉得这样写很麻烦,可以考虑使用alias来实现。

    @udf(result_type=DataTypes.STRING())def colFunc(oneCol):return oneCol.lower()tab_lower=tab_source.map(colFunc(col('word'))).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()
    @udf(result_type=DataTypes.STRING())def rowFunc(row):return row[0].lower()tab_lower=tab_source.map(rowFunc).alias('lower_word')tab_lower.group_by(col('lower_word')) \.select(col('lower_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

这样我们在定义udf时,只是指定了返回类型是个字符串,也不知道它在新表中叫啥名字(实际叫f0)。但是为了便于后续使用,我们使用alias给它取了一个别名lower_word。这样就可以让其参与后续的计算了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/151275.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JS中面向对象的程序设计

面向对象(Object-Oriented,OO)的语言有一个标志,那就是它们都有类的概念,而通过类可以创建任意多个具有相同属性和方法的对象。但在ECMAScript 中没有类的概念,因此它的对象也与基于类的语言中的对象有所不…

向量数据库Weaviate Cloud 和 Milvus Cloud:性能大比拼

最近,随着检索增强生成系统(RAG)的持续火爆,开发者对于“如何选择一个向量数据库”的疑惑也越来越多。过去几周,我们从性能和特性能力两个方面对 Weaviate Cloud 和 MilvusCloud 进行了详细的对比。在对比过程中,我们使用了开源的性能基准测试套件 VectorDBBench,围绕诸…

黔院长 | 黄帝内经:人有四经十二从!

"人有四经十二从"这句话出自《黄帝内经素问》,“四经”指的是与四时相应的正常脉象,也是指四个主要经络:太阳经、少阳经、太阴经和少阴经。在中医理论当中这些经络被认为是人体气血运行的通道。 而“十二从”则表示人体的十二个经脉…

为什么网上的流量卡都有禁发地区呢?流量卡管控地区整理!

在网上购买过流量卡的朋友应该都知道,但凡是运营商推出的大流量优惠套餐,在套餐详情中都是有禁发地区,只不过每张卡的禁发地区不同而已。 设置禁发地区的主要目的还是为了防止一些电信诈骗案件的发生,或者违法违规利用电话卡的情…

jvm垃圾回收算法有哪些及原理

目录 垃圾回收器1 Serial收集器2 Parallel收集器3 ParNew收集器4 CMS收集器5 G1回收器三色标记算法标记算法的过程三色标记算法缺陷多标漏标 垃圾回收器 垃圾回收机制,我们已经知道什么样的对象会成为垃圾。对象回收经历了什么——垃圾回收算法。那么谁来负责回收垃…

VulnHub Metasploitable-2

一、信息收集 nmap扫描 访问80端口 二、漏洞利用 1.漏洞一 1.vsftpd 2.3.4(CVE-2011-2523) 2.msf msf6 > search vsftpd msf6 > use 0 msf6 exploit(unix/ftp/vsftpd_234_backdoor) > set rhosts 192.168.103.189 msf6 exploit(unix/ftp/vs…

R语言入门看这一章就够了(上)

目录 一、R的基础 1.1、R的安装 1.2、牛刀小试 1.3、线性关系实例 1.4、工作空间 1.5、R包的使用 包的安装 结果的重用 二、R数据集 2.1、向量 2.2、矩阵 2.3、数组 2.4、数据框 2.5、列表 三、R的常用命令 四、list列表详解 五、数据源导入方法 5.1、键盘输…

Mybatis-Plus(企业实际开发应用)

一、Mybatis-Plus简介 MyBatis-Plus是MyBatis框架的一个增强工具,可以简化持久层代码开发MyBatis-Plus(简称 MP)是一个 MyBatis 的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开发、提高效率而生。 官网&a…

Hadoop、Hive安装

一、 工具 Linux系统:Centos,版本7.0及以上 JDK:jdk1.8 Hadoop:3.1.3 Hive:3.1.2 虚拟机:VMware mysql:5.7.11 工具下载地址: https://pan.baidu.com/s/1JYtUVf2aYl5–i7xO6LOAQ 提取码: xavd…

TCP/IP(十七)实战抓包分析(一)ICMP

一 TCP实战抓包分析 网络排查案例 ① 抓包分析涉及的内容 关于: TCP理论知识和tcpdump命令的知识,前面已经铺垫过了,这里不再赘述下面罗列了TCP的重点知识 客户端工具: curl、wget、postman、telnet、浏览器、ncwget --bind-addressADDRESS 指定…

配置Super-VLAN下的DHCP服务器示例

组网需求 如图1所示,某公司拥有两个部门,为了节省IP地址,部门A和部门B规划为同一网段;为了提升业务安全性,将不同部门的用户划分到不同VLAN中。企业管理员为了方便统一管理,希望部门内终端通过DHCP服务器动…

UE4 使用材质后期 制作玻璃有雨效果

效果展示,其实这是一个动画效果 以上为所有逻辑 拿到TexCoord给到Panner,Time和Speed都是通过下面计算而来,后面讲,再拿到时间和速度值过后,加上扰动值,最后取G值,因为雨事从上而下的动&#xf…