ClickHouse--11--ClickHouse API操作

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 1.Java 读写 ClickHouse API
    • 1.1 首先需要加入 maven 依赖
    • 1.2 Java 读取 ClickHouse 集群表数据
        • JDBC--01--简介
      • ClickHouse java代码
    • 1.3 Java 向 ClickHouse 表中写入数据
  • 2.Spark 写入 ClickHouse API
    • 2.1 导入依赖
    • 2.2 代码编写
  • 3.Flink 写入 ClickHouse API
    • 3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API
    • 3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API


1.Java 读写 ClickHouse API

1.1 首先需要加入 maven 依赖

<!-- 连接 ClickHouse 需要驱动包-->
<dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version>
</dependency>

1.2 Java 读取 ClickHouse 集群表数据

JDBC–01–简介

在这里插入图片描述

public class Test01 {public static void main(String[] args) throws Exception {//1.注册数据库驱动Class.forName("com.mysql.jdbc.Driver");//2.获取数据库连接Connection conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/jt_db?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8","root", "root");//3.获取传输器Statement stat = conn.createStatement();//4.发送SQL到服务器执行并返回执行结果String sql = "select * from account";ResultSet rs = stat.executeQuery( sql );//5.处理结果while( rs.next() ) {int id = rs.getInt("id");String name = rs.getString("name");double money = rs.getDouble("money");System.out.println(id+" : "+name+" : "+money);}//6.释放资源rs.close();stat.close();conn.close();System.out.println("TestJdbc.main()....");}}

ClickHouse java代码


import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;import java.sql.ResultSet;
import java.sql.SQLException;public class test01 {public static void main(String[] args) throws SQLException {ClickHouseProperties props = new ClickHouseProperties();props.setUser("default");props.setPassword("");//1.注册数据库驱动配置BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123,node2:8123,node3:8123/default", props);//2.获取数据库连接ClickHouseConnection conn = dataSource.getConnection();//3.获取传输器ClickHouseStatement statement = conn.createStatement();//4.发送SQL到服务器执行并ResultSet rs = statement.executeQuery("select id,name,age from test");//5.处理结果while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");System.out.println("id = " + id + ",name = " + name + ",age = " + age);}//6.释放资源conn.close();statement.close();rs.close();}
}

1.3 Java 向 ClickHouse 表中写入数据

package com.cy.demo;import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.settings.ClickHouseProperties;import java.sql.ResultSet;
import java.sql.SQLException;public class test01 {public static void main(String[] args) throws SQLException {ClickHouseProperties props = new ClickHouseProperties();props.setUser("default");props.setPassword("");//1.注册数据库驱动配置BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://node1:8123/default", props);//2.获取数据库连接ClickHouseConnection conn = dataSource.getConnection();//3.获取传输器ClickHouseStatement statement = conn.createStatement();//4.发送SQL到服务器执行并statement.execute("insert into test values (100,'王五',30)");//可以拼接批量插入多条//6.释放资源conn.close();statement.close();rs.close();}
}

在这里插入图片描述

2.Spark 写入 ClickHouse API

  • SparkCore 写入 ClickHouse,可以直接采用写入方式。下面案例是使用 SparkSQL 将结果存入 ClickHouse对应的表中。在 ClickHouse 中需要预先创建好对应的结果表

2.1 导入依赖

        <!-- 连接 ClickHouse 需要驱动包--><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version><!-- 去除与 Spark 冲突的包 --><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion><exclusion><groupId>net.jpountz.lz4</groupId><artifactId>lz4</artifactId></exclusion></exclusions></dependency><!-- Spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.1</version></dependency><!-- SparkSQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.3.1</version></dependency><!-- SparkSQL ON Hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.1</version></dependency>

2.2 代码编写

val session: SparkSession =
SparkSession.builder().master("local").appName("test").getOrCreate()
val jsonList = List[String](
"{\"id\":1,\"name\":\"张三\",\"age\":18}",
"{\"id\":2,\"name\":\"李四\",\"age\":19}",
"{\"id\":3,\"name\":\"王五\",\"age\":20}"
)
//将 jsonList 数据转换成 DataSet
import session.implicits._
val ds: Dataset[String] = jsonList.toDS()
val df: DataFrame = session.read.json(ds)
df.show()
//将结果写往 ClickHouse
val url = "jdbc:clickhouse://node1:8123/default"
val table = "test"
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "default")
properties.put("password", "")
properties.put("socket_timeout", "300000")
df.write.mode(SaveMode.Append).option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000).jdbc(url,
table, properties)

3.Flink 写入 ClickHouse API

  • 可以通过 Flink 原生 JDBC Connector 包将 Flink 结果写入 ClickHouse 中,Flink 在1.11.0 版本对其 JDBC Connnector 进行了重构:

在这里插入图片描述

3.1 Flink 1.10.x 之前版本使用 flink-jdbc,只支持 Table API

  1. maven 中需要导入以下包:
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
</dependency>
<!--添加 Flink JDBC 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码:
/**
* 通过 flink-jdbc API 将 Flink 数据结果写入到 ClickHouse 中,只支持 Table API
*
* 注意:
* 1.由于 ClickHouse 单次插入的延迟比较高,我们需要设置 BatchSize 来批量插入数据,提高性能。
* 2.在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足 BatchSize,则不会插入剩余数
据。
*/
case class PersonInfo(id:Int,name:String,age:Int)
object FlinkWriteToClickHouse1 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1,后期每个并行度满批次需要的条数时,会插入 click 中
env.setParallelism(1)
val settings: EnvironmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)
//导入隐式转换
import org.apache.flink.streaming.api.scala._
//读取 Socket 中的数据
val sourceDS: DataStream[String] = env.socketTextStream("node5",9999)
val ds: DataStream[PersonInfo] = sourceDS.map(line => {
val arr: Array[String] = line.split(",")
PersonInfo(arr(0).toInt, arr(1), arr(2).toInt)
})
//将 ds 转换成 table 对象
import org.apache.flink.table.api.scala._
val table: Table = tableEnv.fromDataStream(ds,'id,'name,'age)
//将 table 对象写入 ClickHouse 中
//需要在 ClickHouse 中创建表:create table flink_result(id Int,name String,age Int) engine =
MergeTree() order by id;
val insertIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//准备 ClickHouse table sink
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://node1:8123/default")
.setUsername("default")
.setPassword("")
.setQuery(insertIntoCkSql)
.setBatchSize(2) //设置批次量,默认 5000 条
.setParameterTypes(Types.INT, Types.STRING, Types.INT)
.build()
//注册 ClickHouse table Sink,设置 sink 数据的字段及 Schema 信息
tableEnv.registerTableSink("ck-sink",
sink.configure(Array("id", "name", "age"),Array(Types.INT, Types.STRING, Types.INT)))
//将数据插入到 ClickHouse Sink 中
tableEnv.insertInto(table,"ck-sink")
//触发以上执行
env.execute("Flink Table API to ClickHouse Example")
}
}

3.2 Flink 1.11.x 之后版本使用 flink-connector-jdbc,只支持DataStream API

  1. 在 Maven 中导入以下依赖包
<!-- Flink1.11 后需要 Flink-client 包-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink Table API 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.11.3</version>
</dependency>
<!--添加 Flink JDBC Connector 以及 Clickhouse JDBC Driver 相关的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
  1. 代码
/**
* Flink 通过 flink-connector-jdbc 将数据写入 ClickHouse ,目前只支持 DataStream API
*/
object FlinkWriteToClickHouse2 {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度为 1
env.setParallelism(1)
import org.apache.flink.streaming.api.scala._
val ds: DataStream[String] = env.socketTextStream("node5",9999)
val result: DataStream[(Int, String, Int)] = ds.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
})
//准备向 ClickHouse 中插入数据的 sql
val insetIntoCkSql = "insert into flink_result (id,name,age) values (?,?,?)"
//设置 ClickHouse Sink
val ckSink: SinkFunction[(Int, String, Int)] = JdbcSink.sink(
//插入数据 SQL
insetIntoCkSql,
//设置插入 ClickHouse 数据的参数
new JdbcStatementBuilder[(Int, String, Int)] {
override def accept(ps: PreparedStatement, tp: (Int, String, Int)): Unit = {
ps.setInt(1, tp._1)
ps.setString(2, tp._2)
ps.setInt(3, tp._3)
}
},
//设置批次插入数据
new JdbcExecutionOptions.Builder().withBatchSize(5).build(),
//设置连接 ClickHouse 的配置
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl("jdbc:clickhouse://node1:8123/default")
.withUsername("default")
.withUsername("")
.build()
)
//针对数据加入 sink
result.addSink(ckSink)
env.execute("Flink DataStream to ClickHouse Example")
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/473244.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

成本效能FinOps: Crane 部署

目录 一、实验 1.环境 2.安装kind 3.安装Crane 二、问题 1.脚本安装prometheus报错 2.查看集群信息失败 3.Helm添加grafana 报错 4.查看crane资源失败 5.prometheus部署时kube-state-metrics 拉取镜像显示ImagePullBackOff 6.Crane 功能与架构 一、实验 1.环境 &a…

MySQL的日志

一&#xff1a;概述 &#xff08;1&#xff09;介绍 在任何一种数据库中&#xff0c;都会有各种各样的日志&#xff0c;记录着数据库工作的方方面面&#xff0c;以帮助数据库管理员追踪数据库曾经发生过的各种事件&#xff0c;MySQL也不例外。 &#xff08;2&#xff09;分类…

【VTKExamples::PolyData】第三十三期 MiscCellData

很高兴在雪易的CSDN遇见你 VTK技术爱好者 QQ:870202403 前言 本文分享VTK样例MiscCellData,了解如何创建PolyData数据,希望对各位小伙伴有所帮助! 感谢各位小伙伴的点赞+关注,小易会继续努力分享,一起进步! 你的点赞就是我的动力(^U^)ノ~YO 1. MiscCellData /…

【从Python基础到深度学习】7. 使用scp命令实现主机间通讯

一、生成 SSH 密钥对 ssh-keygen 是一个用于生成 SSH 密钥对的命令行工具&#xff0c;用于身份验证和加密通信 ssh-keygen 二、将本地主机上的 SSH 公钥添加到远程主机 ssh-copy-id 命令用于将本地主机上的 SSH 公钥添加到远程主机上的 authorized_keys 文件中&#xff0c;…

数据库第二次实验

目录 1 实验内容 2 SQL代码及运行截图 2.1 创建表并插入数据 2.1.1 创建表 2.1.2 插入数据 2.1.3 运行截图 2.2 修改表 2.2.1 SQL代码 2.2.2 运行截图 2.3 删除操作 2.3.1 SQL代码 2.3.2 运行截图 2.4 数据库的备份 2.5 数据库的恢复 1 实验内容 实验目的&#…

安装cockpit

1、下载cockpit yum -y install cockpit 下载相关环境 yum install qemu-kvm libvirt libvirt-daemon virt-install virt-manager libvirt-dbus 2、启动libvirtd systemctl start libvirtd.service systemctl enable libvirtd.service 3、设置开机自启动 systemctl enabl…

JVM常见问题笔记分享

文章目录 1 JVM组成1.1 JVM由那些部分组成&#xff0c;运行流程是什么&#xff1f;1.2 什么是程序计数器&#xff1f;1.3 你能给我详细的介绍Java堆吗?元空间(MetaSpace)介绍 1.4 什么是虚拟机栈1.5 堆和栈的区别1.6 能不能解释一下方法区&#xff1f;1.5.1 概述1.5.2 常量池1…

第五节 zookeeper集群与分布式锁_2

1.分布式锁概述 1.1 什么是分布式锁 1&#xff09;要介绍分布式锁&#xff0c;首先要提到与分布式锁相对应的是线程锁。 线程锁&#xff1a;主要用来给方法、代码块加锁。当某个方法或代码使用锁&#xff0c;在同一时刻仅有一个线程执行该方法或该代码段。 线程锁只在同一J…

2.14练习

选择题 1.A 2.A 3.B 4.A 5.A 6.D 7.A 8.A 9.A 10.C 11.C 12.B 13.D 14.D 15.A 16.C 二&#xff0e;填空题 1. 6 2. 2 3 5 7 9 3. rgb 4. *s 迷 5. 2 5 6. *s

WordPress作者页面链接的用户名自动变成16位字符串串插件Smart User Slug Hider

WordPress默认的作者页面URL链接地址格式为“你的域名/author/admin”&#xff0c;其中admin就是你的用户名&#xff0c;这样的话就会暴露我们的用户名。 为了解决这个问题&#xff0c;前面boke112百科跟大家分享了『如何将WordPress作者存档链接中的用户名改为昵称或ID』一文…

使用Apache ECharts同时绘制多个统计图表

目录 1、介绍 2、相关知识 3、代码 4、效果 &#x1f343;作者介绍&#xff1a;双非本科大三网络工程专业在读&#xff0c;阿里云专家博主&#xff0c;专注于Java领域学习&#xff0c;擅长web应用开发、数据结构和算法&#xff0c;初步涉猎Python人工智能开发和前端开发。 …

DNS服务正反解析

1.正向解析 1.配置基本 1.1防火墙配置 二者都要关闭 setenforce 0 systemctl stop firewalld #关闭防火墙 yum install bind -y #下载bind软件 客户端可以不用下 1.2服务端配置静态ip&#xff0c; ip a 查看网卡 nmcli c modify ens33 ipv4.method manual ipv4.addresses …