docker-compose部署kafka

docker-compose.yml配置

version: "3"
services:kafka:image: 'bitnami/kafka:latest'ports:- '7050:7050'environment:- KAFKA_ENABLE_KRAFT=yes- KAFKA_CFG_PROCESS_ROLES=broker,controller- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_LISTENERS=PLAINTEXT://:7050,CONTROLLER://:7051- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://183.56.203.157:7050- KAFKA_BROKER_ID=1- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@0.0.0.0:7051- ALLOW_PLAINTEXT_LISTENER=yes

kafka UI界面

docker run -d --name kafka-map -p 8049:8080 -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin dushixiang/kafka-map:latest

docker run -p 8080:8080 -e KAFKA_BROKERS=host.docker.internal:9092 docker.redpanda.com/vectorized/console:master-173596f

UI界面总览

https://towardsdatascience.com/overview-of-ui-tools-for-monitoring-and-management-of-apache-kafka-clusters-8c383f897e80

kafka学习

生产者
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.Test
import java.util.*/*** @Description :* @Author  xiaomh* @date  2022/8/5 15:58*/
class CustomProducer {//异步发送@Testfun customProducer() {//配置val properties = Properties()//链接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 5) {//黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)kafkaProducer.send(ProducerRecord("xiao1", "customProducer,count::$i"))}//关闭资源kafkaProducer.close()}//同步发送@Testfun customProducerSync() {//配置val properties = Properties()//链接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 5) {//黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)kafkaProducer.send(ProducerRecord("xiao1", "customProducerSync,count::$i")).get()}//关闭资源kafkaProducer.close()}//回调异步发送@Testfun customProducerCallback() {//配置val properties = Properties()//链接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 500) {//黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)kafkaProducer.send(ProducerRecord("xiao1", "customProducerCallback,count::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})//测试分区策略Thread.sleep(1)}//关闭资源kafkaProducer.close()}//回调异步发送+使用分区@Testfun customProducerCallbackPartitions1() {//配置val properties = Properties()//链接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 5) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", 1, "", "customProducerCallbackPartitions,count::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}//关闭资源kafkaProducer.close()}//回调异步发送+自定义分区@Testfun customProducerCallbackPartitions2() {//配置val properties = Properties()//链接kafka,集群链接使用"183.56.203.157:7050,183.56.203.157:7051"properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//关联自定义分区器properties[ProducerConfig.PARTITIONER_CLASS_CONFIG] ="com.umh.medicalbookingplatform.b2bapi.config.MyPartitioner"//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 50) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", "felix is strong,count::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}//关闭资源kafkaProducer.close()}//自定义配置缓冲区、批次、等待时间、压缩@Testfun customProducerParameters() {//配置val properties = Properties()properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//缓冲区大小。默认32,64=33554432x2properties[ProducerConfig.BUFFER_MEMORY_CONFIG] = 33554432//批次大小。默认16kproperties[ProducerConfig.BATCH_SIZE_CONFIG] = 16384//等待时间。默认0properties[ProducerConfig.LINGER_MS_CONFIG] = 1//压缩.压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties[ProducerConfig.COMPRESSION_TYPE_CONFIG] = "snappy"//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)for (i in 0 until 10) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", "customProducerParameters::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}//关闭资源kafkaProducer.close()}//ack、重试次数配置@Testfun customProducerAck() {//配置val properties = Properties()properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//ackproperties[ProducerConfig.ACKS_CONFIG] = "1"//重试次数properties[ProducerConfig.RETRIES_CONFIG] = 30//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)for (i in 0 until 10) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", "customProducerAck::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}//关闭资源kafkaProducer.close()}//事物@Testfun customProducerTransaction() {//配置val properties = Properties()properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//指定事务id,一定要指定!!properties[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = UUID.randomUUID().toString()//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//开启事务kafkaProducer.initTransactions()kafkaProducer.beginTransaction()try {for (i in 0 until 10) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", "customProducerTransaction::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}
//            val test: Int = 1 / 0kafkaProducer.commitTransaction()} catch (e: Exception) {kafkaProducer.abortTransaction()} finally {//关闭资源kafkaProducer.close()}}}

消费者

1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个 partition的数据。

2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略。

3、每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的过长(max.poll.interval.ms5分钟),也会触发再 平衡

package com.umh.medicalbookingplatform.apiimport com.alibaba.fastjson.parser.ParserConfig
import com.fasterxml.jackson.databind.MapperFeature
import com.umh.medicalbookingplatform.core.audit.SpringSecurityAuditorAware
import com.umh.medicalbookingplatform.core.config.CoreConfiguration
import com.umh.medicalbookingplatform.core.jsonview.JsonViews
import com.umh.medicalbookingplatform.core.properties.ApplicationProperties
import com.umh.medicalbookingplatform.core.utils.ApplicationJsonObjectMapper
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder
import org.keycloak.OAuth2Constants
import org.keycloak.admin.client.Keycloak
import org.keycloak.admin.client.KeycloakBuilder
import io.swagger.v3.oas.models.Components
import io.swagger.v3.oas.models.OpenAPI
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.boot.web.servlet.ServletComponentScan
import org.springframework.cache.annotation.EnableCaching
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Import
import org.springframework.data.domain.AuditorAware
import org.springframework.data.jpa.repository.config.EnableJpaAuditing
import org.springframework.http.MediaType
import org.springframework.http.converter.HttpMessageConverter
import org.springframework.http.converter.ResourceHttpMessageConverter
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer
import java.security.Security
import java.util.*
import io.swagger.v3.oas.models.info.Info
import io.swagger.v3.oas.models.info.License
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.keycloak.adapters.KeycloakConfigResolver
import org.keycloak.adapters.springboot.KeycloakSpringBootConfigResolver
import org.keycloak.adapters.springboot.KeycloakSpringBootProperties
import org.springframework.http.converter.StringHttpMessageConverter
import java.time.Duration
import java.util.concurrent.TimeUnit@EnableJpaAuditing
@EnableCaching
@EnableScheduling
@SpringBootApplication
@Import(CoreConfiguration::class)
@ServletComponentScan("com.umh.medicalbookingplatform")
open class ApiApplication : WebMvcConfigurer {@Autowiredprivate lateinit var appProperties: ApplicationProperties@Autowiredprivate lateinit var keycloakSpringBootProperties: KeycloakSpringBootProperties@Beanfun keycloakConfigResolver(): KeycloakConfigResolver {return KeycloakSpringBootConfigResolver()}@Beanfun fastJson(){ParserConfig.getGlobalInstance().isAutoTypeSupport = true}@Beanfun customConsumer() {//配置val properties = Properties()//连接properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//反序列化(注意写法:生产者是序列化,消费者是反序列化)properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.nameproperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name//配置消费者组id(就算消费者组只有一个消费者也需要)//当消费者组ID相同时,表示他们在同一个消费者组//当有三个分区,而消费者组里又有三个消费者时,消费者会各自自动选取一个分区进行消费properties[ConsumerConfig.GROUP_ID_CONFIG] = "test"//1.创建一个消费者val kafkaConsumer = KafkaConsumer<String, String>(properties)//2.定义主题 xiao1val topics = mutableListOf<String>()topics.add("xiao1")kafkaConsumer.subscribe(topics)//3.消费数据while (true) {val consumerRecord: ConsumerRecords<String, String> = kafkaConsumer.poll(Duration.ofSeconds(1))for (msg in consumerRecord) {println("consumer,msg:::$msg")}}}//    @Beanfun customConsumerPartition() {//配置val properties = Properties()//连接properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//反序列化(注意写法:生产者是序列化,消费者是反序列化)properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.nameproperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name//配置消费者组id(就算消费者组只有一个消费者也需要)//当消费者组ID相同时,表示他们在同一个消费者组properties[ConsumerConfig.GROUP_ID_CONFIG] = UUID.randomUUID().toString()//1.创建一个消费者val kafkaConsumer = KafkaConsumer<String, String>(properties)//2.定义主题对应的分区val topicPartition = mutableListOf<TopicPartition>()topicPartition.add(TopicPartition("xiao1", 1))kafkaConsumer.assign(topicPartition)//3.消费数据while (true) {val consumerRecord: ConsumerRecords<String, String> = kafkaConsumer.poll(Duration.ofSeconds(1))for (msg in consumerRecord) {println("msg:::$msg")}}}@Bean(name = ["keycloakGlobalCmsApi"])fun keycloakGlobalCmsApiInstance(): Keycloak {return KeycloakBuilder.builder().serverUrl(appProperties.keycloakAuthServerUrl)//https://keycloak.umhgp.com/auth.realm(appProperties.keycloakGlobalCmsRealm)//global_cms.clientId(appProperties.keycloakGlobalCmsClient)//global-cms.username(appProperties.keycloakApiUsername)//medical-booking-platform-system-uat.password(appProperties.keycloakApiPassword)//Kas7aAnC76eGVHv5.grantType(OAuth2Constants.PASSWORD).resteasyClient(ResteasyClientBuilder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(10, TimeUnit.SECONDS).connectionPoolSize(100).build()).build()}@Bean(name = ["keycloakGlobalProfileApi"])fun keycloakGlobalProfileApiInstance(): Keycloak {return KeycloakBuilder.builder().serverUrl(appProperties.keycloakAuthServerUrl).realm(appProperties.keycloakGlobalProfileRealm).clientId(appProperties.keycloakGlobalProfileClient).username(appProperties.keycloakApiUsername).password(appProperties.keycloakApiPassword).grantType(OAuth2Constants.PASSWORD).resteasyClient(ResteasyClientBuilder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(10, TimeUnit.SECONDS).connectionPoolSize(100).build()).build()}@Bean(name = ["keycloakBookingSystemApi"])fun keycloakBookingSystemApiInstance(): Keycloak {return KeycloakBuilder.builder().serverUrl(appProperties.keycloakAuthServerUrl).realm(appProperties.keycloakBookingSystemRealm).clientId(appProperties.keycloakBookingSystemClient).username(appProperties.keycloakApiUsername).password(appProperties.keycloakApiPassword).grantType(OAuth2Constants.PASSWORD).resteasyClient(ResteasyClientBuilder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(10, TimeUnit.SECONDS).connectionPoolSize(100).build()).build()}@Bean(name = ["keycloakUmhBookingSystemApi"])fun keycloakBookingSystemUmhApiInstance(): Keycloak {return KeycloakBuilder.builder().serverUrl(appProperties.keycloakAuthServerUrl).realm(appProperties.keycloakUmhBookingSystemRealm).clientId(appProperties.keycloakUmhBookingSystemClient).username(appProperties.keycloakApiUsername).password(appProperties.keycloakApiPassword).grantType(OAuth2Constants.PASSWORD).resteasyClient(ResteasyClientBuilder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(10, TimeUnit.SECONDS).connectionPoolSize(100).build()).build()}@Beaninternal fun auditorProvider(): AuditorAware<UUID> {return SpringSecurityAuditorAware()}@Beanfun customOpenAPI(): OpenAPI? {return OpenAPI().components(Components()).info(Info().title("medical-booking-platform").version("1.5.8").license(License().name("Apache 2.0").url("http://springdoc.org")))}override fun configureMessageConverters(converters: MutableList<HttpMessageConverter<*>>) {
//        ActuatorMediaTypes()val supportedMediaTypes = ArrayList<MediaType>()supportedMediaTypes.add(MediaType.APPLICATION_JSON)supportedMediaTypes.add(MediaType.valueOf("application/vnd.spring-boot.actuator.v3+json"))supportedMediaTypes.add(MediaType.TEXT_PLAIN)val converter = MappingJackson2HttpMessageConverter()val objectMapper = ApplicationJsonObjectMapper()objectMapper.setConfig(objectMapper.serializationConfig.withView(JsonViews.Admin::class.java))objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, true)converter.objectMapper = objectMapperconverter.setPrettyPrint(true)converter.supportedMediaTypes = supportedMediaTypesconverters.add(0, StringHttpMessageConverter())converters.add(1, converter)converters.add(ResourceHttpMessageConverter())}}fun main(args: Array<String>) {Security.setProperty("crypto.policy", "unlimited")runApplication<ApiApplication>(*args)
}

range(范围)

Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策 略。

消费者分区操作:7分区2个消费者时

消费者1:消费分区0123

消费者2:消费分区456


在同一个消费者组,三消费者的情况下,如果其中一个宕机,45秒后会把消费者0需要处理的数据整个搬到消费者1或者消费者2.

结果:Consumer1=01234 或者 Consumer2=01256

随后如果再传输数据,消费者组会根据当前的消费者重新组织分配

Consumer0宕机45秒后再次传数据结果:Consumer1=0123 Consumer2=456

RoundRobin(轮询)

RoundRobin 针对集群中所有Topic而言。 RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。

策略分配的修改

    @Beanfun customConsumer() {//配置val properties = Properties()//连接properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//反序列化(注意写法:生产者是序列化,消费者是反序列化)properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.nameproperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name//配置消费者组id(就算消费者组只有一个消费者也需要)//当消费者组ID相同时,表示他们在同一个消费者组//当有三个分区,而消费者组里又有三个消费者时,消费者会各自自动选取一个分区进行消费properties[ConsumerConfig.GROUP_ID_CONFIG] = "test"//设置分区分配策略properties[ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG] = "org.apache.kafka.clients.consumer.RoundRobinAssignor"//1.创建一个消费者val kafkaConsumer = KafkaConsumer<String, String>(properties)//2.定义主题 xiao1val topics = mutableListOf<String>()topics.add("xiao1")kafkaConsumer.subscribe(topics)//3.消费数据while (true) {val consumerRecord: ConsumerRecords<String, String> = kafkaConsumer.poll(Duration.ofSeconds(1))for (msg in consumerRecord) {println("consumer,msg:::$msg")}}}

注意:06为一组给到一个消费者,3为一组给到另外一个消费者。45秒后重新发送数据,consumer2:0246,consumer3:135

Sticky (黏性)

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 2、5、3 号分区数据。

2 号消费者:消费到 4、6 号分区数据。

0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别 由 1 号消费者或者 2 号消费者消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需 要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 2、3、5 号分区数据。

2 号消费者:消费到 0、1、4、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

随机+均匀

宕机后分配的消费者和45秒后分配消费者一样

宕机(3消费者变2消费者):1403,235

45秒后2消费者:1403,235

本文转自 https://blog.csdn.net/weixin_52925162/article/details/126280062?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522170100111416800225544545%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=170100111416800225544545&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allfirst_rank_ecpm_v1~rank_v31_ecpm-8-126280062-null-null.142v96pc_search_result_base9&utm_term=keycloak%20docker-compose&spm=1018.2226.3001.4187,如有侵权,请联系删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/298811.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

usb设备驱动程序(一)

代码&#xff1a; #include <linux/atomic.h> #include <linux/kernel.h> #include <linux/list.h> #include <linux/module.h> #include <linux/slab.h> #include <linux/usb.h> #include <linux/videodev2.h> #include <linux…

巅峰画师Midjourney:新时代的独角兽

介绍 AI绘画领域中&#xff0c;Midjourney处于绝对地位&#xff0c;并且一年时间就登顶。 Midjourney是一家独立的AI研究实验室,探索新的思维媒介,拓展人类的想象力。 它由一个小型的自筹资金团队组成,专注于设计、人类基础设施和AI。 在AI绘画领域,Midjourney取得了非常突出…

2023年国赛高教杯数学建模E题黄河水沙监测数据分析解题全过程文档及程序

2023年国赛高教杯数学建模 E题 黄河水沙监测数据分析 原题再现 黄河是中华民族的母亲河。研究黄河水沙通量的变化规律对沿黄流域的环境治理、气候变化和人民生活的影响&#xff0c;以及对优化黄河流域水资源分配、协调人地关系、调水调沙、防洪减灾等方面都具有重要的理论指导…

PyTorch深度学习实战(27)——变分自编码器(Variational Autoencoder, VAE)

PyTorch深度学习实战&#xff08;27&#xff09;——变分自编码器 0. 前言1. 变分自编码器1.1 自编码器的局限性1.2 VAE 工作原理1.3 VAE 构建策略1.4 KL 散度1.5 重参数化技巧 2. 构建 VAE小结系列链接 0. 前言 变分自编码器 (Variational Autoencoder, VAE) 是一种生成模型&…

测试C#使用AForge从摄像头获取图片

百度“C# 摄像头”关键词&#xff0c;从搜索结果来看&#xff0c;使用OpenCV、AForge、window动态链接库获取摄像头数据的居多&#xff0c;本文学习基于Aforge.net连接摄像头并从摄像头获取图片的基本方法。   AForge相关包&#xff08;尤其是相关的控件&#xff09;主要针对…

【Mathematical Model】Python拟合一元一/二次方程(线性回归)

Python中可以使用多种库进行拟合方程&#xff0c;其中最常用的是NumPy和SciPy。NumPy是一个用于处理数组和矩阵的库&#xff0c;而SciPy则提供了大量的科学计算函数&#xff0c;包括拟合算法。 1 一元一次方程拟合 需要注意的是我们这里的方程需要我们自己定义好&#xff0c;然…

OpenCV-Python(9):图像基础操作

目录 学习目标 获取图像像素并修改像素值 获取图像属性 图像ROI 拆分及合并图像通道 图像边缘扩充 学习目标 获取像素值并修改获取图像的属性(信息)图像的ROI获取图像通道拆分及合并图像扩边 获取图像像素并修改像素值 几乎所有这些操作与Numpy 的关系要比与OpenCV 的…

Unity Shader Early-Z技术

Unity Shader Early-Z技术 Early-Z技术Unity渲染顺序总结Alpha Test&#xff08;Discard&#xff09;在移动平台消耗较大的原因 Early-Z技术 传统的渲染管线中&#xff0c;ZTest其实是在Blending阶段&#xff0c;这时候进行深度测试&#xff0c;所有对象的像素着色器都会计算一…

电力系统风储联合一次调频MATLAB仿真模型

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 简介&#xff1a; 同一电力系统在不同风电渗透率下遭受同一负荷扰动时&#xff0c;其频率变化规律所示&#xff1a; &#xff08;1&#xff09;随着电力系统中风电渗透率的不断提高&#xff0c;风电零惯性响…

LaTex设置标题页、修改文字颜色和文字高亮

目录 一、标题页 1&#xff09;常用的代码 2&#xff09;添加脚注 二、修改文字颜色和文字高亮 1&#xff09;设置文本的颜色 2&#xff09;添加文本高亮 3&#xff09;给文本添加有颜色的方框 一、标题页 主要的代码&#xff1a; \begin{titlepage} \noindent\fonts…

OpenAI开发者大会简介

文章目录 GPT-4 Turbo 昨天晚上 OpenAI的首届开发者大会召开 Sam Altman也做了公开演讲&#xff0c;应该说 这是继今年春天发布GPT-4之后 OpenAI在AI行业又创造的一个不眠夜 过去一年 ChatGPT绝对是整个科技领域最热的词汇 OpenAI 也依靠ChatGPT取得了惊人的成绩 ChatG…

VL53L4CX TOF开发(1)----驱动TOF进行测距

VL53L4CX TOF开发.1--驱动TOF进行测距 概述视频教学样品申请完整代码下载主要特点硬件准备技术规格系统框图应用示意图生成STM32CUBEMX选择MCU串口配置IIC配置 XSHUTX-CUBE-TOF1演示结果 概述 VL53L4CX 是一款先进的激光距离传感器&#xff0c;专为长距离和多目标测量设计&…