Kafka学习笔记(高级篇)

目录

高级功能

高效读写

涉及技术

ZooKeeper

自定义拦截器

监控

延迟消费

一些改进手段


高级功能

高效读写

涉及技术

  • 高吞吐量:Kafka 每秒可以处理数百万消息。这是因为 Kafka 消息的处理是以批处理(Batching)的方式来完成的,生产者可以将多个消息一起发送到 Kafka 集群,以减少网络开销以及加速处理速度。
  • 低延迟:Kafka 利用磁盘存储加缓存,可以在微秒级别内完成消息处理。Kafka 具有高效的消息传递能力,也可以在微秒级别内完成消息处理。这是由于 Kafka 的消息存储设计是基于磁盘的,但同时消息缓存也是放在内存里的。也就是说,在处理消息时,Kafka 集群会先将消息写入到磁盘中进行持久化存储,并且在内存中缓存一份消息以便进行更快的消息传递和读取。
  • 分布式架构:Kafka 采用分布式的架构设计,可以通过水平扩展增加集群规模和负载容量。集群中的每个节点都可以独立完成消息处理和写入,可以有效地提高整个系统的吞吐量。
  • 高可靠性:Kafka 在存储消息时,使用了多副本机制,可以保证消息的高可靠性。当消息发送失败或者其中一个节点失效时,可以通过复制副本来实现自动故障转移,以确保消息的可靠性、可用性与一致性。
  • 顺序写:Kafka 内部的消息存储结构是一个连续的、顺序写入的日志文件(Log File)集合,也称“分区”(Partition)。分区中的每一条消息都被分配一个唯一的偏移量(Offset),并且保留在磁盘上直到被消费。通过这种消息存储方式,Kafka 可以实现高效的顺序写入操作。因为 Kafka 可以将流式的消息按顺序追加到 Log 文件的末尾,这避免了随机写入所产生的磁盘寻址和寻道时间,从而大大提高了写入性能,并降低了延迟。此外,由于只有新的消息会追加到 Log 文件中,而没有数据被修改或删除,因此,读取数据时,Kafka 也可以通过顺序扫描磁盘获取最新的消息,这样也大大提高了读取数据的效率。
  • 数据压缩:Kafka 提供了数据压缩功能,可以将传输的消息进行压缩和解压,减少了磁盘和网络带宽的使用。
  • 零拷贝:Kafka 零拷贝技术可以避免在传输数据时进行数据缓冲和复制,从而减少了 CPU 和内存的使用,提高了性能。

ZooKeeper

Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线、所有topic的分区副本分配和leader的选举等工作。Controller的工作管理是依赖于zookeeper的。

Partition的Leader的选举过程

Partition的Leader选举流程

自定义拦截器

拦截器原理

Producer拦截器interceptor是在Kafka0.10版本引入的,主要用于Clients端的定制化控制逻辑。对于Producer而言,interceptor使得用户在消息发送之前以及Producer回调逻辑之前有机会对消息做一些定制化需求,比如修改消息的展示样式等,同时Producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链interceptor chain,Interceptor实现的接口为ProducerInterceptor,主要有四个方法:

  • configure(Map<String, ?> configs):获取配置信息和初始化数据时调用
  • onSend(ProducerRecord record):该方法封装在KafkaProducer.send()方法中,运行在用户主线程中,Producer确保在消息被序列化之前及计算分区前调用该方法,并且通常都是在Producer回调逻辑出发之前。
  • onAcknowledgement(RecordMetadata metadata, Exception exception):onAcknowledgement运行在Producer的IO线程中,因此不要再该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
  • close():关闭inteceptor,主要用于执行资源清理工作。

Inteceptor可能被运行到多个线程中,在具体使用时需要自行确保线程安全,另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并紧紧是捕获每个interceptor可能抛出的异常记录到错误日志中而非向上传递。

自定义加入时间戳拦截器

/*** @author caoduanxi* @Date 2021/1/13 14:15* @Motto Keep thinking, keep coding!*/
public class TimeInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),"TimeInterceptor:" + System.currentTimeMillis() + "," + record.value());}// 其余方法省略
}

自定义消息发送统计拦截器

/*** @author caoduanxi* @Date 2021/1/13 14:18* @Motto Keep thinking, keep coding!*/
public class CounterInterceptor implements ProducerInterceptor<String, String> {private int errorCounter = 0;private int successCounter = 0;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if (exception == null) {successCounter++;} else {errorCounter++;}}@Overridepublic void close() {// 输出结果,结束输出System.out.println("Sent successful:" + successCounter);System.out.println("Sent failed:" + errorCounter);}
}

在CustomProducer中加入拦截器

// 加入拦截器
List<Object> interceptors = new ArrayList<>();
interceptors.add(TimeInterceptor.class);
interceptors.add(CounterInterceptor.class);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

注意:拦截器的close()方法是收尾的,一定要调用Producer.close()方法,否则拦截器的close()方法不会被调用。

监控

Eagle

Eagle是开源的额可视化和管理软件,允许查询、可视化、提醒和探索存储在任何地方的指标,简而言之,Eagle为您提供了将Kafka集群数据转换为漂亮的图形和可视化的工具。

实质: 一个运行在tomcat上的web应用

延迟消费

kafka目前默认可支持1h以内的延迟消费。

使用方式:consumer启动参数增加 --delay-time-seconds n 设置消费延迟时间,单位秒,默认不延迟消费。仅能拉取到消费延迟时间之前的消息。

注意:此参数默认限制最大值为3600s,超过限制可能导致consumer启动失败。如有调整最大延迟时间的需求,请联系李锦涛(KIM:lijintao)

注意:消息拉取可能有分钟级别的误差。

注意:由于目前每4kb数据构建一次时间索引,如果最后一批数据的size不够4kb,可能导致这些数据不能被延迟消费到。

一些改进手段

  • Rebalance优化
  • Federation架构应用
  • 存算分离等等

相关推荐文章:

Kafka学习笔记(基础篇)_Cat凯94的博客-CSDN博客

看完这篇Kafka,你也许就会了Kafka_心的步伐的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/26683.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot+Vue的学生选课管理系统

1. 技术栈 前端&#xff1a;Vue ElementUI Axios后端&#xff1a;Spring BootMyBatis Plus Jwt MysqlSwagger 2. 系统设计 该系统主要分为五个模块&#xff0c;分别是&#xff1a;学生管理、教师管理、课程管理、开课表管理以及学生成绩管理 角色分为学生、教师、管理员&…

如何搭建自己的图床(GitHub版)

文章目录 1.图床的概念2.用GitHub创建图床服务器2.1.新建仓库2.2.生成Token令牌2.3.创建img分支和该分支下的img文件夹(可选) 3.使用PicGo软件上传图片3.1 下载PicGo软件3.2配置PicGo3.3用PicGo实现上传 4. Typora实现自动上传5.免费图片网站 前言&#xff1a; 如果没有自己的服…

Xshell配置以及使用教程

目录 一、Xshell介绍 二、安装Xshell 三、使用Xshell连接Linux服务器 一、Xshell介绍 Xshell 分为免费版和专业版&#xff0c;是一款远程连接虚拟机系统的 SSH 客户机软件&#xff1b; Xshell免费版官网下载地址&#xff1a;家庭/学校免费 - NetSarang Websitehttps://www…

半小时漫画计算机

ISBN: 978-7-121-41557-9 作者&#xff1a;刘欣&#xff08;码农翻身&#xff09; 绘画&#xff1a;刘奕君 页数&#xff1a;210页 阅读时间&#xff1a;2023-06-03 推荐指数&#xff1a;★★★★★ 以漫画的形式来讲解计算机的基础知识&#xff0c; 主要涉及到CPU、内存、网络…

“layui助力博客管理升级!用增删改查功能打造优质博客体验“

目录 引文1.前置条件2.数据接口2.1 UserDao(CRUD)2.2 R工具类 3.HTML 结构3.1 主界面的HTML3.2 用户的查询所有界面的HTML3.3 新增修改通用的的HTML 4.JavaScript 代码4.1 用户的CRUD javaScript 代码(userManage)4.2 新增修改的javaScript代码(userEdit) 5. 运行截图总结 引文…

kubernetes中特定域名使用自定义DNS服务器出现的解析异常

故障发生背景&#xff1a; 租户反馈生产业务服务连接到中间件的时候&#xff0c;偶尔会有连接失败的情况&#xff0c;然后我们查看对应组件服务正常&#xff0c;手动请求组件服务也显示正常&#xff0c;让租户查看业务服务日志发现报错无法解析对应的域名&#xff0c;我们手动是…

【雕爷学编程】Arduino动手做(163)---大尺寸8x8LED方格屏模块7

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

laravel6.x文档阅读手册

laravel中文文档6.x 目录 一、入门指南 安装 服务器要求 安装 Laravel Laravel 使用 Composer 来管理项目依赖。因此&#xff0c;在使用 Laravel 之前&#xff0c;请确保你的机器已经安装了 Composer。 通过 Laravel 安装器 首先&#xff0c;通过使用 Composer 安装 Lara…

Pod:Kubernetes里最核心的概念

为了解决这样多应用联合运行的问题&#xff0c;同时还要不破坏容器的隔离&#xff0c;就需要在容器外面再建立一个“收纳舱”&#xff0c;让多个容器既保持相对独立&#xff0c;又能够小范围共享网络、存储等资源&#xff0c;而且永远是“绑在一起”的状态。 Pod 的概念也就呼…

我在VScode学Java类与对象(Java的类与对象、this关键字)第一辑

我的个人博客主页&#xff1a;如果’真能转义1️⃣说1️⃣的博客主页 关于Java基本语法学习---->可以参考我的这篇博客&#xff1a;《我在VScode学Java》 关于我在VScode学Java&#xff08;Java方法method&#xff09; 类是描述了一组有相同特性&#xff08;属性&#xff09…

从实体按键看 Android 车载的自定义事件机制

在汽车数字化、智能化变革的进程中&#xff0c;越来越多的车机设计或部分、或全部地舍弃了实体按键&#xff0c;进而把车主操作的入口转移到了车机 UI 以及语音助手。 但统一、高效的零层级 UI 颇为困难&#xff0c;语音的准确率、覆盖率亦不够完善&#xff0c;那么在当下的阶段…

PCIe总线的链路训练

目录 概述 链路训练的目的 几个关键概念 Lane reveral &#xff1a; Polarity inversion&#xff1a; De-skew&#xff1a; link number&#xff1a; Lane number&#xff1a; Bit lock&#xff1a; Symbol lock&#xff1a; 几个特殊序列&#xff1a; TS1和TS2&am…