202108151156 - kafka消费积压案例

news/2025/3/20 18:48:20/文章来源:https://www.cnblogs.com/route/p/18783731

0. 背景

上游厂家生产信令数据,我方消费kafka数据,过滤后插入HBase。
上游生产的信令数据分了4个主题,每个主题有若干分区,这4个主题的数据消费后都插入同一张HBase表。

问题:kafka消息积压达到百亿。
以下以topic1为例,有6个分区。

1. 查看消费滞后情况

kafka-consumer-groups.sh --zookeeper node1:2181 --describe --group group1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
group1 topic1 p1 763268411801 765604526970 2336115169 none
group1 topic1 p2 758609405216 760945694615 2336289399 none
group1 topic1 p3 792287818058 792288073608 255550 none
group1 topic1 p4 760978756871 760979013415 256544 none
group1 topic1 p5 726235588657 728571879016 2336290359 none
group1 topic1 p6 757874708350 760210948121 2336239771 none
  • 观察LOG-END-OFFSET,各分区都在不断增加,说明各分区都在正常地生产。
  • 观察CURRENT-OFFSET,除了p3、p4其他分区的CURRENT-OFFSET一直不变,说明已停止消费。
  • LAG在100W以内都正常,但是除了p3、p4 其他分区的LAG都很大,达到十亿级,而且还在不断地积压。

2. 分析原因及优化

将源代码反编译后,根据代码画出以下示意图

同一主题下各分区的消费流共享同一个HTable,每个HTable会创建一个HConnection

HConnectionManager.getConnection(conf);

2.1 集群的吞吐量统计

  • 生产者 主题topic1,共6个分区。每个分区生产消息1W3W/秒,生产消息520W/秒,每5分钟3000W上下,每天约100亿。
  • 消费者 每秒5~20W,每5分钟消费约3000W消息,每天约100亿
  • HBase 1000多W次put每5分钟,每天约40亿。

HBase吞吐量瓶颈,在6节点,万兆网卡情况下,每条记录put一次。一个HTable连接的put上限在3W次每秒。

2.2 优化

每个分区创建一个consumer,每个consumer创建一个HTable连接

根据图中思路优化代码后,再次查看LAG,无积压

3. KafkaStream相关API

官网API介绍

  /***  Create a list of message streams of type T for each topic, using the default decoder.*/public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);

This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.

这个 API 以迭代器为中心,由 KafkaStream 类实现。 每个 KafkaStream 代表来自一台或多台服务器上的一个或多个分区的消息流。 每个流都用于单线程处理,因此客户端可以在创建调用中提供所需流的数量。 因此一个流可能代表多个服务器分区的合并(对应于处理线程的数量),但每个分区只进入一个流。

The createMessageStreams call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. The API encourages creating many topic streams in a single call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally) registers watchers to discover new topics that match its filter. Note that each stream that createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e., if multiple topics are allowed by the filter).

每次调用createMessageStreams 会为topic注册consumer,多次调用可能会导致rebalance consumer/broker分配。 API 鼓励在一次调用中创建多个主题流,以最大程度地减少这种重新平衡。 (另外)调用 createMessageStreamsByFilter 注册观察者以发现与其过滤器匹配的新主题。 请注意, createMessageStreamsByFilter 返回的每个流都可以迭代来自多个主题的消息(即,如果过滤器允许多个主题)。

kafkaStream如何消费消息?

// 创建 kafka stream流 的迭代器
ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();while (it.hasNext()) {// 读取消息byte[] message = (byte[])it.next().message();String s = new String(message);
}

4. 遗留问题

  • 采用put(list)优化, 各分区都消费,仍有滞后。

  • 如何更好地监控kafka / hbase?

    • kafka-eagle ?
    • HBase优化配置,提高吞吐量?
    • HBase吞吐量监控? jmx ?
// HBase 官网API
// BalancerRegionLoad.getWriteRequstCount()RegionMetrics.getWriteRequstCount()
  • kafka数据存储策略,保留一周?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/902140.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql 多表怎么连接的

前言 简单描述一下多表怎么连接的。 正文 首先,我们得抛开我们一些自以为是的想法。 我想过这个问题,就是为什么我们背乘法口诀的时候,我们总是背: 22 = 4, 99=81 这样背下去,似乎这是口诀。然而这是缓存,不是计算,既然不是计算那么就不是逻辑学。 我们理所当然的想9*9…

202108120808 - 类加载器及双亲委派机制

Bootstrap ClassLoader 这是加载器中的大 Boss,任何类的加载行为,都要经它过问。它的作用是加载核心类库,也就是 rt.jar、resources.jar、charsets.jar 等。当然这些 jar 包的路径是可以指定的,-Xbootclasspath 参数可以完成指定操作。 这个加载器是 C++ 编写的,随着 JVM …

keil仿真时导出数据操作

keil仿真时导出数据操作 save D:\savedata.txt 0x20001013,0x20001035

spring-boot-starter-validation

官方提供的注解 spring-boot-starter-validation 是 Spring Boot 提供的一个 starter,是一个用于验证 Java Bean 的标准,它提供了一套注解和相应的运行时 API 来定义和执行校验规则。 具体来说,当你在项目中引入 spring-boot-starter-validation 后,你可以使用一系列预定义…

省选算法复习

省选算法复习 1. 线段树优化建图 当我们需要向区间内所有点连边或者从区间中所有点连到某个点的时候,便可以使用线段树来优化,如果需要从区间每一个点连到另一个区间每一个点的话,加一个虚点就好了。 这不是一个很困难的技巧,关键在于要建模。 P5471 [NOI2019] 弹跳 - 洛谷…

fastadmin订单父子表管理端

fastadmin后台父子表使用方法 发布于 2021-01-22 12:48:10fastadmin后台的所有表格都是支持父子表配置的,只需要简单修改一下对应的JS即可,下面直接进入主题。示例是我的全国省市行政区划表,是从国家统计局网站采集下来的,共五级行政数据,非常适合用来做父子表,按照级别一…

Rudolf and k Bridges

Rudolf and k Bridges 题目 大致题意上图为俯视图 有一个\(nXm\)的网格,下标从\(1-n\) 以及从 \(1-m\),\((i, j)\) 的值就是这个这垂直一格水的深度 现在要安装支架,有几个信息:\((i, 1)\) 和 \((i, m)\) 处必须要安装相邻支架的距离不能超过 \(d\), 相邻距离为 \(abs(j - …

背离Divergence Trading ,贪小便宜

趋势交易(trend trading)和背离交易(divergence trading),代表了两种不同的交易策略。做背离交易相当于赌市场短期失效,承认你比市场聪明,虽然能赚小钱,但往往是亏大钱的根源。 贪小便宜爱背离,贪小便宜(gain small advantages)不爱止损(cut losses),所以背离和不止损…

在鸿蒙NEXT开发中实现一个语音识别组件

鸿蒙系统发布以后都不知道叫它5.0版本还是NEXT版本了,哈哈,反正是最新版本就对了。对于语音转换文字,鸿蒙系统提供了离线语音识别模型speechRecognizer,语种目前支持中文,识别效果非常不错。今天要分享的是使用speechRecognizer实现一个语音识别组件。要实现语音识别,首先…

激光代加工产品一览-代加工-外协加工-委外加工-激光代加工-河南郑州亚克力切割雕刻代加工-芯晨微纳(河南)

关键词:河南省郑州市、激光代加工、激光打标、激光切割、激光雕刻、激光打孔、激光毛化、激光分切 简介:芯晨微纳(河南)光电科技有限公司,专注于激光微纳代加工、设备/耗材代理销售、设备租赁、技术推广服务,可处理材料类型及应用范围十分广泛,欢迎来电咨询(韩经理1823…

20242801 2024-2025-2 《网络攻防实践》第4次作业

20242801 2024-2025-2 《网络攻防实践》第4次作业 一、实验内容 ​ 在虚拟机环境中完成TCP/IP协议栈重点协议的攻击实验,学习ARP缓存欺骗攻击、ICMP重定向攻击、SYN Flood攻击、TCP RST攻击、TCP会话劫持攻击的原理和相关知识,并动手进行实践。 二、实验过程 (一)ARP缓存欺…

缓存监控治理在游戏业务的实践和探索

通过对 Redis 和 Caffeine 的缓存监控快速发现和定位问题降低故障的影响面。作者:来自 vivo 互联网服务器团队- Wang Zhi 通过对 Redis 和 Caffeine 的缓存监控快速发现和定位问题降低故障的影响面。 一、缓存监控的背景游戏业务中存在大量的高频请求尤其是对热门游戏而言,而…