【Kafka】Kafka 架构深入

Kafka 工作流程及文件存储机制

Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。

topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号

例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

index 和 log 文件以当前 segment 的第一条消息的 offset 命名

“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址


数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。


数据一致性问题

LEO:指的是每个副本最大的 offset; 
HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO

1)follower 故障

follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

2)leader 故障

leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。 


ack 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:
●0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。

●1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。

●-1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。

三种机制性能依次递减,数据可靠性依次递增。

注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。


Filebeat+Kafka+ELK

环境准备
node1:192.168.67.11        elasticsearch  kibana
node2:192.168.67.12        elasticsearch
apache:192.168.67.10               logstash  apache/nginx/mysql
Filebeat节点:filebeat/192.168.67.13           Filebeat
zk-kfk01:192.168.67.21                zookeeper、kafka
zk-kfk02:192.168.67.22                zookeeper、kafka
zk-kfk03:192.168.67.23                zookeeper、kafkasystemctl stop firewalld
systemctl enable firewalld
setenforce 0

1、部署 Zookeeper+Kafka 集群

2、部署 Filebeat 

cd /usr/local/filebeatvim filebeat.yml
filebeat.prospectors:
- type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]......
#添加输出到 Kafka 的配置
output.kafka:enabled: true#指定 Kafka 集群配置hosts: ["192.168.67.21:9092","192.168.67.22:9092","192.168.67.23:9092"]#指定 Kafka 的 topictopic: "httpd"

 注释掉logstash 出口,留下kafka出口;出口只能有一个

 
启动 filebeat
systemctl restart filebeat.service
systemctl status filebeat.service# ./filebeat -e -c filebeat.yml

报错:服务起不来;查看日志;

原因:是filebeat.yml中将日志同时输出到了kafka和logstash

解决:注释掉logstash即可

3、部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

cd /etc/logstash/conf.d/vim kafka.conf
input {kafka {#kafka集群地址bootstrap_servers => "192.168.67.21:9092,192.168.67.22:9092,192.168.67.23:9092"#拉取的kafka的指定topictopics  => "httpd"#指定 type 字段type => "httpd_kafka"#解析json格式的日志数据codec => "json"#拉取最近数据,earliest为从头开始拉取auto_offset_reset => "latest"#传递给elasticsearch的数据额外增加kafka的属性数据decorate_events => true}
}output {if "access" in [tags] {elasticsearch {hosts => ["192.168.67.11:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.67.11:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug }
}

启动 logstash
logstash -f kafka.conf

报错:路径重复

解决:指定一个新的路径

logstash -f kafka.conf --path.data=/opt

报错:配置文件有错

解决:配置文件删了重写

注:生产黑屏操作es时查看所有的索引:

curl -X GET "192.168.67.11:9200/_cat/indices?v"

4、浏览器访问

http://192.168.67.11:5601/

浏览器访问 http://192.168.67.11:5601 登录 Kibana,单击“Create Index Pattern”按钮添加索引“filebeat_test-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/617291.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

服务器docker应用一览

文章目录 一、需求概况二、业务流程三、运行效果四、实现过程1. 基础前提2. 源码放送3.核心代码4. 项目打包5.部署步骤 一、需求概况 现有某云主机服务器,用来做项目演示用,上面运行了docker应用,现希望有一总览页面,用来展示部署…

Java并发--锁

锁 volatile关键字 volatile可以保证变量的可见性,如果我们将变量声明为volatile,那么每次使用它都在主存内进行读取,保证其他线程能准确无误的读取到本线程对变量的修改。 底层: volatile关键字修饰变量的时候,本…

【C++庖丁解牛】底层为红黑树结构的关联式容器--哈希容器(unordered_map和unordered_set)

🍁你好,我是 RO-BERRY 📗 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 🎄感谢你的陪伴与支持 ,故事既有了开头,就要画上一个完美的句号,让我们一起加油 目录 1. unordered系列关联式容…

网络篇10 | 网络层 IP

网络篇10 | 网络层 IP 01 简介02 名称解释03 IP报文格式(IPv4)1)4位版本协议(version)2)4位首部长度(header length)3)8位服务类型(Type Of Service, TOS)4)16位总长度5)16位(分片)标识6)3位(分片)标志7&am…

【C++类和对象】构造函数与析构函数

💞💞 前言 hello hello~ ,这里是大耳朵土土垚~💖💖 ,欢迎大家点赞🥳🥳关注💥💥收藏🌹🌹🌹 💥个人主页&#x…

Linux虚拟内存简介

Linux,像多数现代内核一样,采用了虚拟内存管理技术。该技术利用了大多数程序的一个典型特征,即访问局部性(locality of reference),以求高效使用CPU和RAM(物理内存)资源。大多数程序…

大语言模型总结整理(不定期更新)

《【快捷部署】016_Ollama(CPU only版)》 介绍了如何一键快捷部署Ollama,今天就来看一下受欢迎的模型。 模型简介gemmaGemma是由谷歌及其DeepMind团队开发的一个新的开放模型。参数:2B(1.6GB)、7B&#xff…

ROS 2边学边练(22)-- 又见动作(action)

前言 我们在之前体验过各种通信方法(主题、服务、动作),并且也构建了自定义了一些msg和srv,那么同样的,对于action,同样支持自定义。我们来试试吧(动作的概念及流程还请翻到之前的博客进行查看&…

LeetCode-705. 设计哈希集合【设计 数组 哈希表 链表 哈希函数】

LeetCode-705. 设计哈希集合【设计 数组 哈希表 链表 哈希函数】 题目描述:解题思路一:超大数组解题思路二:拉链法解题思路三:定长拉链数组 题目描述: 不使用任何内建的哈希表库设计一个哈希集合(HashSet&…

机器学习和深度学习--李宏毅(笔记与个人理解)Day15

Day 15 重温宝可梦分类器 – 浅谈机器学习基本原理 REview 见我之前的笔记即可~ More parameters , easier to overfit ,why ? Step 1 a function (Based on domain knowedge) 线条的复杂程度? Edge Detction Step 2 Loss 这里注意一下哈,这个corss-en…

PCL 高斯滤波(C++详细过程版)

目录 一、概述二、代码实现三、结果展示1、滤波前2、滤波后3、对比PCL 高斯滤波(C++详细过程版)由CSDN点云侠原创,爬虫自重。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫。 一、概述 高斯滤波在PCL里有现成的调用函数,具体算法原理和实现代码见:

网络篇12 | 链路层 ARP

网络篇12 | 链路层 ARP 01 简介1)工作过程2)ARP缓存2.1 动态ARP表项2.2 静态ARP表项2.3 短静态ARP表项2.4 长静态ARP表项 02 ARP报文格式1)ARP请求报文格式2)ARP响应报文格式3)套一层以太网帧(ARP帧&#x…