03 数仓平台 Kafka

kafka概述

定义

Kafka 是一个开源的分布式事件流平台(Event Streaming Plantform),主要用于大数据实时领域。本质上是一个分布式的基于发布/订阅模式的消息队列(Message Queue)。

消息队列

在大数据场景中主要采用Kafka 作为消息队列。传统消息队列主要应用场景包括:缓存/削峰、解耦和异步通信。
消息队列的模式包含了 2 种,点对点订阅模式和发布/订阅模式。
在这里插入图片描述
Kafka采用了发布/订阅模式,这种模式有以下特点:

  • 可以有多个topic 主题
  • 消费者消费后,不会立即删除数据
  • 每个消费者组相互独立,不会影响。

Kafka 基础架构

在这里插入图片描述
为了方便扩展,提高吞吐量,一个 topic可以分为多个 partition。为了配合分区设计,提出了消费者组的概念,组内每个消费者并行消费。为提高可用性,每个 partition 增加若干可配置副本。在 2.8 之下的版本,将数据 leader提交给 Zookeeper 保管,2.8 版本之后,可以不配置 zookeeper。

Kafka 快速安装

规划

Hadoop101Hadoop102Hadoop103
ZKZKZK
KafkaKafkaKafka

集群部署

  1. 下载地址: https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
  2. 解压安装:[logan@hadoop101 software]$ tar -zxf kafka_2.12-3.0.0.tgz -C /opt/module
  3. 创建链:[logan@hadoop101 module]$ ln -snf kafka_2.12-3.0.0/ kafka
  4. 进入到/opt/module/kafka/config/目录,修改配置文件vim server.properties
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/data
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181/kafka
  1. 分发安装包xsync /opt/module/kafka
  2. 分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
  3. 配置环境变量,在/etc/profile.d/my_env.sh文件中增加kafka环境变量配置。增加如下内容:
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
  1. 刷新环境变量source /etc/profile
  2. 分发环境变量文件到其他节点,并source。
  3. 先启动 Zookeeper zk.sh start
  4. 编写 kafka 集群启动脚本vim ~/bin/kf.sh,增加执行权限chmod +x ~/bin/kf.sh
#! /bin/bashcase $1 in
"start"){for i in hadoop101 hadoop102 hadoop103doecho " --------启动 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop101 hadoop102 hadoop103doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done
};;
esac
  1. 启动集群 kf.sh start

kafka命令行操作

  1. topic操作命令
操作指令
查看kafka-topics.sh --bootstrap-server hadoop101:9092 --list
创建kafka-topics.sh --bootstrap-server hadoop101:9092 --create --partitions 1 --replication-factor 3 --topic first
查看 topic 详情kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first
修改分区数kafka-topics.sh --bootstrap-server hadoop101:9092 --alter --topic first --partitions 3
删除 topickafka-topics.sh --bootstrap-server hadoop101:9092 --delete --topic first

说明:

  • –topic 定义topic名
  • –replication-factor 定义副本数
  • –partitions 定义分区数(分区数在修改时只能增加,不能减少)
  1. 生产者命令行
kafka-console-producer.sh --bootstrap-server hadoop101:9092 --topic first

3.消费者命令行

# 消费first主题中的数据。
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic first
# 从头开始消费主题所有数据
kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --from-beginning --topic first

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/247078.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

电压驻波比

电压驻波比 关于IF端口的电压驻波比 一个信号变频后,从中频端口输出,它的输出跟输入是互异的。这个电压柱波比反映了它输出的能量有多少可以真正的输送到后端连接的器件或者设备。

ffmpeg 任意文件读取漏洞/SSRF漏洞 (CVE-2016-1897/CVE-2016-1898)

漏洞描述 影响范围 FFmpeg 2.8.x < 2.8.5FFmpeg 2.7.x < 2.7.5FFmpeg 2.6.x < 2.6.7FFmpeg 2.5.x < 2.5.10 漏洞环境及利用 搭建docker环境 访问8080端口看到上传界面 由于vulhub并没有讲述该漏洞如何复现&#xff0c;我们需要进入环境查看源码 <?php if(!…

Motion Plan之轨迹生成笔记 (2)

Motion Plan之搜索算法笔记 Motion Plan之基于采样的路径规划算法笔记 Motion Plan之带动力学约束路径搜索 什么是基于优化的轨迹生成 Optimization-Based Trajectory Planning&#xff08;基于优化的轨迹规划&#xff09;是一种常用的方法&#xff0c;用于生成自动化系统&am…

【数值计算方法(黄明游)】函数插值与曲线拟合(二):Newton插值【理论到程序】

​ 文章目录 一、近似表达方式1. 插值&#xff08;Interpolation&#xff09;2. 拟合&#xff08;Fitting&#xff09;3. 投影&#xff08;Projection&#xff09; 二、Lagrange插值1. 拉格朗日插值方法2. Lagrange插值公式a. 线性插值&#xff08;n1&#xff09;b. 抛物插值&…

EM32DX-C2【C#】

1说明&#xff1a; 分布式io&#xff0c;CAN总线&#xff0c;C#上位机二次开发&#xff08;usb转CAN模块&#xff09; 2DI&#xff1a; 公共端是&#xff1a; 0V【GND】 X0~X15&#xff1a;自带24v 寄存器地址&#xff1a;0x6100-01 6100H DI输入寄存器 16-bit &#x…

【NI-RIO入门】Real Time(实时系统解释)

1.什么是实时系统&#xff1f; 实时系统可以非常精确可靠的执行需要特定时许要求的系统&#xff0c;对于许多工程项目来说&#xff0c;通用操作系统&#xff0c;例如WINDOWS等标准PC上运行测量和控制程序是无法精确控制计时的&#xff0c;这些系统很容易受用户的其他程序、图像…

MYSQL练题笔记-聚合函数-各赛事的用户注册率

一、题目相关内容 1&#xff09;相关的表 2&#xff09;题目 3&#xff09;帮助理解题目的示例&#xff0c;提供返回结果的格式 二、自己初步的理解 有两张不同左右的表&#xff0c;用户表和赛事注册表。然后解题。 1.各种赛事的用户注册百分率 各种赛事的意味着通过contes…

Python实现FA萤火虫优化算法优化卷积神经网络分类模型(CNN分类算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 萤火虫算法&#xff08;Fire-fly algorithm&#xff0c;FA&#xff09;由剑桥大学Yang于2009年提出 , …

数据结构学习笔记——广义表

目录 一、广义表的定义二、广义表的表头和表尾三、广义表的深度和长度四、广义表与二叉树&#xff08;一&#xff09;广义表表示二叉树&#xff08;二&#xff09;广义表表示二叉树的代码实现 一、广义表的定义 广义表是线性表的进一步推广&#xff0c;是由n&#xff08;n≥0&…

4.grid_sample理解与使用

pytorch中的grid_sample 文章目录 pytorch中的grid_samplegrid_samplegrid_sample函数原型实例 欢迎访问个人网络日志&#x1f339;&#x1f339;知行空间&#x1f339;&#x1f339; grid_sample 直译为网格采样&#xff0c;给定一个mask patch&#xff0c;根据在目标图像上的…

C语言 操作符详解

C语言学习 目录 文章目录 前言 一、算术操作符 二、移位操作符 2.1 左移操作符 2.2 右移操作符 三、位操作符 3.1 按位与操作符 & 3.2 按位或操作符 | 3.3 按位异或操作符 ^ 四、赋值操作符 五、单目操作符 5.1 逻辑反操作符&#xff01; 5.2 正值、负值-操作符 5.3 取地址…

前端项目中获取浏览器版本的方法

在我们的前端项目中&#xff0c;navigator.userAgent属性含有当前浏览器相关信息&#xff08;比如版本号&#xff09;。 所以当我们想要获取用户当前访问的浏览器的版本时直接去解析navigator.userAgent字段就中。 废话不多说&#xff0c;下面看封装的获取浏览器版本的函数&am…