Kafka—ISR机制

ISR机制
Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。

ISR 关键概念

领导者和追随者:每个分区有一个领导者和零个或多个追随者。领导者负责处理客户端的写请求,而追随者主要用于数据复制。

  • ISR 集合:ISR 集合是分区领导者的一组追随者副本,它们与领导者保持数据同步。只有在 ISR 集合中的追随者副本可以参与数据的写入和读取操作。

  • 数据复制:领导者将消息写入其本地日志,并定期将这些消息发送给 ISR 集合中的追随者。追随者接收消息后,将其写入本地日志,以保持数据同步。

  • Leader Epoch 和 Log Start Offset:ISR 集合中的每个追随者都维护了领导者的日志信息,包括领导者的 Leader Epoch 和 Log Start Offset。这些信息用于确保数据的正确复制和同步。

  • 数据一致性:只有在 ISR 集合中的所有追随者都成功复制了一条消息后,领导者才会将该消息标记为已提交,确保数据的一致性。

  • 故障处理:如果某个追随者发生故障或者追赶进度过慢,那么该追随者可能会被从 ISR 集合中移除。这有助于保持数据的可靠性和避免影响性能。

其中,需要注意的的概念:

  • 分区中的所有副本统称为AR(Assigned Replicas)。

  • 所有Leader副本加上和Leader副本保持同步的Follower副本组成ISR(In-Sync Replicas)。

  • 所有没有保持同步的Follower副本组成OSR(Out-of-Sync Replicas)。

  • AR = ISR + OSR。正常情况下,所有Follower副本都应该和Leader副本一致,即AR=ISR。

  • 当Leader故障时,在ISR集合中的Follower才有资格被选举为新的Leader。

HW和LEO
在 Kafka 中,HW(High Watermark)和 LEO(Log End Offset)是与数据复制和消费有关的两个重要概念。

HW(High Watermark):HW 是指在分区中,已经被所有追随者(Follower)副本复制的消息的位置。HW 是每个分区的属性,它表示已经提交的消息。只有在 HW 之前的消息才被认为是已经提交的,这些消息已经被写入分区的所有追随者副本,并且被认为是安全的,不会丢失。HW 是为了确保数据一致性和可靠性而引入的。

LEO(Log End Offset):LEO 是指在分区中当前最新消息的位置。LEO 表示分区日志中的最后一条消息的偏移量。LEO 包括已经被写入但尚未被所有追随者副本复制的消息,以及正在等待被写入的消息。LEO 是一个动态的属性,它会随着新消息的写入而逐渐增加。

HW 和 LEO 之间的关系非常重要,它们可以帮助确保数据的可靠性和一致性

  • HW 之前的消息是已经提交的消息,它们在数据复制中是安全的,不会丢失。

  • LEO 之前的消息是已经写入但尚未被所有追随者副本复制的消息。这些消息可能会在 HW 之前被提交,也可能会在之后被提交。

  • 一旦 HW 追赶上 LEO,表示所有的消息都已经提交,分区的数据一致性得到了保障。

Kafka的消息同步流程:

  1. 初始状态,HW和LEO在同一位置。消费者可以读取的有效消息为0,1,2,3.
    在这里插入图片描述
  2. 消息写入Leader,LEO位置改变。Follower进行同步。

在这里插入图片描述
3. Follower同步进度决定HW位置,消费者可读的有效消息0,1,2,3,4。
在这里插入图片描述

  1. 完成同步,消费者可读的有效消息0,1,2,3,4,5,6。

在这里插入图片描述
可以看出,Kafka的复制机制既不是完全的同步复制,也不是单纯异步复制。

  • 同步复制要求所有Follower副本都复制完,太影响性能了。

  • 异步复制只要数据被写入Leader副本就认为提交成功,在这种情况下,如果Leader宕机时候Follower还是落后于Leader就会造成数据丢失。

而Kafka使用的ISR机制则有效地权衡了数据可靠性和性能之间的关系。

Java使用Kafka通信

以下是 Kafka 生产者和消费者的简单示例,使用 Kafka 的 Java 客户端库(Kafka Producer 和 Kafka Consumer)来创建一个基本的消息传递示例。
Kafka 生产者示例

import org.apache.kafka.clients.producer.*;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092"; // Kafka 服务器地址String topic = "my-topic"; // Kafka 主题名称Properties properties = new Properties();properties.put("bootstrap.servers", bootstrapServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(properties);// 发送消息producer.send(new ProducerRecord<>(topic, "key", "Hello, Kafka!"), (metadata, exception) -> {if (exception == null) {System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());} else {System.err.println("Error sending message: " + exception.getMessage());}});
​producer.close();}
}

Kafka 消费者示例

import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.time.Duration;
import java.util.Collections;public class KafkaConsumerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092"; // Kafka 服务器地址String groupId = "my-group"; // 消费者组 IDString topic = "my-topic"; // Kafka 主题名称Properties properties = new Properties();properties.put("bootstrap.servers", bootstrapServers);properties.put("group.id", groupId);properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: key = " + record.key() + ", value = " + record.value());}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/611355.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

element问题总结之el-table使用fixed固定列后滚动条滑动到底部或者最右侧的时候错位问题

el-table使用fixed固定列后滚动条滑动到底部或者最右侧的时候错位 效果图前言解决方案纵向滑动滚动条滑动到底部的错位解决横向滚动条滑动到最右侧的错位解决 效果图 前言 在使用el-table固定行的时候移动滚动条会发现移动到底部或者移动到最右侧的时候会出现表头和内容错位或…

2024-4-11-arm作业

汇编实现三个灯的闪烁 源代码&#xff1a; .text .global _start _start: 时钟使能LDR r0,0x50000A28ldr r1,[r0]orr r1,r1,#(0x3<<4)str r1,[r0]设置PE10输出LDR r0,0x50006000ldr r1,[r0]bic r1,r1,#(0x3<<20)orr r1,r1,#(0x1<<20)str r1,[r0]设置PE1…

Android源码解析之截屏事件流程

今天这篇文章我们主要讲一下Android系统中的截屏事件处理流程。用过android系统手机的同学应该都知道&#xff0c;一般的android手机按下音量减少键和电源按键就会触发截屏事件&#xff08;国内定制机做个修改的这里就不做考虑了&#xff09;。那么这里的截屏事件是如何触发的呢…

test4122

欢迎关注博主 Mindtechnist 或加入【Linux C/C/Python社区】一起学习和分享Linux、C、C、Python、Matlab&#xff0c;机器人运动控制、多机器人协作&#xff0c;智能优化算法&#xff0c;滤波估计、多传感器信息融合&#xff0c;机器学习&#xff0c;人工智能等相关领域的知识和…

Shenandoah GC算法

概述 最早由Red Hat公司发起&#xff0c;目标是利用现代多核CPU的优势&#xff0c;减少大堆内存在GC时产生的停顿时间。随OpenJDK 12一起发布&#xff0c;暂停时间不依赖于堆的大小&#xff1b;这意味着无论堆的大小如何&#xff0c;暂停时间都是差不多的。 Shenandoah最初的…

Multisim仿真二极管、晶体管和场效应管学习笔记

Multisim仿真二极管、晶体管和场效应管 &#xff08;note&#xff1a;使用Multisim14.0版本进行仿真&#xff09; 文章目录 Multisim仿真二极管、晶体管和场效应管二极管的I-V特性晶体管的I-V特性场效应管的I-V特性 二极管的I-V特性 插入I-V analyzer 原理图绘制 改变仿真…

【Python】Python城乡人口数据分析可视化(代码+数据集)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

c# .net 香橙派 Orangepi GPIO高低电平、上升沿触发\下降沿触发 监听回调方法

c# .net 香橙派GPIO高低电平、上升沿触发\下降沿触发 监听回调方法 通过gpio readall 查看 gpio编码 这里用orangepi zero3 ,gpio= 70为例 当gpio 70 输入高电平时,触发回调 c# .net 代码 方法1: Nuget 包 System.Device.Gpio ,微软官方库对香橙派支持越来越好了,用得…

日程安排组件DHTMLX Scheduler v7.0新版亮点 - 拥有多种全新的主题

DHTMLX Scheduler是一个类似于Google日历的JavaScript日程安排控件&#xff0c;日历事件通过Ajax动态加载&#xff0c;支持通过拖放功能调整事件日期和时间&#xff0c;事件可以按天、周、月三个种视图显示。 备受关注的DHTMLX Scheduler 7.0版本日前正式发布了&#xff0c;如…

为什么你选择成为一名程序员?

逐码探梦&#xff1a;我选择程序员之路 在数字化的纹理中编织梦想&#xff0c;于逻辑的海洋里追寻真理&#xff0c;程序员&#xff0c;这个职业对我而言不仅仅是一份工作&#xff0c;更是一扇通向无限可能性的大门。选择成为一名程序员&#xff0c;是一个交织着兴趣和职业规划…

Python中sort()函数、sorted()函数的用法深入讲解(具体实例:蓝桥杯数位排序)

前置知识&#xff1a; 可迭代对象的定义&#xff1a;可迭代对象是指可以被迭代或遍历的对象&#xff0c;即可以使用循环结构对其进行逐个访问的对象。 在Python中常见的可迭代对象有&#xff1a;列表(list)、元组&#xff08;tuple&#xff09;、字符串&#xff08;sting&…

微信小程序 django+nodejs电影院票务售票选座系统324kd

小程序Android端运行软件 微信开发者工具/hbuiderx uni-app框架&#xff1a;使用Vue.js开发跨平台应用的前端框架&#xff0c;编写一套代码&#xff0c;可编译到Android、小程序等平台。 前端&#xff1a;HTML5,CSS3 VUE 后端&#xff1a;java(springbootssm)/python(flaskdja…