kafka学习笔记--如何保证生产者数据可靠、不重复、有序

本文内容来自尚硅谷B站公开教学视频,仅做个人总结、学习、复习使用,任何对此文章的引用,应当说明源出处为尚硅谷,不得用于商业用途。
如有侵权、联系速删
视频教程链接:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)


PS:本节内容尚硅谷的视频讲的不太友好,又查了很多资料才搞明白

文章目录

  • 数据可靠性
  • 数据不重复
  • 数据有序性

数据可靠性

首先数据的可靠性指的是:

  • 消息不会意外丢失
  • 消息不会重复传递

那回顾我们的数据发送流程,在确认数据发送成功的这一步,也就是ack应答这里,不同的参数对应着不同的策略,如果选择了0和1,则存在丢数的问题,如图:
0: 如果数据发送到某个主题的leader时,leader所在节点挂了,那么这条消息就丢失了
在这里插入图片描述
1: 同理,leader收到了,还没应答时挂了,也会丢数据
在这里插入图片描述
-1(all): 使用-1能保证数据落配盘后才回答,保证数据不丢失
在这里插入图片描述
但是,如果Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?

这就引出了ISR队列的概念了

ISR,是一个机制,也代表着一个同步合集,是由Leader维护的一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。它包含着所有处于同步状态的副本。当一个副本和Leader副本的差距超过一定程度时,这个副本就会被认为是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不变的


那么,既然ISR是动态的,那哪些副本会被包含在ISR中呢?


主要依据就是 副本需要保证能够及时地接收并复制Leader副本的消息,也就是需要保证与leader副本的消息同步延迟在一定的时间范围内(默认情况下是10秒钟,由参数 replica.lag.time.max.ms 控制)。

换而言之,因为分区与ISR机制,我们的消息一旦被Kafka 接收后,就会复制多份并很快落盘。这意味着,即使某一台Broker节点宕机乃至硬盘损毁,也不会导致数据丢失。

我们将ISR与ACK应答结合起来使用,就形成了数据可靠条件

  • 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

数据不重复

上面讲解的,只能保证数据可靠,但是这又引出了一个新的问题
如果,leader在同步完成之后,向生产者回答时,挂掉了,这时候剩下的备份分区会自动选举出一个新leader出来,但是生产者并不知道它挂掉了,只会以为是消息发送失败了,触发重试,又将数据发送了一遍,然后新的leader就又接受了一遍消息,然后在备份分区上再存一遍。这就导致了这条消息存在两份,产生数据重复问题。
在这里插入图片描述
那么kafka是怎么保证数据不重复的呢?
其实这就是数据的幂等性问题了,幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

kafka默认启用数据幂等性,即设置 enable.idempotence = true

在生产者发消息时,这条消息是有它自己的属性的,其中有三个数据被拿来作为数据的主键,kafka会以此来判断这条消息是否重复,若重复,则只保留一条

PID:又叫生产者编号(producerid), Producer在初始化的时候(只有初始化的时候会随机生成PID,也就是重启就会再次生成)会被分配一个PID

Partition:又叫分区编号,即这条消息要发往的分区的paritionid

SeqNumber:又叫序列号,发往同一Partition的消息会附带Sequence Number(即发送数据的编号,代表着向分区发送的第几条消息)

这样<PID, PartitionID, SeqNumber>就相当于构成了一个主键。Broker端会对<PID, PartitionID, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条,这样就保证了数据的唯一,不重复。

但是幂等性只能保证的是在单分区单会话内不重复,如果发消息时生产者挂掉了,重启后它不知道是否发送成功了,又将这个消息再发送一遍,此时它的PID发生变化,那么这条消息就被认为是一条新的消息,导致重复存储,这种情况怎么解决呢?

这就要引入kafka的事务机制了,事务这个东西大家都知道啥意思,不再重复解释

我们通过事务,让客户端挂掉后继续处理,而不是重新从头来过,保证消息的仅一次发送

注意:开启事务,必须开启幂等性。

请添加图片描述

kafka使用事务,有5个API

// 初始化事务
void initializeTransactions ();// 开启事务
void beginTransaction () throws ProducerFencedException;// 在事务中提交已消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets, String consumerGroupId) throws ProducerFencedException;// 提交事务
void commitTransaction () throws ProducerFencedException;// 放弃事务(类似于回滚事务的操作)
void abortTransaction () throws ProducerFencedException;

举个例子:

package com.atguigu.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Test {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put("bootstrap.servers", "hadoop102:9092");properties.put("key.serializer", StringSerializer.class.getName());properties.put("value.serializer", StringSerializer.class.getName());properties.put("transactional.id", "transaction_id_0");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 初始化事务kafkaProducer.initTransactions();// 开启事务kafkaProducer.beginTransaction();try {// 4. 调用 send 方法,发送消息// 发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));}// int i = 1 / 0;// 提交事务kafkaProducer.commitTransaction();} catch (Exception e) {// 终止事务kafkaProducer.abortTransaction();} finally {// 5. 关闭资源kafkaProducer.close();}}
}

数据有序性

如果某主题TOPIC只有一个分区,那么它天生有序,因为分区其实就是一个有序队列

如果是多分区的,kafka是通过滑动窗口的思想解决这个问题的

我们知道kafka发送请求时,最多缓存5个,其实在发送时,每个请求都有自己的单调递增编号,kafka broker在接收数据时,会自动按照编号将数据排序,并且如果其中一个编号的请求失败时,后续再次成功,数据过来后,会自动的根据编号插入到应该在的位置上
请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/264354.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

区块链媒体宣发:揭示优势与趋势,引领信息传播新时代

在数字化潮流中&#xff0c;区块链技术正以惊人的速度改变着传媒行业的格局。从区块链媒体宣发中获得的种种优势和未来的趋势&#xff0c;不仅为企业带来了新的推广途径&#xff0c;也在信息传播领域掀起了一场革命。本文将深入探讨区块链媒体宣发的优势以及未来的发展趋势。 1…

Android12之解决:scripts/gcc-wrapper.py, line 79, in run_gcc(一百六十八)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

多维时序 | MATLAB实现TSOA-TCN-Multihead-Attention多头注意力机制多变量时间序列预测

多维时序 | MATLAB实现TSOA-TCN-Multihead-Attention多头注意力机制多变量时间序列预测 目录 多维时序 | MATLAB实现TSOA-TCN-Multihead-Attention多头注意力机制多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 MATLAB实现TSOA-TCN-Multihead-…

计网Lesson8 - NAT技术与链路层概述

文章目录 NAT 技术1. 因特网的接入方式2. 公网和私网3. NAT 技术 链路层1. 数据链路层概述2. 数据链路层的三个问题2.1 封装成帧2.2 透明传输2.3 差错检测 NAT 技术 1. 因特网的接入方式 光猫将电信号转换为数字信号发送给路由器 光纤入户 光纤传递的就是数字信号&#xff0c…

【Linux】进程间通信之共享内存/消息队列/信号量

文章目录 一、共享内存的概念及原理二、共享内存相关接口说明1.shmget函数2.ftok函数3.shmat函数4.shmdt函数5.shmctl函数 三、用共享内存实现server&client通信1.shm_server.cc2.shm_client.cc3.comm.hpp4.查看ipc资源及其特征5.共享内存的优缺点6.共享内存的数据结构 四、…

neuq-acm预备队训练week 8 P1144 最短路计数

题目描述 给出一个 N 个顶点 M条边的无向无权图&#xff0c;顶点编号为 1∼N。问从顶点 1 开始&#xff0c;到其他每个点的最短路有几条。 题目限制 输入格式 第一行包含 22 个正整数 N,M&#xff0c;为图的顶点数与边数。 接下来 M 行&#xff0c;每行 2个正整数 x,y&…

Docker容器的可视化管理工具—DockerUI本地部署与远程访问

文章目录 前言1. 安装部署DockerUI2. 安装cpolar内网穿透3. 配置DockerUI公网访问地址4. 公网远程访问DockerUI5. 固定DockerUI公网地址 前言 DockerUI是一个docker容器镜像的可视化图形化管理工具。DockerUI可以用来轻松构建、管理和维护docker环境。它是完全开源且免费的。基…

浅析不同NAND架构的差异与影响

SSD的存储介质是什么&#xff0c;它就是NAND闪存。那你知道NAND闪存是怎么工作的吗&#xff1f;其实&#xff0c;它就是由很多个晶体管组成的。这些晶体管里面存储着电荷&#xff0c;代表着我们的二进制数据&#xff0c;要么是“0”&#xff0c;要么是“1”。NAND闪存原理上是一…

MEMS制造的基本工艺介绍——晶圆键合

晶圆键合是一种晶圆级封装技术&#xff0c;用于制造微机电系统 (MEMS)、纳米机电系统 (NEMS)、微电子学和光电子学&#xff0c;确保机械稳定和气密密封。用于 MEMS/NEMS 的晶圆直径范围为 100 毫米至 200 毫米&#xff08;4 英寸至 8 英寸&#xff09;&#xff0c;用于生产微电…

idea 本身快捷键ctrl+d复制 无法像eclipse快捷键ctrl+alt+上下键,自动换行格式问题解决

问题 例如我使用ctrld 想复制如下内容 复制效果如下&#xff0c;没有自动换行&#xff0c;还需要自己在进行调整 解决 让如下快捷键第一个删除 修改成如下&#xff0c;将第二个添加ctrld 提示&#xff1a;对应想要修改的item&#xff0c;直接右键&#xff0c;remove是删…

计算机服务器中了Mallox勒索病毒怎么解密,Mallox勒索病毒解密步骤

计算机网络技术的不断发展与应用&#xff0c;为企业的生产运营提供了坚实的基础&#xff0c;大大提高了企业的生产与工作效率&#xff0c;但随之而来的网络安全威胁也在不断增加。在本月&#xff0c;云天数据恢复中心接到了很多企业的求助&#xff0c;企业的计算机服务器遭到了…

亚马逊云科技re_Invent 2023产品体验:亚马逊云科技产品应用实践 国赛选手带你看Elasticache Serverless

抛砖引玉 讲一下作者背景&#xff0c;曾经参加过国内世界技能大赛云计算的选拔&#xff0c;那么在竞赛中包含两类&#xff0c;一类是架构类竞赛&#xff0c;另一类就是TroubleShooting竞赛&#xff0c;对应的分别为AWS GameDay和AWS Jam&#xff0c;想必也有朋友玩过此类竞赛&…