ELK安装、部署、调试(四)KAFKA消息队列的安装和部署

1.简介

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

Kafka [1]  是一种高吞吐量 [2]  的分布式发布订阅消息系统,有如下特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

高吞吐量 [2]  :即使是非常普通的硬件Kafka也可以支持每秒数百万 [2]  的消息。

1.1 使用消息队列好处
1)、提高扩展性:因为消息队列解耦了处理过程,有新增需求时只要另外增加处理过程即可。

2)、提高峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷请求而完全崩溃;

3)、提高系统的可恢复性:消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统回复后被处理
1.2 kafka的结构

  • Producer:消息生产者,产生的消息将会被发送到某个topic
  • Consumer:消息消费者,消费的消息内容来自某个topic
  • Topic:消息根据topic进行归类,topic其本质是一个目录,即将同一主题消息归类到同一个目录
  • Broker:每一个kafka实例(或者说每台kafka服务器节点)就是一个broker,一个broker可以有多个topic

2.下载

获取我的baidu网盘中ELK
kafka.apache.org 官方网址
由于我下载的filebeat是7.9.3,与这个版本filebeat配合的kafka版本为0.11--2.2.2
https://www.elastic.co/guide/en/beats/filebeat/7.9/kafka-output.html 这里有说明。

Compatibility
This output works with all Kafka versions in between 0.11 and 2.2.2. Older versions might work as well, but are
not supported.

3.安装

tar zxvf kafka_2.11-2.0.1.tgz -C /usr/local
ls
cd /usr/local/
mv kafka_2.11-2.0.1 kafka
./bin 执行文件
./config 配置文件主配置文件为 server.properties

4.配置

vi server.properties

broker.id=0 集群内的唯一标识
listeners=PLAINTEXT://ip[或者主机名]:9092          #配置监听端口  9092位kafka默认端口
log.dirs=/usr/local/kafka/logs      #日志及传输的数据存放目录可以通过“,”分隔存放到多个目录,如
log.dirs=/data1,/data2
num.partitions=6  #定义新的topic有多少个分区
num.retention.hours=60  消息日志保留的时间,单位是小时
#num.retention.minutes 或者num.retention.ms,可以多个参数同时设置,时间最小生效
log.segment.bytes=1073741824  每个段文件的大小,1G为默认值,topic--partition--segement
zookeeper.connect=10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181
auto.create.topics.enable=true  #可以自动创建topics
delete.topic.enable=true  #可以自动删除空闲的topics,是逻辑的删除,非物理删除。
zookeeper.connect.timeout.ms=6000

mkdir /data1
mkdir /data2

5.启动kafka 

cd /usr/local/kafka
./bin/kafka-server-start.sh ./config/server.properties -d      #-d 后台执行
或者nohup ./kafka-server-start.sh ../config/server.properties &

启动时可能会报错,注意关闭服务器的防火墙

jps查看

[root@localhost kafka]# jps
23696 Kafka
9271 QuorumPeerMain
24077 Jps

kafka提供了多个命令来查看,创建,修改,删除topic信息,也可以通过命令来测试日志消息
命令存放路径为usr/local/kafka/bin

[root@localhost bin]# pwd
/usr/local/kafka/bin

[root@localhost bin]# ls
connect-distributed.sh        kafka-consumer-groups.sh     kafka-preferred-replica-election.sh  kafka-streams-
application-reset.sh  zookeeper-server-start.sh
connect-standalone.sh         kafka-consumer-perf-test.sh  kafka-producer-perf-test.sh          kafka-topics.sh  zookeeper-server-stop.sh
kafka-acls.sh                 kafka-delegation-tokens.sh   kafka-reassign-partitions.sh         kafka-
verifiable-consumer.sh        zookeeper-shell.sh
kafka-broker-api-versions.sh  kafka-delete-records.sh      kafka-replica-verification.sh        kafka-
verifiable-producer.sh
kafka-configs.sh              kafka-dump-log.sh            kafka-run-class.sh                   trogdor.sh
kafka-console-consumer.sh     kafka-log-dirs.sh            kafka-server-start.sh                windows
kafka-console-producer.sh     kafka-mirror-maker.sh        kafka-server-stop.sh                 zookeeper-
security-migration.sh

例子:kafka的任何操作都需要zookeeper
查看有多少个topic

./kafka-topics.sh  --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181 --list

创建topic

./kafka-topics.sh  --create --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181 --replication-factor
1 --partitions 3 --topic mytopic

实例:

[root@localhost bin]# ./kafka-topics.sh  --create --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181
--replication-factor 1 --partitions 3 --topic mytopic
Created topic "mytopic".
[root@localhost bin]# ./kafka-topics.sh  --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181 --list  mytopic
[root@localhost bin]#

看看topic的具体信息

./kafka-topics.sh  --describe --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181  --topic mytopic[root@localhost bin]# ./kafka-topics.sh  --describe --zookeeper
10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181  --topic mytopic
Topic:mytopic   PartitionCount:3        ReplicationFactor:1     Configs:Topic: mytopic  Partition: 0    Leader: 1       Replicas: 1     Isr: 1Topic: mytopic  Partition: 1    Leader: 2       Replicas: 2     Isr: 2Topic: mytopic  Partition: 2    Leader: 3       Replicas: 3     Isr: 3

消费信息
交互式的生成消息到topic mytopic中

./kafka-console-producer.sh  --broker-list 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic mytopic


ctrl +c  结束查看消费消息

./kafka-console-consumer.sh  --bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
mytopic./kafka-console-consumer.sh  --bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
mytopic--from-beginning  #从头开始消费

删除topic

[root@localhost bin]# ./kafka-topics.sh  --delete --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181
--topic mytopic[root@localhost bin]# ./kafka-topics.sh  --delete --zookeeper 10.10.10.71:2181,10.10.10.72:2181,10.10.10.73:2181
--topic mytopic
Topic mytopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/94966.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

K8S访问控制------认证(authentication )、授权(authorization )、准入控制(admission control )体系

一、账号分类 在K8S体系中有两种账号类型:User accounts(用户账号),即针对human user的;Service accounts(服务账号),即针对pod的。这两种账号都可以访问 API server,都需要经历认证、授权、准入控制等步骤,相关逻辑图如下所示: 二、authentication (认证) 在…

Linux学习之vsftpd虚拟用户

/etc/vsftpd/vsftpd.conf里边有几项跟vsftpd虚拟用户有关的主要配置: guest_enableYES,允许匿名用户登录vsftpd guest_usernamevirtual,指定虚拟用户账户为virtual,就是把虚拟用户映射成Linux本地用户,这样可以使用Lin…

IP地址、网关、网络/主机号、子网掩码关系

一、IP地址 IP地址组成 IP地址分为两个部分:网络号和主机号 (1)网络号:标识网段,保证相互连接的两个网段具有不同的标识。 (2)主机号:标识主机,同一网段内,主机之间具有相同的网…

自然语言处理(六):词的相似性和类比任务

词的相似性和类比任务 在前面的章节中,我们在一个小的数据集上训练了一个word2vec模型,并使用它为一个输入词寻找语义相似的词。实际上,在大型语料库上预先训练的词向量可以应用于下游的自然语言处理任务,为了直观地演示大型语料…

详解 SpringMVC 的 @RequestMapping 注解

文章目录 1、RequestMapping注解的功能2、RequestMapping注解的位置3、RequestMapping注解的value属性4、RequestMapping注解的method属性5、RequestMapping注解的params属性(了解)6、RequestMapping注解的headers属性(了解)7、Sp…

面试官:说一下 MyBatis 的一级缓存和二级缓存 ?

目录 1. MyBatis 的缓存机制 2. 为什么不默认开启 MyBatis 的二级缓存 3. MyBatis 如何开启二级缓存 4. MyBatis 有哪些缓存清除策略 1. MyBatis 的缓存机制 MyBayis 中包含两级缓存:一级缓存和二级缓存 1. 一级缓存是 SqlSession 级别的,是 MyBati…

swagger 接口测试,用 python 写自动化时该如何处理?

在使用Python进行Swagger接口测试时,可以使用requests库来发送HTTP请求,并使用json库和yaml库来处理响应数据。以下是一个简单的示例代码: import requests import json import yaml# Swagger API文档地址和需要测试的接口路径 swagger_url …

Nginx从入门到精通(超级详细)

文章目录 一、什么是Nginx1、正向代理2、反向代理3、负载均衡4、动静分离 二、centos7环境安装Nginx1、安装依赖2、下载安装包3、安装4、启动5、停止 三、Nginx核心基础知识1、nginx核心目录2、常用命令3、默认配置文件讲解4、Nginx虚拟主机-搭建前端静态服务器5、使用nignx搭建…

Docker技术--Docker简介和架构

1.Docker简介 (1).引入 我们之前学习了EXSI,对于虚拟化技术有所了解,但是我们发现类似于EXSI这样比较传统的虚拟化技术是存在着一定的缺陷:所占用的资源比较多,简单的说,就是你需要给每一个用户提供一个操作平台,这一个…

npm报错sass

1.删除node模块 2.删除node-sass: npm uninstall node-sass 3.重新下载对应版本node-sass: npm i node-sass7.0.3(指定版本 控制台报错什么版本就写什么版本) 4.再运行项目 或者

Java运行时jar时终端输出的中文日志是乱码

运行Jar时在控制台输出的中文日志全是乱码,这是因为cmd/bash默认的编码是GBK,只要把cmd的编码改成UTF-8即可 两种方式修改:临时修改和注册表永久修改 临时修改 只对当前的cmd页面有效,关闭后重新打开都会恢复成GBK, 打开cmd&am…

实战黑马苍穹外卖项目8.1-10.1

文章目录 软件开发的基本流程用户层网关层应用层数据层工具 数据库设计导入准备好的前端和后端工程基础工程代码分析完成员工功能完成菜品功能入门Redis实现店铺营业HttpClient微信小程序开发缓存Spring Cache实现地址功能用户下单实现订单推送状态apache对应的工具使用项目用到…