Flink写入数据到ClickHouse

文章目录

      • 1.ClickHouse建表
      • 1.ClickHouse依赖
      • 2.Bean实体类
      • 3.ClickHouse业务写入逻辑
      • 4.测试写入类
      • 5.发送数据

1.ClickHouse建表

ClickHouse中建表

CREATE TABLE default.test_write
(id   UInt16,name String,age  UInt16
) ENGINE = TinyLog();

1.ClickHouse依赖

Flink开发相关依赖

    <properties><flink.version>1.12.1</flink.version><scala.version>2.12.13</scala.version><clickhouse-jdbc.version>0.1.54</clickhouse-jdbc.version><lombok.version>1.18.12</lombok.version></properties><dependencies><!-- 写入数据到clickhouse --><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>${clickhouse-jdbc.version}</version></dependency><!-- flink核心API --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies>

2.Bean实体类

User.java

package com.daniel.bean;import lombok.Builder;
import lombok.Data;/*** @Author Daniel* @Date: 2023/7/3 15:35* @Description**/@Data
@Builder
public class User {public int id;public String name;public int age;
}

3.ClickHouse业务写入逻辑

ClickHouseSinkFunction.java

package com.daniel.util;import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @Author Daniel* @Date: 2023/7/3 15:36* @Description**/public class ClickHouseSinkFunction extends RichSinkFunction<User> {Connection conn = null;String sql;public ClickHouseSinkFunction(String sql) {this.sql = sql;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);conn = getConn("localhost", 8123, "default");}@Overridepublic void close() throws Exception {super.close();if (conn != null) {conn.close();}}// 定义具体的操作@Overridepublic void invoke(User user, Context context) throws Exception {// 批量插入PreparedStatement preparedStatement = conn.prepareStatement(sql);preparedStatement.setLong(1, user.id);preparedStatement.setString(2, user.name);preparedStatement.setLong(3, user.age);preparedStatement.addBatch();long startTime = System.currentTimeMillis();int[] ints = preparedStatement.executeBatch();conn.commit();long endTime = System.currentTimeMillis();System.out.println("批量插入用时:" + (endTime - startTime) + "ms -- 插入数据行数:" + ints.length);}public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {Class.forName("ru.yandex.clickhouse.ClickHouseDriver");String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;conn = DriverManager.getConnection(address);return conn;}
}
  • open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。

  • invoke():定义了在每个元素到达Sink操作时所执行的逻辑。用户需要实现这个方法来定义如何将数据写入外部存储系统或执行其他操作。

  • close():在SinkFunction关闭之前调用,用于释放资源、关闭连接等操作。

4.测试写入类

ClickHouseWriteTest.java

package com.daniel;import com.daniel.bean.User;
import com.daniel.util.ClickHouseSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @Author daniel* @Date: 2023/7/3 15:37* @Description**/public class ClickHouseWriteTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();// SourceDataStream<String> ds = env.socketTextStream("localhost", 9999);// TransformSingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {String[] split = data.split(",");return User.builder().id(Integer.parseInt(split[0])).name(split[1]).age(Integer.parseInt(split[2])).build();});// SinkString sql = "INSERT INTO default.test_write (id, name, age) VALUES (?,?,?)";ClickHouseSinkFunction jdbcSink = new ClickHouseSinkFunction(sql);dataStream.addSink(jdbcSink);env.execute("flink-clickhouse-write");}
}

5.发送数据

使用nc或者任意工具向指定端口发送数据
例如

nc -L -p 9999

发送数据

1,Daniel,25
2,David,38
3,James,16
4,Robert,27

然后启动ClickHouseWriteTest.java程序

在这里插入图片描述

查询数据

select *
from default.test_write;

由于这里是并行插入,所以没有顺序可言

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/9344.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Web3】认识NFT

NFT&#xff08;非同质化代币&#xff09;在Web3中扮演着重要的角色。Web3是指下一代互联网&#xff0c;它建立在区块链技术之上&#xff0c;旨在实现更加去中心化、透明和用户掌控的互联网。 NFT在Web3的一些重要作用&#xff1a; 唯一性和可证明稀缺性&#xff1a;NFT是一种…

第十二章 原理篇:vision transformer

参考教程&#xff1a; https://arxiv.org/pdf/2010.11929.pdf https://zhuanlan.zhihu.com/p/340149804 【大佬总结的非常好&#xff0c;他的好多篇文章都很值得学习】 文章目录 为什么会使用transformerVIT详解method获得patchpatch embeddingposition embedding 代码实现eino…

Spring Boot 集成 Redisson分布式锁

Redisson 是一种基于 Redis 的 Java 驻留集群的分布式对象和服务库&#xff0c;可以为我们提供丰富的分布式锁和线程安全集合的实现。在 Spring Boot 应用程序中使用 Redisson 可以方便地实现分布式应用程序的某些方面&#xff0c;例如分布式锁、分布式集合、分布式事件发布和订…

【webrtc】vs2017 重新构建m98

配置了一台13900k的主机,需要重新配置webrtc 构建环境代码已经gclient sync 同步好了,打算重新构建:vs2017 的win10 sdk最大17763 vs2017 环境 set vs2017_install=S:\Program Files (x86)\Microsoft Visual Studio\2017\Communitywin10 SD

数字图像处理实验报告

目录 实验二、图像在空间域上的处理方法 实验三、图像在频率域上的处理方法 实验二、图像在空间域上的处理方法 一、实验目的 了解图像亮&#xff08;灰&#xff09;度变换与空间滤波的意义和手段&#xff1b;熟悉图像亮&#xff08;灰&#xff09;度变换与空间滤波的MATLA…

【AI机器学习入门与实战】机器学习算法都有哪些分类?

&#x1f44d;【AI机器学习入门与实战】目录 &#x1f36d;基础篇 &#x1f525; 第一篇&#xff1a;【AI机器学习入门与实战】AI 人工智能介绍 &#x1f525; 第二篇&#xff1a;【AI机器学习入门与实战】机器学习核心概念理解 &#x1f525; 第三篇&#xff1a;【AI机器学习入…

得物社区推荐精排模型演进

1.背景 得物社区是一大批年轻人获取潮流信息、分享日常生活的潮流生活社区。其中用户浏览的信息&#xff0c;进行个性化的分发&#xff0c;是由推荐系统来决策完成的。目前得物社区多个场景接入了推荐算法&#xff0c;包括首页推荐双列流、沉浸式视频推荐、分类tab推荐流、直播…

DOM“文档对象模型”

目录 DOM 1.节点层级 1.2.节点 常用节点 文档节点&#xff08;document&#xff09; 元素节点&#xff08;Element&#xff09; 属性节点&#xff08;Attribute) 文本节点&#xff08;Text&#xff09; 其他节点 DocumentType Comment DocumentFragment 1.3.节点树…

TCP的三次握手和四次挥手

一、网络七层协议 OSI七层协议模型主要是&#xff1a;物理层&#xff08;Physical&#xff09;、数据链路层&#xff08;Data Link&#xff09;、网络层&#xff08;Network&#xff09;、传输层&#xff08;Transport&#xff09;、会话层&#xff08;Session&#xff09;、表…

Mvc进阶(下)

Mvc进阶&#xff08;下&#xff09; 1.前言2.上次代码弊端1.利用xml建模反射优化1.XMl文件2.对xml建模 3.修改中央控制器 3.再优化1.先优化Action子控制器4.优化传值问题 4.总结 1.前言 虽然前面文章深入解析Java自定义MVC框架的原理与实现讲述了Mvc框架&#xff0c;但是那只能…

联邦聚合(FedAvg、FedProx、SCAFFOLD)

目录 联邦聚合算法对比(FedAvg、FedProx、SCAFFOLD) 解决问题 FedAvg FedProx SCAFFOLD 实验结果 联邦聚合算法对比(FedAvg、FedProx、SCAFFOLD) 论文链接&#xff1a; FedAvg&#xff1a;Communication-Efficient Learning of Deep Networks from Decentralized Data …

高级Android开发人员枯竭,在这个利润丰厚的领域如何蓬勃发展

为什么高级人才供不应求&#xff1f; 技术行业的发展速度非常快&#xff0c;新的技术和工具不断涌现&#xff0c;导致技能需求不断演变。这使得不断更新和学习变得至关重要。行业发展速度超过了教育和培训体系的能力跟进。传统教育往往滞后于最新的技术趋势和实践&#xff0c;…