ClickHouse10-ClickHouse中Kafka表引擎

Kafka表引擎也是一种常见的表引擎,在很多大数据量的场景下,会从源通过Kafka将数据输送到ClickHouse,Kafka作为输送的方式,ClickHouse作为存储引擎与查询引擎,大数据量的数据可以得到快速的、高压缩的存储。
在这里插入图片描述

Kafka大家肯定不陌生:

  • 它可以用于发布和订阅数据流,是常见的队列使用方式
  • 它可以组织容错存储,是常见的容错存储的使用方式
  • 它可以在流可用时对其进行处理,是常见的大数据处理的使用方式

全文概览:

  • 基本语法
  • 从 Kafka 写入到 ClickHouse
  • 从 ClickHouse 写入到 Kafka
    • 测试1:queue->ck->queue
    • 测试2:ck->queue

基本语法

分为定义表结构和定义Kafka的接入参数,Kafka的接入参数都是常见的字段

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(name1 [type1] [ALIAS expr1],name2 [type2] [ALIAS expr2],...
) ENGINE = Kafka()
SETTINGSkafka_broker_list = 'host:port',kafka_topic_list = 'topic1,topic2,...',kafka_group_name = 'group_name',kafka_format = 'data_format'[,][kafka_schema = '',][kafka_num_consumers = N,][kafka_max_block_size = 0,][kafka_skip_broken_messages = N,][kafka_commit_every_batch = 0,][kafka_client_id = '',][kafka_poll_timeout_ms = 0,][kafka_poll_max_batch_size = 0,][kafka_flush_interval_ms = 0,][kafka_thread_per_consumer = 0,][kafka_handle_error_mode = 'default',][kafka_commit_on_select = false,][kafka_max_rows_per_message = 1];

示例:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'

从 Kafka 写入到 ClickHouse

创建topic:

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test_ck_sync1

创建同步表:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'CREATE TABLE IF NOT EXISTS test_ck_sync1_res
(`sys_time` Datetime COMMENT '',`num` UInt32 COMMENT ''
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(sys_time)
ORDER BY tuple()

创建物化视图,进行数据样式的转换:

CREATE MATERIALIZED VIEW test_ck_sync1_mv TO test_ck_sync1_res AS
SELECTsys_time,num
FROM test_ck_sync1

通过console写入数据:

[$ kafka_2.13-3.6.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_ck_sync1
>2024-01-01 00:00:01|89  

验证数据:

$ :) select * from test_ck_sync1_res;SELECT *
FROM test_ck_sync1_resQuery id: a666f893-5be9-4022-9327-3a1507aa5485┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:01 │  89 │
└─────────────────────┴─────┘
┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:00 │  88 │
└─────────────────────┴─────┘2 rows in set. Elapsed: 0.049 sec.

从 ClickHouse 写入到 Kafka

kafka_writers_reader --(view)--> kafka_writers_queue ---> 

创建一个队列:

bin/kafka-topics.sh --topic kafka_writers --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

创建同步表:

CREATE TABLE kafka_writers_reader (     `id` Int,     `platForm` String,     `appname` String,     `time` DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'kafka_writers_reader', kafka_group_name = 'kafka_writers_reader_group', kafka_format = 'CSV';CREATE TABLE kafka_writers_queue (     id Int,     platForm String,     appname String,     time DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092',        kafka_topic_list = 'kafka_writers',        kafka_group_name = 'kafka_writers_group',        kafka_format = 'CSV',       kafka_max_block_size = 1048576;

测试1:queue->ck->queue

通过写入队列kafka_writers_reader,借助ClickHouse写入队列kafka_writers

bin/kafka-topics.sh --topic kafka_writers_reader --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafka_writers_readerbin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers

测试2:ck->queue

通过写入表kafka_writers_reader,写入队列kafka_writers

$ :) INSERT INTO kafka_writers_reader (id, platForm, appname, time) 
VALUES (8,'Data','Test','2020-12-23 14:45:31'), 
(9,'Plan','Test1','2020-12-23 14:47:32'), 
(10,'Plan','Test2','2020-12-23 14:52:15'), 
(11,'Data','Test3','2020-12-23 14:54:39');INSERT INTO kafka_writers_reader (id, platForm, appname, time) FORMAT ValuesQuery id: 223a63ab-97fa-488d-8ea7-c2e194155d26Ok.4 rows in set. Elapsed: 1.054 sec. 
[$ kafka_2.13-3.6.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers
8,"Data","Test","1970-01-01 08:00:00"9,"Plan","Test1","1970-01-01 08:00:00"10,"Plan","Test2","1970-01-01 08:00:00"11,"Data","Test3","1970-01-01 08:00:00"

如果喜欢我的文章的话,可以去GitHub上给一个免费的关注吗?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/572749.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv5实战记录02 模型检测

本人记录打卡,不够自习,慎看。 今天主要学了计组和计网,YOLO简单打个卡。 指路大佬:【手把手带你实战YOLOv5-入门篇】YOLOv5 模型检测_哔哩哔哩_bilibili 1. 主要讲了几个关键参数: 图源你可是处女座 运行示例&#…

在企业微信里面添加h5页面 进行登录授权

1.需求:在企业微信里面添加h5页面 进行登录授权,获取到用户的code,进行登入id的验证 2.步骤: 根据企业微信开发者中心中构造网页授权链接进行授权 在企业微信内部进行配置,拿到appid,redirect_uri&#x…

【八股】2024春招八股复习笔记2(大数据开发,Java)

【八股】2024春招八股复习笔记2(大数据开发) 文章目录 1、大数据存储(Flume、Hive、HBase、HDFS)2、大数据计算(MapReduce,Spark、Flink)3、大数据集群(Yarn、ZooKeeper、kafka&…

R语言 | 上下双向柱状图

1. 效果图 2. 代码 # 生成测试数据 difdata.frame(labelspaste0("pathway", 1:3),upc(30,15,1),downc(10,20,40) ) rownames(dif)dif$labels dif#变形 difreshape2::melt(dif) dif# 绘图 ggplot(dif, aes(xlabels, yifelse(variable"up", value, -value), …

Digital Image processing (DIP)

Camera FOV: Filed of view DOV: deep of view 景深 被F f/D 衡量,f 是焦距,D 是光圈大小。 当确定好了景深后,如何光线较暗,则需要补光,或者适当延长曝光时间(快门) 分辨率、像素尺寸&…

【GitLab】Ubuntu 22.04 快速安装 GitLab

在 Ubuntu 22.04 上安装最新版本的 GitLab,可以按照以下步骤操作: 1. 更新系统: 在终端中执行以下命令以确保系统是最新的: sudo apt update sudo apt upgrade2. 安装依赖: 安装 GitLab 所需的依赖包: …

2024年云计算使用报告,89%组织用多云,25%广泛使用生成式AI,45%需要跨云数据集成,节省成本是云首要因素

备注:本文来自Flexera2024年的云现状调研报告的翻译。原报告地址: https://info.flexera.com/CM-REPORT-State-of-the-Cloud Flexera是一家专注于做SaaS的IT解决方案公司,有30年发展历史,5万名客户,1300名员工。Flex…

35.HarmonyOS App(ArkUI)使用父组件@Builder装饰的方法初始化子组件@BuilderParam报错

HarmonyOS App(ArkUI)使用父组件Builder装饰的方法初始化子组件BuilderParam报错 Type void is not assignable to type () > void. <tsCheck> 去掉括号()就可以了 装饰器&#xff1a; 用于装饰类、结构、方法以及变量&#xff0c;并赋予其特殊的含义。如上述示例中En…

npm淘宝镜像源更新

目录 前情提要&#xff1a; 背景&#xff1a; 镜像源更新&#xff1a; 清楚缓存&#xff1a; 直接切换镜像源&#xff1a; 补充&#xff1a; 错误解释&#xff1a; 解决方法&#xff1a; 前情提要&#xff1a; 2024 /1 /22 &#xff0c;registry.npm.taobao.org淘宝镜像源的SSL…

数据结构之单链表的详细实现(图解)

前言 本次博客讲结合图例讲解单向不带头非循环链表 此后会讲解一些题目 1单链表的实现 1.1什么是单链表 我们先看数组&#xff0c;即顺序表的是什么样的&#xff0c;再看链表 1.2单链表的特点 实际中要实现的链表的结构非常多样&#xff0c;以下情况组合起来就有8种链表结…

HarmonyOS实战开发-为应用添加运行时权限

介绍 通过AbilityAccessCtrl动态向用户申请“允许不同设备间的数据交换”的权限&#xff0c;使用设备管理实例获取周边不可信设备列表。 说明&#xff1a; 查询周边不可信设备之前&#xff0c;请确保本设备与周边设备未进行配对。如果已配对&#xff0c;则恢复出厂设置之后重新…

树与二叉树的应用试题解析

01&#xff0e;在有n个叶结点的哈夫曼树中&#xff0c;非叶结点的总数是( A ). A. n-1 B. n C. 2n-1 D.2n 02.给定整数集合{3,5,6,9,12}&#xff0c;与之对应的哈夫曼树是( D…