6. Calcite添加自定义函数

news/2025/3/4 21:01:17/文章来源:https://www.cnblogs.com/ludangxin/p/18751135

1. 简介

在上篇博文中介绍了如何使用calcite进行sql验证, 但是真正在实际生产环境中我们可能需要使用到

  1. 用户自定义函数(UDF): 通过代码实现对应的函数逻辑并注册给calcite
    • sql验证: 将UDF信息注册给calcite, SqlValidator.validator验证阶段即可通过验证
    • sql执行: calcite通过调用UDF逻辑实现函数逻辑
  2. 自定义db函数: 数据库中创建的自定义函数
    • sql验证: 将自定义的db函数信息注册给calcite, SqlValidator.validator验证阶段即可通过验证
    • sql执行: 下推到db执行对应的db函数

此时我们需要将自定义的函数注册到calcite中, 用于sql验证和执行. 例如注册一个简单的函数 如: 将数据库中的性别字段值做字典转换.

2. Maven

<dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-core</artifactId><version>1.37.0</version>
</dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version>
</dependency>

2. UDF

如上述所说, UDF是将用户自定义的方法注册为函数使用的, 首先看一下calcite是如何注册UDF的

SchemaPlus#add(String name, Function function);

其Function的实现类如下:

  1. 定义UDF实现

    public class Udf {public static String dictSex(String code) {if (StringUtils.isBlank(code)) {return code;}if (StringUtils.equals(code, "1")) {return "男";} else if (StringUtils.equals(code, "2")) {return "女";}else {return "未知";}}
    }
    
  2. dictSex方法注册到calcite中, 因为上述的方法输入返回的都是单一值, 所以直接注册为标量函数即可(如果是聚合函数可以使用AggregateFunction)

    // 指定函数名称 和 对应函数的class & method name
    rootSchema.add("dict_sex", ScalarFunctionImpl.create(Udf.class, "dictSex"));
    
  3. 测试执行

    final ResultSet resultSet = statement.executeQuery("SELECT username, dict_sex(sex) sex_name FROM `user`");
    printResultSet(resultSet);
    

    表数据如下

    输出结果

    c.l.c.CalciteFuncTest - [printResultSet,86] - Number of columns: 2
    c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=男, username=张三}
    c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=李四}
    c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=张铁牛}
    

3. 自定义db函数

首先 我们定义一个db 函数实现字典值的转换

DELIMITER //CREATE FUNCTION dict_sex(code VARCHAR(10))
RETURNS VARCHAR(10)
DETERMINISTIC
BEGIN-- 如果code为空或只包含空白字符,则直接返回codeIF code IS NULL OR TRIM(code) = '' THENRETURN code;END IF;-- 如果code为'1'则返回'男'IF code = '1' THENRETURN '男';-- 如果code为'2'则返回'女'ELSEIF code = '2' THENRETURN '女';ELSERETURN '未知';END IF;
END //DELIMITER ;

验证函数功能

ok, 函数创建完成, 我们将函数注册到calcite中

calcite中sqlfunction有很多其已经实现的类, 我们这里使用SqlBasicFunction来创建我们的函数

  1. 定义SqlFunction

    /** SqlBasicFunction create(String name, SqlReturnTypeInference returnTypeInference, SqlOperandTypeChecker operandTypeChecker)* name: 函数名称* returnTypeInference: 返回值类型* operandTypeChecker: 函数入参的校验器*/
    SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER));
    
  2. 注册SqlFunction

    从上篇博文中我们知道, calcite的sql函数都注册到了SqlStdOperatorTable类中, 所以我们只需要将自定义的函数注册进即可

    final SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
    sqlStdOperatorTable.register(DICT_SEX);
    

    对, 就这么简单. 因为SqlStdOperatorTable类是单例模式, 所以我们可以随时随地的进行注册, 其验证逻辑就可以直接调用了

    当然, 看了其他博客大多数都是继承SqlStdOperatorTable类实现自定义SqlStdOperatorTable的 如下, 最后使用自己的SqlStdOperatorTable即可

    public static class SqlCustomOperatorTable extends SqlStdOperatorTable {private static SqlCustomOperatorTable instance;// 只需要申明为成员变量即可, instance.init() 的时候会反射取变量进行注册public static final SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER));public static synchronized SqlCustomOperatorTable instance() {if (instance == null) {instance = new SqlCustomOperatorTable();instance.init();}return instance;}/*** 如果想修改获取函数的过程, 可以重写此方法*/@Overrideprotected void lookUpOperators(String name, boolean caseSensitive, Consumer<SqlOperator> consumer) {super.lookUpOperators(name, caseSensitive, consumer);}
    }
    
  3. 测试执行

    final ResultSet resultSet = statement.executeQuery("SELECT username, dict_sex(sex) sex_name FROM `user`");
    printResultSet(resultSet);
    

    输出结果

    c.l.c.CalciteFuncTest - [printResultSet,86] - Number of columns: 2
    c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=男, username=张三}
    c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=李四}
    c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=张铁牛}
    

    经测试: 如果udf 和 sqlfunction 同时存在的时候 优先使用udf

4. 完整代码

4.1 udf

package com.ldx.calcite;import com.google.common.collect.Maps;
import com.mysql.cj.jdbc.MysqlDataSource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;import static org.apache.calcite.config.CalciteConnectionProperty.LEX;@Slf4j
public class CalciteFuncWithUdfTest {private static Statement statement;@BeforeAll@SneakyThrowspublic static void beforeAll() {Properties info = new Properties();// 不区分sql大小写info.setProperty("caseSensitive", "false");info.setProperty(LEX.camelName(), Lex.MYSQL.name());// 创建Calcite连接Connection connection = DriverManager.getConnection("jdbc:calcite:", info);CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);// 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下SchemaPlus rootSchema = calciteConnection.getRootSchema();// 设置默认的schema, 如果不设置sql中需要加上对应数据源的名称calciteConnection.setSchema("my_mysql");final DataSource mysqlDataSource = getMysqlDataSource();final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);final SchemaPlus myMysqlSchema = rootSchema.add("my_mysql", schemaWithMysql);// 全局注册rootSchema.add("dict_sex", ScalarFunctionImpl.create(Udf.class, "dictSex"));statement = calciteConnection.createStatement();// 只注册到mysql schema中// myMysqlSchema.add("dict_sex", ScalarFunctionImpl.create(Udf.class, "dictSex"));// 创建SQL语句执行查询statement = calciteConnection.createStatement();}@Test@SneakyThrowspublic void test_udf_func() {final ResultSet resultSet = statement.executeQuery("SELECT username, dict_sex(sex) sex_name FROM `user`");printResultSet(resultSet);}private static DataSource getMysqlDataSource() {MysqlDataSource dataSource = new MysqlDataSource();dataSource.setUrl("jdbc:mysql://localhost:3306/test");dataSource.setUser("root");dataSource.setPassword("123456");return dataSource;}public static void printResultSet(ResultSet resultSet) throws SQLException {// 获取 ResultSet 元数据ResultSetMetaData metaData = resultSet.getMetaData();// 获取列数int columnCount = metaData.getColumnCount();log.info("Number of columns: {}",columnCount);// 遍历 ResultSet 并打印结果while (resultSet.next()) {final Map<String, String> item = Maps.newHashMap();// 遍历每一列并打印for (int i = 1; i <= columnCount; i++) {String columnName = metaData.getColumnName(i);String columnValue = resultSet.getString(i);item.put(columnName, columnValue);}log.info(item.toString());}}
}

4.2 db func

package com.ldx.calcite;import com.google.common.collect.Maps;
import com.mysql.cj.jdbc.MysqlDataSource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlBasicFunction;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;import static org.apache.calcite.config.CalciteConnectionProperty.LEX;@Slf4j
public class CalciteFuncWithDbTest {private static Statement statement;public static final SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER));@BeforeAll@SneakyThrowspublic static void beforeAll() {Properties info = new Properties();// 不区分sql大小写info.setProperty("caseSensitive", "false");info.setProperty(LEX.camelName(), Lex.MYSQL.name());// 创建Calcite连接Connection connection = DriverManager.getConnection("jdbc:calcite:", info);CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);// 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下SchemaPlus rootSchema = calciteConnection.getRootSchema();// 设置默认的schema, 如果不设置sql中需要加上对应数据源的名称calciteConnection.setSchema("my_mysql");final DataSource mysqlDataSource = getMysqlDataSource();final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);rootSchema.add("my_mysql", schemaWithMysql);final SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();sqlStdOperatorTable.register(DICT_SEX);statement = calciteConnection.createStatement();}@Test@SneakyThrowspublic void test_db_func() {final ResultSet resultSet = statement.executeQuery("SELECT dict_sex(sex) sex_name FROM `user`");printResultSet(resultSet);}private static DataSource getMysqlDataSource() {MysqlDataSource dataSource = new MysqlDataSource();dataSource.setUrl("jdbc:mysql://localhost:3306/test");dataSource.setUser("root");dataSource.setPassword("123456");return dataSource;}public static void printResultSet(ResultSet resultSet) throws SQLException {// 获取 ResultSet 元数据ResultSetMetaData metaData = resultSet.getMetaData();// 获取列数int columnCount = metaData.getColumnCount();log.info("Number of columns: {}",columnCount);while (resultSet.next()) {final Map<String, String> item = Maps.newHashMap();// 遍历每一列并打印for (int i = 1; i <= columnCount; i++) {String columnName = metaData.getColumnName(i);String columnValue = resultSet.getString(i);item.put(columnName, columnValue);}log.info(item.toString());}}public static class SqlCustomOperatorTable extends SqlStdOperatorTable {private static SqlCustomOperatorTable instance;// 只需要申明为成员变量即可, instance.init() 的时候会反射取变量进行注册public static final SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER));public static synchronized SqlCustomOperatorTable instance() {if (instance == null) {instance = new SqlCustomOperatorTable();instance.init();}return instance;}/*** 如果想修改获取函数的过程, 可以重写此方法*/@Overrideprotected void lookUpOperators(String name, boolean caseSensitive, Consumer<SqlOperator> consumer) {super.lookUpOperators(name, caseSensitive, consumer);}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/893624.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

P10945 Place the Robots 紫 题解

Part 1. 题意 在 \(N \times M\) 的矩阵中的空地放人机,任一人机上下左右走到边界或墙之前遇不到另一人机。 我已经尽力写得简短了。。Part 2. 思路 我们先思考无墙的情况。 若无墙,则同車的放置,把草方块当作禁止放車的方块即可,。 贴一下车的放置的代码: #include <b…

如何实现和调试REST API中的摘要认证(Digest Authentication)

如何实现和调试REST API中的摘要认证(Digest Authentication) 在保护REST API时,开发者通常会在多种认证机制之间进行选择,其中摘要认证(Digest Authentication)是一种常见的选择。本文探讨了使用摘要认证的原因,解释了其原理,提供了Java和Go语言的实现示例,并提供了测…

CF2068H. Statues

CF2068H. Statues 构造题. 思路 我们设 \(d_0 = a + b\) 是第 1 座雕像到第 \(n\) 座雕像的距离. 那么首先可以注意到两个必要条件:\(\displaystyle \sum_{i = 0}^{n - 1} d_i\) 为偶数. 对于 \(\forall i \in [0, n - 1]\), 都有 \(d_i \le d_0 + \dots + d_{i - 1} + d_{i + …

WEB攻防-机制验证篇重定向发送响应状态码跳过步骤验证码回传枚举

笔记: 验证码突破:回传的时候泄露了发送的验证码导致不需要知道目标的验证码是多少直接使用数据包里面的队列 规律爆破:就是常见的数字四位或者六位 10000 种可能在规定时间内爆破或者多次验证后网站不会出现新的验证码没有次数限制可以尝试爆破或者是汉字进行 重定向用户:通过…

Docker 安装 Redis 容器

1、下载Redis镜像下载指定版本的Redis镜像 (xxx指具体版本号) docker pull redis:xxx docker pull redis 下载最新版Redis镜像 (其实此命令就等同于 : docker pull redis:latest ),我用5.0.5版本。docker pull redis:5.0.52、 检查当前所有Docker下载的镜像docker images

Script-Server:用Web UI轻松管理你的脚本执行

# 监控 # 运维人员 在现代软件开发和运维中,脚本的使用频繁而广泛。然而,如何让非技术人员轻松、安全地运行这些脚本成为了一个挑战。 幸运的是,Script-Server应运而生,它是一个为脚本提供的Web用户界面,可以让用户通过一个直观的界面执行各种脚本,而无需编写代码。本文…

nuxtjs + scss + unocss + pinia 新建项目

1、通过命令行报错的,直接下载压缩包 pnpm dlx nuxi init <project-name>压缩包地址:https://codeload.github.com/nuxt/starter/tar.gz/refs/heads/v3 2、安装插件 1、安装unocss pnpm install --save-dev @unocss/nuxt unocss# nuxt.config.jsmodules: [@unocss/nuxt…

【Azure 环境】执行 az ad user show –id 报错 Insufficient privileges to complete the operation

执行 az ad user show –id 报错 Insufficient privileges to complete the operation问题描述 本地环境中,执行 az ad user show -id 时候,报错 insufficient privileges to complete the operation !问题解答 此报错说明,登录Azure (az login) 时,所使用的账号权限不足所…

VMware ESXi 8.0U3d macOS Unlocker OEM BIOS 2.7 集成网卡驱动和 NVMe 驱动 (集成驱动版)

VMware ESXi 8.0U3d macOS Unlocker & OEM BIOS 2.7 集成网卡驱动和 NVMe 驱动 (集成驱动版)VMware ESXi 8.0U3d macOS Unlocker & OEM BIOS 2.7 集成网卡驱动和 NVMe 驱动 (集成驱动版) 发布 ESXi 8.0U3 集成驱动版,在个人电脑上运行企业级工作负载 请访问原文链接:…

VMware Fusion 13.6.3 OEM BIOS 2.7 - 在 macOS 中运行 Windows 虚拟机的最佳方式

VMware Fusion 13.6.3 OEM BIOS 2.7 - 在 macOS 中运行 Windows 虚拟机的最佳方式VMware Fusion 13.6.3 OEM BIOS 2.7 - 在 macOS 中运行 Windows 虚拟机的最佳方式 VMware Fusion 13 原版 App 中集成 OEM BIOS 请访问原文链接:https://sysin.org/blog/vmware-fusion-13-oem/ …

VMware Workstation 17.6.3 发布下载,现在完全免费无论个人还是商业用途

VMware Workstation 17.6.3 发布下载,现在完全免费无论个人还是商业用途VMware Workstation 17.6.3 发布下载,现在完全免费无论个人还是商业用途 VMware Workstation 17.6.3 Pro for Windows & Linux - 领先的免费桌面虚拟化软件 基于 x86 的 Windows、Linux 桌面虚拟化软…

[51Nod 1558] 树中的配对

前言 这能不会, 这能不会, 这能不会??? 做了一会之后, 感觉确实可以不会 思路题意 带权树求一组排列 ppp 使得 dis(i,pi)→max⁡\textrm{dis} (i, p_i) \to \maxdis(i,pi​)→max结论 一条边最多的经过次数就是其连接的两部分中较小的那一个证明方法 \(1\) 调整法 首先点对初…