【Kakfa】初识Kafka

news/2025/3/10 14:51:12/文章来源:https://www.cnblogs.com/cmxb/p/18757884

简介

Kafka是一个分布式消息系统,有LinkedIn公司开发,现已成为Apache基金顶级开源项目。
是一种快速、可扩展的、分布式的消息发布-订阅系统

基础组成

  • producer
  • consumer
  • broker
  • topic
  • partition
    image

消息和批次

  • 消息:Kafka把数据单元称之为消息,可以把数据消息看成数据库中的一个“数据行”。
  • 键:消息的可选元数据,当消息要以可控的方式写入不同分区时,需要用到键。(为键生成一个一致性哈希值,用哈希值对主题分区数进行取模,来确定分区)
  • 批次:包含一组属于统一主题和分区的消息。合理使用批次,可以极大的提高效率。
    • 在延迟和吞吐量之间做出权衡:批次越大,单位时间处理消息越多,对于单个消息来说传输时间越长
    • 消息批次会被压缩,可以提升数据的传输和存储性能。

主题和分区

  • 主题:Kafka的消息通过主题进行分类。(类似于数据库的表)
  • 分区:主题可以被分为若干个分区
    • 一个分区就是一个提交日志。消息以追加的方式写入分区
    • 按照先入先出的顺序读取。
    • 由于一个topic包含多个分区,所以整个topic中无法保证消息的顺序;但是能保证消息在单个分区中的顺序
    • Kafka通过分区来实现数据的冗余和伸缩,分区可以分布在不同的服务器上,分区可以被复制,相同分区的多个副本可以保存在多台服务器上。

生产者和消费者

Kafka的客户端就是Kafka系统的用户分为生产者和消费者

生产者:用于创建消息

  • 一条消息会被发布到一个特定的主题上,默认情况下生产者会把消息均匀的分不到主题所有分区总
  • 特殊情况下,生产者会被消息写入制定分区,通过消息键+分区器来实现。
    • 分区器会为键生成一个哈希值,并将其映射到特定分区,这样可以保证同样的键被写入到同一个分区。
    • 可以自定义分区器,根据业务规则不同将消息映射到不同的分区

消费者: 读取消息

  • 消费者可以订阅一个或多个主题(???),并按照消息写入分区的顺序读取他们。
  • 消费者通过检查偏移量来区分读取过的消息。
  • 偏移量:创建消息时,Kafka会把他添加到消息中。
    • 在给定的分区中,每一条消息的偏移量都是唯一的,越往后的消息偏移量越大(不保证单调递增)
    • 消费者会将每一个分区的下一个偏移量保存起来(通常保存在Kafka中),如果消费者关闭/重启,保证读取状态不会丢失
  • 消费者组:消费者可以使消费者组的一部分,属于统一群组的一个或多个消费者共同读取的一个主题。
    • 群组可以保证每个分区只被这个群组里的一个消费者读取。
    • 消费者与分区之间的映射成为消费者对分区的所有权关系

broker和集群

broker

  • 一个单独的Kafka服务器成为broker,broker会接收来自生产者的消息,为其设置偏移量,并提交到磁盘保存。
  • broker为消费者提供服务,对读取分区的请求做出响应,并返回已经发布的消息。

集群

  • 多个broker组成了集群。每个集群都有一个同时充当了集群控制器角色的broker(自动从活动的成员中选举)。
  • 控制器负责管理工作,为broker分配分区和监控broker。
    • 一个分区从属于一个broker,这个broker成为分区的首领
    • 一个被分配给其他broker的分区副本,叫做这个分区的“跟随者”。
    • 一个首领broker发生故障,其中跟随者可以接管领导权。
    • 生产者要发布消息,必须链接到首领;但是消费者可以从首领或跟随者那里读取消息。
  • 保留消息:
    • 保留一段时间(如7天),或者保留消息总量达到一定大小(如1GB)
    • 如果消息数量达到上限,旧消息会过期并被删除。
    • 主题可以配置自己的保留策略

多集群

  • 当broker数量增多,最好使用多个集群
    • 数据类型分离
    • 安全需求隔离
    • 多数据中心(容灾)
  • 如果有多数据中心,则需要在它们之间进行消息复制,是的一个用户修改了信息后,不管从哪个数据中心都可以看到更新。
    • Kafka的消息复制仅限于单集群,不能跨集群复制
    • Kafka提供MirrorMaker,可以用来将数据复制到其他集群中。
    • MirrorMaker 包含一个消费者和生产者,消费者从一个集群读取消息,生产者负责发送到另一个集群中

borker参数配置

  • broker.id
    • 默认是0,可以被设置成其他任意整数。
    • 值在kafka集群中必须是唯一的,并且可以在服务器节点之间移动
    • 建议降ID设置成与主机名相关的整数(host1,host2),那么用1,2来代表broker.id
  • zookeeper.connect
    • 保存broker元数据的Zookeeper地址通过zookeeper.connect来指定
    • 参数是用逗号分隔的 hostname:port/path
      • hostname:服务器主机名或者IP地址
      • port:Zookeeper服务器端口
      • path:Zookeeper路径,可以作为Kafka集群的chroot。不指定默认使用根路径。
      • 为什么使用chroot?这样配置可以在不发生冲突的情况下降Zookeeper群组共享给其他应用程序。
  • log.dirs
    • log.dirs一组使用逗号分隔的文件路径
    • Kafka把所有消息保存在磁盘上,存放日志的片段的目录是通过log.dir来指定的,如果有多个目录可以使用log.dirs指定
    • 如果指定多条路径,broker会根据“最少使用”原则,把同一个分区的日志片段保存到同一条路径下。
    • broker会向分区数了最少的目录新增分区,而不根据可用磁盘空间大小的来判断新增分区;所以不能保证数据会被均匀的分布在多个目录中,会导致小磁盘提前写满
  • num.recoverythread.per.data.dir每个日志文件对应的线程数
    • Kafka使用线程池来处理日志片段,目前线程池用于以下三种
      • 服务正常启动时,用于打开每个分区的日志片段
      • 服务崩溃重启时,用于检查和截短每个分区的日志片段
      • 服务正常关闭时,用于关闭日志片段
    • 默认情况下,每个日志目录使用一个线程。
    • 该参数如果设置为2,log.dirs下有3个目录,总共会有6个线程
  • auto.create.topic.enable
    • 默认情况下Kafka会在以下三种情况自动创建topic
      • 当一个生产者向主题写入消息时
      • 当一个消费者从主题读取消息时
      • 客户端向主题获取元数据请求时
    • 不希望手动创建则 可以把auto.create.topic.enable设置为false
  • auto.leader.rebalance.enable
    • 为了确保主题的所有权不集中在同一个borker上,可以将找个参数设置为true,让主题所有权尽可能的在集群中保持均衡
  • delete.topic.enable:设置为false,禁用删除topic的功能

Topic参数配置

  • num.partitions
    • 指定创建topic将包含多少个分区
    • 默认分区是1,可以增加topic的分区,但是不能减少
    • 分区数建议是broker的倍数,这样可以使得分区均衡的分布到broker上
    • 避免使用太多分区,每个分区都会占用broker的内存和资源,还会增加元数据更新和首领选举的时间
    • 计算分区:如果写入是1GB,消费是50MB,那么至少需要20个分区,这样可以使得20个消费者同时读取这些分区,达到1GB的吞吐量
  • log.retention.ms
    • 日志保留时间
    • 默认168小时,7天,
    • log.retention.minuteslog.retention.ms 都是确认消息将在多久之后被删除
  • log.retention.bytes
    • 通过保留策略计算保留的消息的字节总数来判断消息是否过期,
    • 对应的是一个分区的大小,如果大小设置的1G,一共两个分区,那么topic总大小是2g
  • log.segment.bytes
    • 作用范围:日志片段,当消息到达broker时,会被追加到当前分区的日志片段上。
    • 当日志片段大小达到log.segment.bytes,当前日志片段会被关闭,打开一个新的日志片段。
    • 如果参数设置的越小,日志片段的关闭/分配就会很频繁,因为是一个IO操作,会降低整体磁盘写入效率
    • 日志片段为关闭,消息是不会过期的。所以如果一个topic每条消息100MB,日志片段大小1GB,那么填满则需要10天,消息过期的时间就会是10(填满)+7(过期时间)=17天
  • log.roll.ms
    • 用于控制日志片段关闭时间的参数,指定多长时间后日志片段可以被关闭
    • 默认是没有赋值的,过期时间168小时,与log.segment.bytes 共同作用,要么到达时间关闭,要么到达大小关闭
  • min.insync.replicas
    • 分区的副本数,如果设置为2,代表每个分区有2个副本。
    • 生产者需要将ack设置为all,这样可以确保至少两个副本确认写入成功,从而防止丢失数据
  • message.max.bytes
    • 用于控制单条消息的大小,默认1MB,指的是压缩后的消息大小
    • 消息大于1MB,会被broker拒收,返回错误信息。
    • 值越大->网络连接和请求时间越长->磁盘写入块越大->IO性能越差
  • fetch.max.bytes
    • 消费者消费数据的大小,需要与消息大小保持一致,否则无法读取数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/895648.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算子 get_contour_attrib_xld - 返回 XLD 轮廓的点属性值

算子 get_contour_attrib_xld 名称 get_contour_attrib_xld — 返回 XLD 轮廓的点属性值。 签名 get_contour_attrib_xld(Contour : : Name : Attrib) 描述 get_contour_attrib_xld 算子将 XLD 轮廓 Contour 中属性 Name 的值返回到 Attrib 中。轮廓点属性是为每个轮廓点定义的…

川大网安挑战赛游记

会赢吗部分在手机上完成,格式可能有误,见谅。前:可能是最后的比赛了,还是记一下吧Day -1 下午三点半就走了,吃了汉堡王上火车,路上看网课,听歌。 到家快十点了,最终后半夜前睡了。 Day 0 早起,7:30 的飞机。发现手机没电了,不过还是顽强的让我听了一路的歌。 酒店严格…

win11开启22端口ssh服务器

步骤按win,输入可选功能条读完后,搜索service找到ssh然后进行设置,或者也可以使用命令。启动后端口就打开了。 配置默认shell 按win,搜索regedit,打开注册表编辑器。 导航到以下路径: HKEY_LOCAL_MACHINE\SOFTWARE\OpenSSH 在右侧窗口中,找到或创建一个名为 DefaultShel…

Python+Django网上招聘系统的设计与实现

平台采用B/S结构,后端采用主流的Python语言进行开发,前端采用主流的Vue.js进行开发。 整个平台包括前台和后台两个部分。 - 前台功能包括:首页、岗位详情页、简历中心、用户设置模块。 - 后台功能包括:总览、岗位管理、公司管理、分类管理、标签管理、评论管理、用户管理、…

elasticseach-分页搜索

背景 使用es通过常规分页来做导出是遇到不能超过from不能跳过1万的问题。结合这个问题契机深入了解一下es的分页。 入参{"from":10601,"size": 5}响应{"error": {"root_cause": [{"type": "illegal_argument_exception…

牛客 周赛83 20250304

牛客 周赛83 20250304 https://ac.nowcoder.com/acm/contest/102896 A: 题目大意:给定字符,不同输出 #include<bits/stdc++.h> #define cintie ios::sync_with_stdio(false);cin.tie(0);cout.tie(0); #define Trd int T;cin>>T;while (T--)solve(); #define LLi…

3.2 练习

在使用torch.autograd.grad计算二阶导数时,可以仅设置create_graph=True而无需设置retain_graph=True。以下是关键点总结:create_graph的作用: 当设置为True时,会保留梯度计算图,使得后续能对梯度再次求导(如计算二阶导数)。这是高阶导数计算的关键参数。retain_graph的…

6、seq2seq - Transformer-Encoder、Transformer-Decoder

Attention - 注意力机制seq2seq是 Sequence to Sequence 的简写,seq2seq模型的核心就是编码器(Encoder)和解码器(Decoder)组成的通过在seq2seq结构中加入Attention机制,是seq2seq的性能大大提升,先在seq2seq被广泛的用于机器翻译、对话生成、人体姿态序列生成等各种任务…

作业-个人项目编程

作业gitHub 作业链接这个作业属于哪个课程 软件工程这个作业要求在哪里 作业要求这个作业的目标 完成一次个人项目计算模块接口的设计与实现过程1. 代码组织 计算模块的核心功能是读取文件内容并计算两个文本的相似度。代码组织如下: 模块划分文件读取模块:函数:readFile。 …

mybatis-plus02--Lesson2

CRUD和myBatis-plus插件 1.Insert方法和雪花算法 当一个数据表中的id为主键时,且插入的数据的时候不插入主键id,那么会发生什么呢?接下来就进行一次简单测试,还是那个User表,插入其它属性,不插入主键id。 测试方法:@Testpublic void testInsert(){User user = new User(…

免费好用的云服务器提供商

最近倒腾云服务器,想在阿里云上购买,结果发现价格太高,是在买不起。我只好在百度上翻呀翻,终于找到一个免费的,实名一下就能用。链接我贴这儿了,要用请自取:免费好用的云服务器提供商

2020-PTA总决赛-L3-1 那就别担心了(记忆化搜索)

dfs,记忆化搜索思路:读懂题到28分花了十分钟左右,做的时候就感觉可能要超时,因为结点稍微有点多 但是还是继续硬着头皮写下去了,果不其然,最后一个测试点超时,那么就要开dp数组了 题目大意就是找到A到B有几条路径,且是否走哪条路都能通向B28分Code: #include<bits/st…