简介
Kafka是一个分布式消息系统,有LinkedIn公司开发,现已成为Apache基金顶级开源项目。
是一种快速、可扩展的、分布式的消息发布-订阅系统
基础组成
- producer
- consumer
- broker
- topic
- partition
消息和批次
- 消息:Kafka把数据单元称之为消息,可以把数据消息看成数据库中的一个“数据行”。
- 键:消息的可选元数据,当消息要以可控的方式写入不同分区时,需要用到键。(为键生成一个一致性哈希值,用哈希值对主题分区数进行取模,来确定分区)
- 批次:包含一组属于统一主题和分区的消息。合理使用批次,可以极大的提高效率。
- 在延迟和吞吐量之间做出权衡:批次越大,单位时间处理消息越多,对于单个消息来说传输时间越长
- 消息批次会被压缩,可以提升数据的传输和存储性能。
主题和分区
- 主题:Kafka的消息通过主题进行分类。(类似于数据库的表)
- 分区:主题可以被分为若干个分区,
- 一个分区就是一个提交日志。消息以追加的方式写入分区
- 按照先入先出的顺序读取。
- 由于一个topic包含多个分区,所以整个topic中无法保证消息的顺序;但是能保证消息在单个分区中的顺序
- Kafka通过分区来实现数据的冗余和伸缩,分区可以分布在不同的服务器上,分区可以被复制,相同分区的多个副本可以保存在多台服务器上。
生产者和消费者
Kafka的客户端就是Kafka系统的用户分为生产者和消费者
生产者:用于创建消息
- 一条消息会被发布到一个特定的主题上,默认情况下生产者会把消息均匀的分不到主题所有分区总
- 特殊情况下,生产者会被消息写入制定分区,通过消息键+分区器来实现。
- 分区器会为键生成一个哈希值,并将其映射到特定分区,这样可以保证同样的键被写入到同一个分区。
- 可以自定义分区器,根据业务规则不同将消息映射到不同的分区
消费者: 读取消息
- 消费者可以订阅一个或多个主题(???),并按照消息写入分区的顺序读取他们。
- 消费者通过检查偏移量来区分读取过的消息。
- 偏移量:创建消息时,Kafka会把他添加到消息中。
- 在给定的分区中,每一条消息的偏移量都是唯一的,越往后的消息偏移量越大(不保证单调递增)
- 消费者会将每一个分区的下一个偏移量保存起来(通常保存在Kafka中),如果消费者关闭/重启,保证读取状态不会丢失
- 消费者组:消费者可以使消费者组的一部分,属于统一群组的一个或多个消费者共同读取的一个主题。
- 群组可以保证每个分区只被这个群组里的一个消费者读取。
- 消费者与分区之间的映射成为消费者对分区的所有权关系
broker和集群
broker
- 一个单独的Kafka服务器成为broker,broker会接收来自生产者的消息,为其设置偏移量,并提交到磁盘保存。
- broker为消费者提供服务,对读取分区的请求做出响应,并返回已经发布的消息。
集群
- 多个broker组成了集群。每个集群都有一个同时充当了集群控制器角色的broker(自动从活动的成员中选举)。
- 控制器负责管理工作,为broker分配分区和监控broker。
- 一个分区从属于一个broker,这个broker成为分区的首领
- 一个被分配给其他broker的分区副本,叫做这个分区的“跟随者”。
- 一个首领broker发生故障,其中跟随者可以接管领导权。
- 生产者要发布消息,必须链接到首领;但是消费者可以从首领或跟随者那里读取消息。
- 保留消息:
- 保留一段时间(如7天),或者保留消息总量达到一定大小(如1GB)
- 如果消息数量达到上限,旧消息会过期并被删除。
- 主题可以配置自己的保留策略
多集群
- 当broker数量增多,最好使用多个集群
- 数据类型分离
- 安全需求隔离
- 多数据中心(容灾)
- 如果有多数据中心,则需要在它们之间进行消息复制,是的一个用户修改了信息后,不管从哪个数据中心都可以看到更新。
- Kafka的消息复制仅限于单集群,不能跨集群复制
- Kafka提供MirrorMaker,可以用来将数据复制到其他集群中。
- MirrorMaker 包含一个消费者和生产者,消费者从一个集群读取消息,生产者负责发送到另一个集群中
borker参数配置
broker.id
:- 默认是0,可以被设置成其他任意整数。
- 值在kafka集群中必须是唯一的,并且可以在服务器节点之间移动
- 建议降ID设置成与主机名相关的整数(host1,host2),那么用1,2来代表broker.id
zookeeper.connect
- 保存broker元数据的Zookeeper地址通过
zookeeper.connect
来指定 - 参数是用逗号分隔的 hostname:port/path
- hostname:服务器主机名或者IP地址
- port:Zookeeper服务器端口
- path:Zookeeper路径,可以作为Kafka集群的chroot。不指定默认使用根路径。
- 为什么使用chroot?这样配置可以在不发生冲突的情况下降Zookeeper群组共享给其他应用程序。
- 保存broker元数据的Zookeeper地址通过
log.dir
slog.dirs
一组使用逗号分隔的文件路径- Kafka把所有消息保存在磁盘上,存放日志的片段的目录是通过
log.dir
来指定的,如果有多个目录可以使用log.dirs
指定 - 如果指定多条路径,broker会根据“最少使用”原则,把同一个分区的日志片段保存到同一条路径下。
- broker会向分区数了最少的目录新增分区,而不根据可用磁盘空间大小的来判断新增分区;所以不能保证数据会被均匀的分布在多个目录中,会导致小磁盘提前写满
num.recoverythread.per.data.dir
:每个日志文件对应的线程数- Kafka使用线程池来处理日志片段,目前线程池用于以下三种
- 服务正常启动时,用于打开每个分区的日志片段
- 服务崩溃重启时,用于检查和截短每个分区的日志片段
- 服务正常关闭时,用于关闭日志片段
- 默认情况下,每个日志目录使用一个线程。
- 该参数如果设置为2,
log.dirs
下有3个目录,总共会有6个线程
- Kafka使用线程池来处理日志片段,目前线程池用于以下三种
auto.create.topic.enable
- 默认情况下Kafka会在以下三种情况自动创建topic
- 当一个生产者向主题写入消息时
- 当一个消费者从主题读取消息时
- 客户端向主题获取元数据请求时
- 不希望手动创建则 可以把
auto.create.topic.enable
设置为false
- 默认情况下Kafka会在以下三种情况自动创建topic
auto.leader.rebalance.enable
- 为了确保主题的所有权不集中在同一个borker上,可以将找个参数设置为true,让主题所有权尽可能的在集群中保持均衡
delete.topic.enable
:设置为false,禁用删除topic的功能
Topic参数配置
num.partitions
- 指定创建topic将包含多少个分区
- 默认分区是1,可以增加topic的分区,但是不能减少
- 分区数建议是broker的倍数,这样可以使得分区均衡的分布到broker上
- 避免使用太多分区,每个分区都会占用broker的内存和资源,还会增加元数据更新和首领选举的时间
- 计算分区:如果写入是1GB,消费是50MB,那么至少需要20个分区,这样可以使得20个消费者同时读取这些分区,达到1GB的吞吐量
log.retention.ms
- 日志保留时间
- 默认168小时,7天,
log.retention.minutes
和log.retention.ms
都是确认消息将在多久之后被删除
log.retention.bytes
- 通过保留策略计算保留的消息的字节总数来判断消息是否过期,
- 对应的是一个分区的大小,如果大小设置的1G,一共两个分区,那么topic总大小是2g
log.segment.bytes
- 作用范围:日志片段,当消息到达broker时,会被追加到当前分区的日志片段上。
- 当日志片段大小达到
log.segment.bytes
,当前日志片段会被关闭,打开一个新的日志片段。 - 如果参数设置的越小,日志片段的关闭/分配就会很频繁,因为是一个IO操作,会降低整体磁盘写入效率
- 日志片段为关闭,消息是不会过期的。所以如果一个topic每条消息100MB,日志片段大小1GB,那么填满则需要10天,消息过期的时间就会是10(填满)+7(过期时间)=17天
log.roll.ms
- 用于控制日志片段关闭时间的参数,指定多长时间后日志片段可以被关闭
- 默认是没有赋值的,过期时间168小时,与
log.segment.bytes
共同作用,要么到达时间关闭,要么到达大小关闭
min.insync.replicas
- 分区的副本数,如果设置为2,代表每个分区有2个副本。
- 生产者需要将
ack
设置为all
,这样可以确保至少两个副本确认写入成功,从而防止丢失数据
message.max.bytes
- 用于控制单条消息的大小,默认1MB,指的是压缩后的消息大小
- 消息大于1MB,会被broker拒收,返回错误信息。
- 值越大->网络连接和请求时间越长->磁盘写入块越大->IO性能越差
fetch.max.bytes
- 消费者消费数据的大小,需要与消息大小保持一致,否则无法读取数据