kafka知识点汇总

kafka是什么?

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

消息队列的两种模式

  1. 点对点模式
    一个生产者对应一个消费者,消费者消费消息后消息队列删除消息。
  2. 发布/订阅模式
  • 可以有多个topic
  • 消费者消费数据后,不删除数据
  • 每个消费者相互独立,都可以消费到数据。

Kafka基础架构

在这里插入图片描述
kafa的架构是由生产者、kafka集群、消费者、Zookeeper组成的。生产者生产消息,发送到kafka集群,消费者从kafka集群消费消息,Zookeeper存储Kafka相关的元数据信息。
kafka由多个Broker(每个Broker就是一个Kafka服务实例)节点组成。
kafka里面的消息按主题进行管理,每个主题可以分成多个分区,每个分区会有一个或多个副本,分区和副本分别称为Leader分区和Follower分区,Leader分区和Follower分区在不同的Broker上,当Leader分区无法提供服务时,Follower分区会升级成Leader分区为消费者和生产者提供服务,Follower只负责同步数据,不提供服务。
消费者负责消费消息,多个消费者可以组成一个消费者组共同消费一个主题的消息,一个分区只能有消费者组中的一个消费者消费。

ISR、OS、AR

kafka中所有的Partition(Leader+Follower)称为AR(Assigned Replicas),与Leader保持同步的Partition集合称为ISR(In-Sync Replicas),与Leader不同步的Partition则称为OSR(Out-of-Sync Replicas),ISR的维护是根据Follower的同步情况实时维护的。Leader节点选举的时候,从AR中取在ISR中的第一个节点作为Leader。

生产者消息发送流程

在这里插入图片描述
生成者生产消息,然后消息经过拦截器做一些业务处理,然后通过序列化器做消息序列化,接着通过分区器对消息进行分区,分区器里面可以指定消息发往那个分区。
生产者将消息分好区后发送到RecordAccumulator中进行缓存,该缓冲器默认大小是32M,不同分区的消息不缓存到不同的内存缓存队列(ProducerBatch)中。
Sender线程负责不断的从RecordAccumulator中拉取消息发送到KafkaBroker,Sender线程拉取消息的策略可以通过batch.size和linger.ms来配置。

  • batch.size配置ProducerBatch中数据累积到一个阈值后,Sender才会拉取数据,默认是16K。
  • linger.ms配置sender拉取数据的时间间隔,默认为0;
  • Sender拉取数据的条件就是ProducerBatch中数据累积达到batch.size或者时间间隔达到linger.ms,linger.ms为0表示,数据来了就会立马被Sender拉走。

Sendder内部通过InFightRequest来缓存数据发送到Broker但尚未收到应答的请求,底层通过Selector进行数据传输。 应答ACK有三种配置策略。

  • 0:生产者发送过来的数据,不需要等待数据落盘应答,可靠性差,效率高。
  • 1:生产者发送过来的数据,Leader收到数据后应答,可靠性中等,效率中等。
  • -1/all:生成者发送过来的数据,需要Leader和ISR队列里面的所有节点收齐数据后应答,可靠性高,效率低。

kafka消息的顺序

kafka只能在一定条件下保证单分区消息的有序。

  • 在1.x版本之前,需要配置max.in.flight.requests.per.connection=1,也就是Sender线程向Broker发送请求的InFightRequest只缓存一个连接。保证前一条消息成功落地后才发送下一条。
  • 在1.x版本后提供类消息的幂等性,在开启幂等性的前提下,max.in.flight.requests.per.connection <= 5即可,未开启幂等性的情况下,max.in.flight.requests.per.connection=1。

Kafka中的几个偏移量

Kafka每个Partition中的消息都是append进去的,Kafka用几个重要的偏移量对消息进行维护。
在这里插入图片描述
每个Partition分别维护着这些偏移量,用于数据同步和Leader重新选举后恢复数据一致性。
新的Leader出现后,如果其他Follower的HW小于新Leader的HW,则会先截取新HW之后的数据再开始同步,所以,如果Leader重选的Partition的HW小于先前的Leader的HW,则会出现消息丢失

消息丢失场景

生产者端

  1. ACK配置为0;
  2. ACK配置为1,消息发到Leader成功,但是Leader还没来得及同步到Follower就挂掉了。
  3. unclean.leader.election.enable配置为true,允许选举ISR以外的副本作为Leader,会导致数据丢失,默认为false。

解决方案:

  1. ACK配置为all/-1.
  2. 配置:min.insync.replicas>1,副本指定必须确认写操作成功的最小副本数量。
  3. 生产者发送消息会自动重试,要不可恢复的异常会抛出,这个时候可以捕获异常对发送失败的消息单独处理。

消费者端

  1. 先commit再处理消息,如果处理消息时遇到异常,但offset已经提交,则消息会丢失。

解决方案:先处理再commit。

如果先处理消息再commit,消息处理完成后,commit提交失败则会导致重复消费问题。

Broker
broker在消息刷盘到磁盘之前挂掉,则会导致未刷盘的消息丢失。

解决方案:减少刷盘间隔。

如何保障消息不被重复消费

MQ无法解决消息的重复消费,所以需要消费者来保障消息的不被重复消费,可以采用幂等来解决(一个数据或请求重复,确保对应数据不会改变,不能出错)。

  • 如果是写redis等缓存,则天然幂等。
  • 生产者发送消息带上一个全局唯一的id,消费者拿到消息后,先更加这个id去redis里查一下,如果消费过则不再消费。
  • 基于数据库的唯一键。

Kafa脚本

kafka-topics.sh

参数描述
–bootstrap-server <String:server toconnect to>连接的KafkaBroker主机名称和端口号
–topic <String:topic>操作的topic名称
–create创建主题
–delete删除主题
–alter修改主题
–list查看所有主题
–describe查看主题详细描述
–partitions <Integer:#of partitions>设置分区数
–replication-factor <Intger:replication factor>设置分区的副本数
–config <String:name=value>更新系统默认配置

创建主题

bash./kafka-topics.sh --zookeeper localhost:2181 --create --topic topic_name --partitions partition_num --replication-factor replication_num

其中:

–zookeeper localhost:2181:指定Zookeeper的地址。
–create:表示创建主题。
–topic topic_name:指定主题名称。
–partitions partition_num:指定主题的分区数。
–replication-factor replication_num:指定主题的副本数。

分区数只能增加,不能减少;
命令行不能修改副本数;

kafka-console-producer.sh

bash./kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name
> message

其中:

–broker-list localhost:9092:指定Kafka的地址。
–topic topic_name:指定要发送消息到的主题。
–message`:要发送的消息内容。

kafka-console-consumer.sh

bash./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_name --from-beginning

其中:
–bootstrap-server localhost:9092:指定Kafka的地址。
–topic topic_name:指定要消费的主题。
–from-beginning:从消息队列头部开始消费。

是否从头消费根据实际情况决定。

SpringBoot集成Kafka

maven

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

customer

@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic",groupId = "my-group")public void kafkaListen(String message){System.out.println(message);}
}

producer

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("send")public Result send(String topic,String msg){kafkaTemplate.send(topic,msg);return Result.ok();}
}

消费者配置:
auto.offset.rese:

  • earliest:消费未消费过的数据。
  • latest:消费最新的消息,消费者启动前产生的消息不会再次消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/111888.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

K8S:pod资源限制及探针

文章目录 一.pod资源限制1.pod资源限制方式2.pod资源限制指定时指定的参数&#xff08;1&#xff09;request 资源&#xff08;2&#xff09; limit 资源&#xff08;3&#xff09;两种资源匹配方式 3.资源限制的示例&#xff08;1&#xff09;官网示例&#xff08;2&#xff0…

2023年最热门的编程语言:前进的趋势和机会

2023年最热门的编程语言&#xff1a;前进的趋势和机会 2023年最热门的编程语言&#xff1a;前进的趋势和机会摘要引言1. 编程语言的热门趋势1.1 新兴编程语言的崛起1.2 编程语言的可持续性发展1.3 跨平台编程语言的兴起1.4 人工智能和机器学习编程语言的需求 2. 编程语言职业机…

在PHP8中对数组进行排序-PHP8知识详解

在php8中&#xff0c;提供了丰富的排序函数&#xff0c;可以对数组进行排序操作。常见的排序函数如下几个&#xff1a;sort() 函数、rsort() 函数、asort() 函数、arsort() 函数、ksort() 函数、krsort() 函数、natsort()函数和natcascsort()函数。 1、sort() 函数&#xff1a;…

行业追踪,2023-09-18

自动复盘 2023-09-18 凡所有相&#xff0c;皆是虚妄。若见诸相非相&#xff0c;即见如来。 k 线图是最好的老师&#xff0c;每天持续发布板块的rps排名&#xff0c;追踪板块&#xff0c;板块来开仓&#xff0c;板块去清仓&#xff0c;丢弃自以为是的想法&#xff0c;板块去留让…

PHP8中获取并删除数组中最后一个元素-PHP8知识详解

在php8中&#xff0c;array_pop()函数将返回数组的最后一个元素&#xff0c;并且将该元素从数组中删除。语法格式如下&#xff1a; array_pop(目标数组) 获取并删除数组中最后一个元素&#xff0c;参考代码&#xff1a; <?php $stu array(s001>明明,s002>亮亮,s…

Vulnhub系列靶机---XXE Lab: 1

文章目录 信息收集主机发现端口扫描目录扫描 漏洞利用 靶机文档&#xff1a;XXE Lab: 1 下载地址&#xff1a;Download (Mirror) 告诉了利用点&#xff1a;http://your-ip/xxe 信息收集 主机发现 端口扫描 目录扫描 访问robots.txt 访问/xxe 随意输入账密&#xff0c;登陆抓包…

Ansible自动化:简化你的运维任务

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

【数据结构】堆的应用+TOP-K问题+二叉树遍历

欢迎来到我的&#xff1a;世界 希望作者的文章对你有所帮助&#xff0c;有不足的地方还请指正&#xff0c;大家一起学习交流 ! 目录 前言堆的时间复杂度向下调整算法的时间复杂度向上调整算法的时间复杂度 堆的应用堆排序TOP—K问题链式二叉树二叉树的节点&#xff1a;初始化节…

leetcode725. 分隔链表(java)

分隔链表 题目描述拆分链表代码演示 题目描述 给你一个头结点为 head 的单链表和一个整数 k &#xff0c;请你设计一个算法将链表分隔为 k 个连续的部分。 每部分的长度应该尽可能的相等&#xff1a;任意两部分的长度差距不能超过 1 。这可能会导致有些部分为 null 。 这 k 个部…

倾情奉献,纯css(无图,无JS)原创中秋贺卡!!!

&#x1fab4; 背景故事 中秋节马上就要到了&#xff0c;在这里我提前祝大家生活美满万年长&#xff0c;阖家幸福永平安&#xff01;&#x1f973; 好了进入正题&#xff0c;最近掘金出了一个“中秋创意投稿”活动&#xff0c;我向来对这种可以写一些具有创意性的代码的活动很…

二叉树题目:层数最深叶子结点的和

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法一思路和算法代码复杂度分析 解法二思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;层数最深叶子结点的和 出处&#xff1a;1302. 层数最深叶子结点的和 难度 4 级 题目描述 要求 给定一个二叉树…

Qt的ui文件不能简单复制

在使用vsQt开发时&#xff0c;直接复制另外一个widget类的ui文件&#xff0c;简单改名成当前类对应的ui文件&#xff0c;会导致编译出错。尽可能使用添加的Qt class自带的ui文件&#xff0c;因为ui文件的配置文件中有许多与当前类相关的字符串&#xff0c;简单复制容易报错。