Kafka常见生产问题详解

目录

生产环境常见问题分析

消息零丢失方案

1、生产者发消息到Broker不丢失

2、Broker端保存消息不丢失

3、消费者端防止异步处理丢失消息

消息积压如何处理

如何保证消息顺序

​问题一、如何保证Producer发到Partition上的消息是有序的

问题二:Partition中的消息有序后,如何保证Consumer的消费顺序是有序的


生产环境常见问题分析

消息零丢失方案

1、生产者发消息到Broker不丢失

​ Kafka的消息生产者Producer,支持定制一个参数,ProducerConfig.ACKS_CONFIG。

acks配置为0 : 生产者只负责往Broker端发消息,而不关注Broker的响应。也就是说不关心Broker端有没有收到消息。性能高,但是数据会有丢消息的可能。

acks配置为1:当Broker端的Leader Partition接收到消息后,只完成本地日志文件的写入,然后就给生产者答复。其他Partiton异步拉取Leader Partiton的消息文件。这种方式如果其他Partiton拉取消息失败,也有可能丢消息。

acks配置为-1或者all:Broker端会完整所有Partition的本地日志写入后,才会给生产者答复。数据安全性最高,但是性能显然是最低的。

​       对于KafkaProducer,只要将acks设置成1或-1,那么Producer发送消息后都可以拿到Broker的反馈RecordMetadata,里面包含了消息在Broker端的partition,offset等信息。通过这这些信息可以判断消息是否发送成功。如果没有发送成功,Producer就可以根据情况选择重新进行发送。

2、Broker端保存消息不丢失

        首先,合理优化刷盘频率,防止服务异常崩溃造成消息未刷盘。Kafka的消息都是先写入操作系统的PageCache缓存,然后再刷盘写入到硬盘。PageCache缓存中的消息是断电即丢失的。如果消息只在PageCache中,而没有写入硬盘,此时如果服务异常崩溃,这些未写入硬盘的消息就会丢失。Kafka并不支持写一条消息就刷一次盘的同步刷盘机制,只能通过调整刷盘的执行频率,提升消息安全。主要涉及几个参数:

flush.ms : 多长时间进行一次强制刷盘。

log.flush.interval.messages:表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。

 log.flush.interval.ms:当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。

​       然后,配置多备份因子,防止单点消息丢失。在Kafka中,可以给Topic配置更大的备份因子replication-factors。配置了备份因子后,Kafka会给每个Partition分配多个备份Partition。这些Partiton会尽量平均的分配到多个Broker上。并且,在这些Partiton中,会选举产生Leader Partition和Follower Partition。这样,当Leader Partition发生故障时,其他Follower Partition上还有消息的备份。就可以重新选举产生Leader Partition,继续提供服务。

3、消费者端防止异步处理丢失消息

       消费者端由于有消息重试机制,正常情况下是不会丢消息的。每次消费者处理一批消息,需要在处理完后给Broker应答,提交当前消息的Offset。Broker接到应答后,会推进本地日志的Offset记录。如果Broker没有接到应答,那么Broker会重新向同一个消费者组的消费者实例推送消息,最终保证消息不丢失。这时,消费者端采用手动提交Offset的方式,相比自动提交会更容易控制提交Offset的时机。

       消费者端唯一需要注意的是,不要异步处理业务逻辑。因为如果业务逻辑异步进行,而消费者已经同步提交了Offset,那么如果业务逻辑执行过程中出现了异常,失败了,那么Broker端已经接收到了消费者的应答,后续就不会再重新推送消息,这样就造成了业务层面的消息丢失。


消息积压如何处理

       

       通常情况下,Kafka本身是能够存储海量消息的,他的消息积压能力是很强的。但是,如果发现消息积压问题已经影响了业务处理进度,这时就需要进行一定的优化。

1、如果业务运行正常,只是因为消费者处理消息过慢,造成消息加压。那么可以增加Topic的Partition分区数,将消息拆分到更到的Partition。然后增加消费者个数,最多让消费者个数=Partition分区数,让一个Consumer负责一个分区,将消费进度提升到最大。

​        另外,在发送消息时,还是要尽量保证消息在各个Partition中的分布比较均匀。比如,在原有Topic下,可以调整Producer的分区策略,让Producer将后续的消息更多的发送到新增的Partition里,这样可以让各个Partition上的消息能够趋于平衡。如果你觉得这样太麻烦,那就新增一个Topic,配置更多的Partition以及对应的消费者实例。然后启动一批Consumer,将消息从旧的Topic搬运到新的Topic。这些Consumer不处理业务逻辑,只是做消息搬运,所以他们的性能是很高的。这样就能让新的Topic下的各个Partition数量趋于平衡。

2、如果是消费者的业务问题导致消息阻塞了,从而积压大量消息,并影响了系统正常运行。比如消费者序列化失败,或者业务处理全部异常。这时可以采用一种降级的方案,先启动一个Consumer将Topic下的消息先转发到其他队列中,然后再慢慢分析新队列里的消息处理问题。类似于死信队列的处理方式。


如何保证消息顺序

  

问题要分两个方面来考虑:

 1、因为kafka中各个Partition的消息是并发处理的,所以要保证消息顺序,对于Producer,要保证将一组有序的消息发到同一个Partition里。因为Partition的数据是顺序写的,所以自然就能保证消息是按顺序保存的。

​ 2、对于消费者,需要能够按照1,2,3的顺序处理消息。

问题一、如何保证Producer发到Partition上的消息是有序的

       首先,要保证Producer将消息都发送到一个Partition上,其实有两种方法。一种简答粗暴的想法就是给Topic只配一个Partition,没有其他Partition可选了,自然所有消息都到同一个Partition上了。表示从创建Topic时就放弃了多Partition带来的吞吐量便利,是不现实的。另一种是Topic依然配置多个Partition,但是通过定制Producer的Partition分区器,将消息分配到同一个Partition上。这样对于某一些要求局部有序的场景是至少是可行的。例如在电商场景,我可能只是需要保证同一个订单相关的多条消息有序,但是并不要求所有消息有序。这样就可以通过自定义分区路由器,将订单相同的多条消息发送到同一个Partition。

       但是Producer都将消息往同一个Partition,也不能保证消息顺序。因为消息可能发送失败。比如Producer依次发送1,2,3,三条消息。如果消息1因为网络原因发送失败了,2 和3 发送成功了,这样消息顺序就乱了。如果把producer的acks参数设置成1或-1,这样每次发送消息后,可以根据Broker的反馈判断消息是否成功。思路是可行的,但是重试的次数,发送消息的数量等都是需要考虑的问题。

回顾一下之前对于生产者消息幂等性的设计:

  

         Kafka的这个sequenceNumber是单调递增的。如果只是为了消息幂等性考虑,那么只要保证sequenceNumber唯一就行了,为什么要设计成单调递增呢?其实Kafka这样设计的原因就是可以通过sequenceNumber来判断消息的顺序。也就是说,在Producer发送消息之前就可以通过sequenceNumber定制好消息的顺序,然后Broker端就可以按照顺序来保存消息。与此同时, SequenceNumber单调递增的特性不光保证了消息是有顺序的,同时还保证了每一条消息不会丢失。一旦Kafka发现Producer传过来的SequenceNumber出现了跨越,那么就意味着中间有可能消息出现了丢失,就会往Producer抛出一个OutOfOrderSequenceException异常。

在生产者的配置类ProducerConfig中很快能找到很多和消息顺序ordering的描述:

public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."+ " Note that if this configuration is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"+ " message reordering after a failed send due to retries (i.e., if retries are enabled); "+ " if retries are disabled or if <code>enable.idempotence</code> is set to true, ordering will be preserved."+ " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. ";

问题二:Partition中的消息有序后,如何保证Consumer的消费顺序是有序的
    public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. " +"Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than " +"this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. " +"The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +"<code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;

        这里明确提到Consumer其实是每次并行的拉取多个Batch批次的消息进行处理的。也就是说Consumer拉取过来的多批消息并不是串行消费的。所以在Kafka提供的客户端Consumer中,是没有办法直接保证消费的消息顺序。其实这也比较好理解,因为Kafka设计的重点是高吞吐量,所以他的设计是让Consumer尽最大的能力去消费消息。而只要对消费的顺序做处理,就必然会影响Consumer拉取消息的性能。

​        所以这时候,我们能做的就是在Consumer的处理逻辑中,将消息进行排序。比如将消息按照业务独立性收集到一个集合中,然后在集合中对消息进行排序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/450186.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Echarts+Vue 首页大屏静态示例Demo 第三版

效果图: 源码: <template><div class="content bg" style="height: 100vh;overflow-y: auto" :class="{ fullscreen-container: isFullScreen }"><div class="reaDiv" style="height: 10vh"><div…

阿里云AI通义千问出bug,今天修复了,一切都是莫名其妙,国产AI又可以了?

怎么隔一天就好了&#xff1f; 引言我的处理感想再次提问AI代码结尾 引言 前天我的阿里云AI 通义千问 不是抽风了嘛 详情见 阿里云AI通义千问出bug,解决不了直接弃,开始对国产AI由支持变失望 就是我的一些对话莫名消失了 我的处理 我在这里进行了反馈 但是没有回应 我以为…

【leetcode题解C++】98.验证二叉搜索树 and 701.二叉搜索树中的插入操作

98. 验证二叉搜索树 给你一个二叉树的根节点 root &#xff0c;判断其是否是一个有效的二叉搜索树。 有效 二叉搜索树定义如下&#xff1a; 节点的左子树只包含 小于 当前节点的数。节点的右子树只包含 大于 当前节点的数。所有左子树和右子树自身必须也是二叉搜索树。 示例…

Jupyter Notebook中的%matplotlib inline详解

Jupyter Notebook中的%matplotlib inline详解 &#x1f335;文章目录&#x1f335; &#x1f333;引言&#x1f333;&#x1f333;什么是魔术命令&#x1f333;&#x1f333;%matplotlib inline详解&#x1f333;(&#x1f448;直入主题请点击)&#x1f333;小结&#x1f333;&…

如何把vue项目打包成桌面程序 electron-builder

引入 我们想要把我们写的vue项目,打包成桌面程序&#xff0c;我们需要使用electron-builder这个库 如何使用 首先添加打包工具 vue add electron-builder 选择最新版本 下载完毕 我们可以看到我们的package.json中多了几行 electron:build&#xff1a;打包我们的可执行程序 e…

[python]基于opencv实现的车道线检测

【检测原理】 一、首先进行canny边缘检测&#xff0c;为获取车道线边缘做准备 二、进行ROI提取获取确切的车道线边缘&#xff08;红色线内部&#xff09; 三、利用概率霍夫变换获取直线&#xff0c;并将斜率正数和复数的线段给分割开来 四、离群值过滤&#xff0c;剔除斜率…

重写Sylar基于协程的服务器(5、IO协程调度模块的设计)

重写Sylar基于协程的服务器&#xff08;5、IO协程调度模块的设计&#xff09; 重写Sylar基于协程的服务器系列&#xff1a; 重写Sylar基于协程的服务器&#xff08;0、搭建开发环境以及项目框架 || 下载编译简化版Sylar&#xff09; 重写Sylar基于协程的服务器&#xff08;1、…

Unity | YooAssetV2.1.0 + HybridCLR热更新

目录 一、项目更改 二、使用YooAsset热更 1.资源配置 2.资源构建 3.将两个文件夹下的资源上传CDN服务器 4.修改代码 5.运行效果 本文记录利用YooAssetHybridCLR来进行资源和dll的更新。YooAsset使用的是新版V2.1.0。相比于旧版&#xff0c;dll(原生文件)和资源要建两个p…

前端JavaScript篇之map和Object的区别、map和weakMap的区别

目录 map和Object的区别map和weakMap的区别 map和Object的区别 Object是JavaScript的内置对象&#xff0c;用于存储键值对。Object的键必须是字符串或符号&#xff0c;值可以是任意类型。Map是ES6引入的新数据结构&#xff0c;用于存储键值对。Map的键可以是任意类型&#xff…

山东淄博刑侦大队利用无人机抓获盗窃团伙

山东淄博刑侦大队利用无人机抓获盗窃团伙 近期&#xff0c;山东淄博临淄区发生多起盗窃案件。通过视频追踪和调查访问&#xff0c;推断临淄区某村可能为嫌疑人藏匿地点。刑侦大队无人机应急小组迅速到达现场&#xff0c;经无人机高空侦查&#xff0c;发现并锁定了嫌疑人的藏匿…

AES算法:数据传输的安全保障

在当今数字化时代&#xff0c;数据安全成为了一个非常重要的问题。随着互联网的普及和信息技术的发展&#xff0c;我们需要一种可靠的加密算法来保护我们的敏感数据。Advanced Encryption Standard&#xff08;AES&#xff09;算法应运而生。本文将介绍AES算法的优缺点、解决了…

决策树的相关知识点

&#x1f4d5;参考&#xff1a;ysu老师课件西瓜书 1.决策树的基本概念 【决策树】&#xff1a;决策树是一种描述对样本数据进行分类的树形结构模型&#xff0c;由节点和有向边组成。其中每个内部节点表示一个属性上的判断&#xff0c;每个分支代表一个判断结果的输出&#xff…