flinkjar开发 自定义函数

编写自定义加密函数,继承ScalarFunction类,实现eval方法,参数个数类型和返回值根据业务来自定义。

import org.apache.flink.table.functions.ScalarFunction;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;public class AESUtil extends ScalarFunction {private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG";private static String KEY_ALGORITHM = "AES";private static String key = "AD42F6697B035B75";//必须有这个方法,在这个方法里实现业务逻辑public String eval(String str) {return encrypt(str);}/*** 加密** @param key* @param messBytes* @return*/private static byte[] encrypt(Key key, byte[] messBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, key);return cipher.doFinal(messBytes);}return null;}/*** AES(256)解密** @param key* @param cipherBytes* @return*/private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception {if (key != null) {Cipher cipher = Cipher.getInstance(KEY_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, key);return cipher.doFinal(cipherBytes);}return null;}/*** 生成加密秘钥** @return* @throws NoSuchAlgorithmException*/private static KeyGenerator getKeyGenerator() {KeyGenerator keygen = null;try {keygen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM);secureRandom.setSeed(key.getBytes());keygen.init(128, secureRandom);} catch (NoSuchAlgorithmException e) {}return keygen;}public static String encrypt(String message) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {}return null;}public static String decrypt(String ciphertext) {try {KeyGenerator keygen = getKeyGenerator();SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM);return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8);} catch (Exception e) {}return null;}

FlinkCDC mysql到mysql 业务代码


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.example.util.AESUtil;public class FlinkMysqlToMysql {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));env.enableCheckpointing(5000);env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//源表连接器一定得是mysql-cdc"'connector' = 'mysql-cdc'," +"'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'database-name' = 'testdb',\n" +" 'table-name' = 'flinktest',\n" +" 'username' = 'root',\n" +" 'password' = 'admin'\n" +")");
//这里注册加密函数tEnv.createTemporarySystemFunction("encrypt", new AESUtil());
//sql里面使用自定义函数加密Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable");tEnv.registerTable("sourceTable", result);//创建skink表tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" +
//目标表连接器是jdbc"'connector' = 'jdbc'," +"'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +" 'table-name' = 'flinktest2',\n" +" 'username' = 'root',\n" +" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +" 'password' = 'admin'\n" +")");
// 执行CDC过程String query = "INSERT INTO targetTable SELECT * FROM sourceTable";tEnv.executeSql(query).print();}
}

运行结果,加密成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/448746.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于配置系统环境变量 点击确定就显示,此环境变量过大2047

使用了网络上的所有办法,均无效 最终解决办法:把系统path环境变量里的变量环境删掉一些之后,成功加入! 原因就是path里面的内容太多了导致的,删掉一些变量就好了!

代码随想录算法训练营29期Day41|LeetCode 343,96

文档讲解:整数拆分 不同的二叉搜索树 343.整数拆分 题目链接:https://leetcode.cn/problems/integer-break/description/ 思路: 题目要求我们拆分n,拆成k个数使其乘积和最大,然而题目中并没有给出k,所以…

day07-CSS高级

01-定位 作用:灵活的改变盒子在网页中的位置 实现: 1.定位模式:position 2.边偏移:设置盒子的位置 left right top bottom 相对定位 position: relative 特点: 不脱标,占用自己原来位置 显示模…

办公软件巨头CCED、WPS面临新考验,新款办公软件异军突起

办公软件巨头CCED、WPS的成长经历 众所周知,CCED和WPS在中国办公软件领域树立了两大知名品牌的地位。然而,它们的成功并非一朝一夕的成就,而是历经了长时间的发展与积淀。 在上世纪80年代末至90年代初,CCED作为中国大陆早期的一款…

​【c语言】函数递归

1. 递归是什么 递归是c语言学习上绕不开的话题,那么什么是递归呢? 递归实际上是自己调用自己。 2. 递归的限制条件 递归在书写的时候有两个限制条件: 递归存在限制条件,当满足这个限制条件式,递归将不再继续。 每…

PDF中公式转word

效果:实现pdf中公式免编辑 step1: 截图CtrlAltA,复制 step2: SimpleTex - Snip & Get 网页或客户端均可,无次数限制,效果还不错。还支持手写、文字识别 单张图片:选 手写板 step3: 导出结果选择 注:…

在flutter中集成Excel导入和导出

flutter中集成Excel导入和导出功能 1、需要的依赖 在pubspec.yaml #excel导出syncfusion_flutter_xlsio: ^24.1.45open_file: ^3.0.1#导入excelflutter_excel: ^1.0.1#选择文件的依赖file_picker: ^6.1.1(1)依赖说明 在测试时,我们在使用导…

《汇编语言:基于linux环境》通过sys_read, sys_write 实现大小写英文字母转换

x64 syscall 参数构造表 ; nasm -f elf64 -g -F dwarf uppercaser2.asm ; ld -o uppercaser2 uppercaser2.o ; gdb uppercaser2; ./uppercaser2 > (输出文件) < (输入文件) ;./uppercaser2 > 2.txt <1.txtSECTION .bssBUFFLEN equ 1024Buff: resb BUFFLENSECTION…

Qt 范例阅读: QStateMachine状态机框架 和 SCXML 引擎简单记录(方便后续有需求能想到这两个东西)

一、QStateMachine 简单应用&#xff1a; 实现按钮的文本切换 QStateMachine machine; //定义状态机&#xff08;头文件定义&#xff09;QState *off new QState(); //添加off 状态off->assignProperty(ui->pushButton_2, "text", "Off"); //绑定该…

Docker上安装配置tomcat

目录 1. 拉取镜像 2. 创建运行镜像 3. 查看是否创建成功 ps&#xff1a;如果出现404错误 tomcat目录结构 1. 拉取镜像 这里使用 tomcat:8.5.40 版本作为安装 docker pull tomcat:8.5.40 2. 创建运行镜像 docker run -d --name tomcat -p 8080:8080 \--privilegedtrue …

【红包封面发放+微信红包封面制作教程】小黑猫祝大家小年快乐~

今年终于成功获得了微信红包封面~是我们家的小黑猫&#xff0c;嘿嘿。 封面获取方式 一共还有600份&#xff0c;数量有限&#xff0c;大家想要的话请关注文末的公众号&#xff0c;访问红包封面相关的推文获取~ 平时公众号主要发布一些技术类工具知识&#xff0c;希望能帮到大…

个人网站如何让搜索引擎收录

当我们花费功夫搭建好个人网站&#xff0c;如何能让搜索引擎搜索到个人网站呢&#xff1f;比如百度&#xff0c;根本百度不到自己网站的内容。这时候就要使用到搜索引擎提供的站点收录功能了&#xff0c;但是点开百度的搜索资源平台&#xff0c;添加自己的站点时&#xff0c;就…