Kafka性能测试初探

相信大家对Kafka不会陌生,但首先还是要简单介绍一下。

Kafka是一种高性能的分布式消息系统,由LinkedIn公司开发,用于处理海量的实时数据流。它采用了发布/订阅模式,可以将数据流分发到多个消费者端,同时提供了高可靠性、高吞吐量和低延迟的特性。

Kafka的应用场景非常广泛,例如日志收集、事件流处理、实时监控等。在这些场景中,Kafka可以提供高可靠性和低延迟的数据传输,确保数据的稳定性和实时性。与此同时,Kafka还提供了丰富的API和管理工具,使得用户可以方便地配置和管理Kafka集群。

很多高性能方案都会用到Kafka,今天我来分享如何使用Kafka Client API进行Kafka生产者和消费者压测。

依赖

我用了Gradle创建的项目,依赖配置如下:

compile group: ‘org.apache.kafka’, name: ‘kafka-clients’, version: ‘3.4.0’

kafka服务端

我本地用了Kafka最新版本:kafka_2.12-3.4.0,这个版本可以不依赖zookeeper,非常方便,用来本地功能验证和测试我是十分推荐的。基本做到了开箱即用。

具体的流程可以自行搜索。

生产者压测Demo

在创建生产者时,会有不少的参数需要配置,这里建议使用默认的。或者使用待测试参数组合。下面是我自己的配置,常用的参数我都列了出来。具体参数含义,可以自行搜索,这方面资料还是很多的,下面直接进入压测用例环节。

package com.funtest.kafkaimport com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import com.funtester.utils.StringUtil
import groovy.util.logging.Log4j2
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer@Log4j2
class Produce extends SourceCode {static void main(String[] args) {Properties properties = new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); //所有分区副本都收到确认信息,才能确认写入properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384")properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name);KafkaProducer<String, String> producer = new KafkaProducer<>(properties);def topic = "testkafka"def test = {producer.send(new ProducerRecord<>(topic, StringUtil.getString(10)))}new FunQpsConcurrent(test,"Kafka测试").start()producer.close();}}

这里用到了动态QPS模型,最后的close()也可以不使用,毕竟main方法的代码结束了就真的结束了。

消费者

呼应生产者,消费者也有一堆需要配置的参数。这里先按下不表,有兴趣的可以自行学习。

Kafka消费者有两种订阅消息的方式,分别是订阅模式和分配模式。

订阅模式是指消费者订阅一个或多个主题,然后自动分配分区进行消费。这种模式下,Kafka会自动管理消费者与分区之间的关系,当有新的消费者加入或者退出消费组时,Kafka会自动重新分配分区,保证每个消费者都能够获取到消息。

而分配模式则是由消费者主动向Kafka请求分配指定的分区进行消费。这种模式下,消费者需要手动管理分区与消费者之间的关系,需要注意的是,当有新的消费者加入或者退出消费组时,需要手动重新分配分区。

订阅模式相对于分配模式来说更加简单易用,但是分配模式可以更加灵活地控制消费者与分区之间的关系。所以我选择了订阅模式。

package com.funtest.kafkaimport com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumerimport java.time.Durationclass Cunsumer extends SourceCode {static void main(String[] args) {KafkaConsumer<String, String> consumer;Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FunTester32");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"10000");properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");consumer = new KafkaConsumer<>(properties);String topic = "testkafka";
//        TopicPartition topicPartition = new TopicPartition(topic, 0);
//        List<TopicPartition> topics = Arrays.asList(topicPartition);
//        consumer.assign(topics);
//        consumer.seekToEnd(topics);
//        long current = consumer.position(topicPartition);
//        consumer.seek(topicPartition, current - 10);//手动设置偏移量consumer.subscribe([topic])//订阅模式,不能与assign混用while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}sleep(1.0)}def test = {consumer.poll(Duration.ofMillis(1000));}new FunQpsConcurrent(test,"Kafka消费").start()consumer.close()}
}

由于本地机器原因,需要在服务器上启动一个Kafka服务,用来测试不同参数组合情况下Kafka的性能表现。后续有机会再来分享。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/195643.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

弗洛伊德算法(C++)

目录 介绍&#xff1a; 代码&#xff1a; 结果&#xff1a; 介绍&#xff1a; 弗洛伊德算法&#xff08;Floyd algorithm&#xff09;也称为Floyd-Warshall算法&#xff0c;是一种用于求解所有节点对之间的最短路径的动态规划算法。它使用了一个二维数组来存储所有节点…

defer和async

如果两个属性浏览器都不兼容&#xff0c;推荐把<script>标签放到底部 一般情况下&#xff0c;浏览器在解析html源文件时&#xff0c;如果遇到外部的<script>标签&#xff0c;解析过程就会先暂停&#xff0c;这时会对script进行加载&#xff0c;执行两个过程&…

深入解析具名导入es6规范中的具名导入是在做解构吗

先说答案&#xff0c;不是 尽管es6的具名导入和语法非常相似 es6赋值解构 const obj {a: 1,f() {this.a}}const { a, f } objes6具名导入 //导出文件代码export let a 1export function f() {a}export default {a,f}//导入文件代码import { a, f } from ./tsVolution可以看出…

2023.11.19 hadoop之MapReduce

目录 1.简介 2.分布式计算框架-Map Reduce 3.mapreduce的步骤 4.MapReduce底层原理 map阶段 shuffle阶段 reduce阶段 1.简介 Mapreduce是一个分布式运算程序的编程框架&#xff0c;是用户开发“基于hadoop的数据分析应用”的核心框架&#xff1b; Mapreduce核心功能是…

Diffusion Models CLIP

Introduction to Diffusion Models 生成模型 主要指的是无监督学习中的生成模型&#xff0c;在无监督学习中的主要任务是让机器学习给定的样本&#xff0c;然后生成一些新的东西出来。比如&#xff1a;给机器看一些图片&#xff0c;能够生成一些新的图片出来&#xff0c;给机器…

pythorch的numel()函数计算模型大小与现存占用

本文解释简单给一个模型列子记录如何计算该模型参数量与模型显存占用情况&#xff0c;该文直接调用torchvision库的模型文件构建模型model&#xff0c;在使用parameters()函数遍历&#xff0c;并在遍历情况下使用numel()函数记录模型参数量与显存占用。 代码如下&#xff1a; …

大数据HCIE成神之路之数学(3)——概率论

概率论 1.1 概率论内容介绍1.1.1 概率论介绍1.1.2 实验介绍 1.2 概率论内容实现1.2.1 均值实现1.2.2 方差实现1.2.3 标准差实现1.2.4 协方差实现1.2.5 相关系数1.2.6 二项分布实现1.2.7 泊松分布实现1.2.8 正态分布1.2.9 指数分布1.2.10 中心极限定理的验证 1.1 概率论内容介绍…

Java-整合OSS

文章目录 前言一、OSS 简介二、OSS 的使用1. Bucket 的创建与文件上传2. 创建 RAM 与用户授权3. 图形化管理工具-ossbrowser 三、Java 整合 OSS1. 基本实现2. 客户端直传 前言 最近公司的技术负责人让我整合下 OSS 到项目中&#xff0c;所以花了一点时间研究了下 OSS&#xff…

【网络安全】——区块链安全和共识机制

区块链安全和共识机制 摘要&#xff1a;区块链技术作为一种分布式去中心化的技术&#xff0c;在无需第三方的情况下&#xff0c;使得未建立信任的交易双方可以达成交易。因此&#xff0c;区块链技术近年来也在金融&#xff0c;医疗&#xff0c;能源等多个行业得到了快速发展。然…

ElasticSearch学习篇6_ES实践与Lucene对比及原理分析技术分享小记

前言 QBM、MFS的试题检索、试题查重、公式转换映射等业务场景以及XOP题库广泛使用搜索中间件&#xff0c;业务场景有着数据量大、对内容搜索性能要求高等特点&#xff0c;其中XOP题库数据量更是接近1亿&#xff0c;对检索性能以及召回率要求高。目前QBM、MFS使用的搜索中间件是…

数据分析思维与模型:群组分析法

群组分析法&#xff0c;也称为群体分析法或集群分析法&#xff0c;是一种研究方法&#xff0c;用于分析和理解群体内的动态、行为模式、意见、决策过程等。这种方法在社会科学、心理学、市场研究、组织行为学等领域有广泛应用。它可以帮助研究人员或组织更好地理解特定群体的特…