7.kafka+ELK连接

文章目录

  • kafka+ELK连接
    • 部署Kafka
    • kafka操作命令
    • kafka架构深入
    • Filebeat+Kafka+ELK连接

kafka+ELK连接

部署Kafka

###关闭防火墙systemctl stop firewalld
systemctl disable firewalldsetenforce 0vim /etc/selinux/configSELINUX=disabled
###下载安装包官方下载地址:http://kafka.apache.org/downloads.htmlcd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz
###安装 Kafkacd /opt/
tar xf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka
###修改配置文件##备份配置文件
cd /usr/local/kafka/config/
cp server.properties{,.bak}vim server.properties---21行--
broker.id=0    
###21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2---31行---
listeners=PLAINTEXT://192.168.242.70:9092    
###31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改---42行---
num.network.threads=3    
###42行,broker 处理网络请求的线程数量,一般情况下不需要去修改---45行---
num.io.threads=8            
#45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数---48行---
socket.send.buffer.bytes=102400       
#48行,发送套接字的缓冲区大小---51行---
socket.receive.buffer.bytes=102400    
#51行,接收套接字的缓冲区大小---54行---
socket.request.max.bytes=104857600    
#54行,请求套接字的缓冲区大小---60行---
log.dirs=/usr/local/kafka/logs        
#60行,kafka运行日志存放的路径,也是数据存放的路径---65行---
num.partitions=1    
#65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖---69行---
num.recovery.threads.per.data.dir=1    
#69行,用来恢复和清理data下数据的线程数量---103行---
log.retention.hours=168    
#103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除---110行---
log.segment.bytes=1073741824    
#110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件---123行---
zookeeper.connect=192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181    
###123行,配置连接Zookeeper集群地址
###修改环境变量vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin###加载配置
source /etc/profile###复制文件到zookeeper服务器哦scp -r /usr/local/kafka 192.168.242.71:/usr/local
scp -r /usr/local/kafka 192.168.242.72:/usr/local
###部署kafka启动脚本vim /etc/init.d/kafka#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esac
###设置开机自启cd /etc/init.d/
chmod +x /etc/init.d/kafka
chkconfig --add kafka###分别启动 Kafka
service kafka start###另外一种启动方式cd /usr/local/kafka/bin./kafka-server-start.sh -daemon /usr/local/kafka/config/server.propertiesnetstat -lntp | grep 9092

在这里插入图片描述

kafka操作命令

####创建topickafka-topics.sh --create --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181 --replication-factor 2 --partitions 3 --topic test--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
--replication-factor:定义分区副本数,1 代表单副本,建议为 2 
--partitions:定义分区数 
--topic:定义 topic 名称

在这里插入图片描述

###查看当前服务器中的所有 topickafka-topics.sh --list --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181 
###查看某个 topic 的详情kafka-topics.sh  --describe --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181 

在这里插入图片描述

###发布消息kafka-console-producer.sh --broker-list 192.168.242.70:9092,192.168.242.71:9092,192.168.242.72:9092  --topic test
###消费消息kafka-console-consumer.sh --bootstrap-server 192.168.242.70:9092,192.168.242.71:9092,192.168.242.72:9092 --topic test --from-beginning--from-beginning:会把主题中以往所有的数据都读取出来

在这里插入图片描述

###修改分区数kafka-topics.sh --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181  --alter --topic test --partitions 6

在这里插入图片描述

###删除 topickafka-topics.sh --delete --zookeeper 192.168.242.70:2181,192.168.242.71:2181,192.168.242.72:2181 --topic test

在这里插入图片描述

kafka架构深入

  • Kafka 工作流程及文件存储机制

    • Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。
  • topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。

  • Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。

  • 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

  • 由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。

  • 每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。

  • 这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

  • index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

    • “.index” 文件存储大量的索引信息,
    • “.log” 文件存储大量的数据,
    • 索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。
  • 数据可靠性保证

    • 为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
  • 数据一致性问题

    • LEO:指的是每个副本最大的 offset;
    • HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO。
  • follower 故障

    • follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
  • leader 故障

    • leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。
  • 注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

  • ack 应答机制

    • 对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。
    • 所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。
  • 当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

    • 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。
    • 1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。
    • -1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。
  • 三种机制性能依次递减,数据可靠性依次递增。

注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。

+ kafka会通过ack机制保证数据的可靠性

  • ack配置参数有
  • 0(效果类似异步复制):不等待follower同步完成就让生产者发送下一条消息
  • 1(效果类似半同步复制):至少等待一个follower同步完成才让生产者发送下一条消息
  • -1(效果类似全同步复制):等待所有follower同步完成才让生产者发送下一条消息

Filebeat+Kafka+ELK连接

在这里插入图片描述

###部署 Zookeeper+Kafka 集群###部署 Filebeat,修改配置文件 cd /usr/local/filebeatvim filebeat.ymlfilebeat.prospectors:
- type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]......#添加输出到 Kafka 的配置output.kafka:enabled: truehosts: ["192.168.242.70:9092","192.168.242.71:9092","192.168.242.72:9092"]    #指定 Kafka 集群配置topic: "httpd"    #指定 Kafka 的 topic

在这里插入图片描述
在这里插入图片描述

###启动 filebeat./filebeat -e -c filebeat.yml

在这里插入图片描述

###部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件cd /etc/logstash/conf.d/vim kafka.confinput {kafka {bootstrap_servers => "192.168.242.70:9092,192.168.242.71:9092,192.168.242.72:9092"  #kafka集群地址topics  => "httpd"     #拉取的kafka的指定topictype => "httpd_kafka"  #指定 type 字段codec => "json"        #解析json格式的日志数据auto_offset_reset => "latest"  #拉取最近数据,earliest为从头开始拉取decorate_events => true   #传递给elasticsearch的数据额外增加kafka的属性数据}
}output {if "access" in [tags] {elasticsearch {hosts => ["192.168.242.66:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.242.66:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug }
}
####启动 logstashlogstash -f kafka.conf

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/22055.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】初识

🍁 博客主页:江池俊的博客_CSDN博客-C语言——探索高效编程的基石领域博主 🍁 专栏:https://blog.csdn.net/2201_75743654/category_12348274.html 🍁 如果觉得博主的文章还不错的话,请点赞👍收藏&#x1f…

【Linux】进程优先级

Linux 进程优先级 为什么要有优先级的划分?Linux 环境设置优先级的具体做法并发运行环境变量如何通过代码获取环境变量 环境变量的来源 为什么要有优先级的划分? 优先级的规定就是为了确定某种资源获取的先后顺序。 本质原因是因为CPU资源是有限的。进程…

LIS检验信息系统

LIS检验信息系统是以病人为中心、以业务处理为基础、以提高检验科室管理水平和工作效率为目标,将医学检验、科室管理和财务统计等工作进行整合,全面改善检验科室的工作现状。 LIS把检验、检疫、放免、细菌微生物及科研使用的各类分析仪器,通…

uniapp实现聊天消息触,vue3和vue2实现聊天消息触底 scrollTop ,scrollHeight Pc端H5端都适用

uniapp触底SDN链接如下(本人的另一篇博客) uniapp聊天时时触底链接 Pc端 模拟手机端H5 vue3写法 <template><div><!-- 聊天窗体 --><div class"test" id"gundong"><div class"text" v-for"p in chat"&…

SQL Server数据库 -- 表的高级查询

文章目录 一、子查询 嵌套子查询相关子查询二、查询运算 并运算union交运算intersect差运算except三、函数的使用 if语句while语句case语句四、总结 前言 高级子查询是对查询更灵活的运用&#xff0c;学会了高级查询将对数据库使用有很大的帮助。 一、子查询 1、子查询简介 在…

比亚迪海外市场势起

监制 | 何玺 排版 | 叶媛 海外市场正成为比亚迪新的增长点。 据媒体报道&#xff0c;从去年下半年至今&#xff0c;比亚迪已经在至少四个国家的纯电车型市场占据“销冠”位置。对于国内用户群体来说&#xff0c;比亚迪销量“霸屏”早已不是新闻&#xff0c;但在海外也保持这个…

易基因“多区域DNA甲基化检测探针设计及其检测方法”获专利授权!

大家好&#xff0c;这里是专注表观组学十余年领跑多组学科研服务的易基因。 DNA甲基化是表观遗传学研究中&#xff0c;修饰最为稳定&#xff0c;含量最为丰富&#xff0c;对基因调控最为活跃、途径最为广泛的一种修饰。不同基因区域或位点的修饰与胚胎发育、疾病发生和发展密切…

数据库--->MySQL(2)【事务、SQL优化】

文章目录 事务什么是事务&#xff1f;隔离性中的不同隔离级别事务实现的原理隔离级别的实现原理&#xff08;MVCC&#xff09;MySQL中的锁机制 SQL优化 事务 什么是事务&#xff1f; 事务就是逻辑上的一组操作&#xff0c;在同一个事务中&#xff0c;如果有多条sql语句执行&am…

android studio 使用lib中的framework.jar编译

本文参考了网上搜索到的内容总结了一下&#xff0c;感谢大神们的无私奉献。 在App中的build.gradle中的android{}下添加&#xff1a; android{...gradle.projectsEvaluated {tasks.withType(JavaCompile) {Set<File> fileSet options.bootstrapClasspath.getFiles()Li…

100种思维模型之安全边际思维模型-92

安全边际&#xff0c; 简而言之即距离某一件糟糕的事件发生&#xff0c;还有多大的空间&#xff0c;安全边际越高&#xff0c;我们就越安全&#xff01; 安全边际思维模型一个 让生活变得更从容 的 思维模型。 01、何谓安全边际思维模型 一、安全边际思维 安全边际 源于…

(9)基础强化:元字符,正则表达式,匹配,提取组,Regex,Match与Matches

一、作业 1、问&#xff1a;下面解压程序出错&#xff0c;什么原因&#xff1f; string src "E:\1.txt";string des "E:\2.txt";using (FileStream read File.OpenRead(src)){using (GZipStream gzip new GZipStream(read, CompressionMode.Decompress…

PHP代码审计(一)之PHP代码审计的意义

PHP代码审计的意义 什么是代码审计 什么是代码审计&#xff1f;代码审计就是获取目标的源代码&#xff0c;这个目标可以是一个网站&#xff0c;也可以是一个手机app&#xff0c;只要我们得到了目标的源代码&#xff0c;我们就可以去挖掘目标系统的漏洞&#xff0c;代码审计是…