SpringBoot-集成Kafka详解

SpringBoot集成Kafka

1、构建项目

1.1、引入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.5.RELEASE</version>
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.28</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency>
</dependencies>
1.2、application.yml配置
spring:application:name: application-kafkakafka:bootstrap-servers: localhost:9092 #这个是kafka的地址,对应你server.properties中配置的producer:batch-size: 16384 #批量大小acks: -1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)retries: 10 # 消息发送重试次数#transaction-id-prefix: transactionbuffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger:ms: 2000 #提交延迟#partitioner: #指定分区器#class: pers.zhang.config.CustomerPartitionHandlerconsumer:group-id: testGroup #默认的消费组IDenable-auto-commit: true #是否自动提交offsetauto-commit-interval: 2000 #提交offset延时# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset;# latest:重置为分区中最新的offset(消费分区中新产生的数据);# none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latestmax-poll-records: 500 #单次拉取消息的最大条数key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session:timeout:ms: 120000 # 消费会话超时时间(超过这个时间 consumer 没有发送心跳,就会触发 rebalance 操作)request:timeout:ms: 18000 # 消费请求的超时时间listener:missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
#      type: batch
1.3、简单生产
@RestController
public class kafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/normal/{message}")public void sendNormalMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message);}
}
1.4、简单消费
@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"sb_topic"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}}
简单消费:sb_topic-0=111
简单消费:sb_topic-0=222
简单消费:sb_topic-0=333

2、生产者

2.1、带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

/*** 回调的第一种写法* @param message*/
@GetMapping("/kafka/callbackOne/{message}")
public void sendCallbackOneMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message).addCallback(new SuccessCallback<SendResult<String, Object>>() {//成功的回调@Overridepublic void onSuccess(SendResult<String, Object> success) {// 消息发送到的topicString topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();System.out.println("发送消息成功1:" + topic + "-" + partition + "-" + offset);}}, new FailureCallback() {//失败的回调@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败1:" + throwable.getMessage());}});
}

 

发送消息成功1:sb_topic-0-3
简单消费:sb_topic-0=one
/*** 回调的第二种写法* @param message*/
@GetMapping("/kafka/callbackTwo/{message}")
public void sendCallbackTwoMessage(@PathVariable("message") String message) {kafkaTemplate.send("sb_topic", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {System.out.println("发送消息失败2:"+throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());}});
}

发送消息成功2:sb_topic-0-4
简单消费:sb_topic-0=two
2.2、监听器

Kafka提供了ProducerListener 监听器来异步监听生产者消息是否发送成功,我们可以自定义一个kafkaTemplate添加ProducerListener,当消息发送失败我们可以拿到消息进行重试或者把失败消息记录到数据库定时重试。

@Configuration
public class KafkaConfig {@AutowiredProducerFactory producerFactory;@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>();kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {@Overridepublic void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {System.out.println("发送成功 " + producerRecord.toString());}@Overridepublic void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) {System.out.println("发送成功 topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);}@Overridepublic void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {System.out.println("发送失败" + producerRecord.toString());System.out.println(exception.getMessage());}@Overridepublic void onError(String topic, Integer partition, String key, Object value, Exception exception) {System.out.println("发送失败" + "topic = " + topic + " ; partion = " + partition + "; key = " + key + " ; value=" + value);System.out.println(exception.getMessage());}});return kafkaTemplate;}
}

注意:当我们发送一条消息,既会走 ListenableFutureCallback 回调,也会走ProducerListener回调。

2.3、自定义分区器

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

1、若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
2、若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
3、patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区
 

public class CustomizePartitioner implements Partitioner {@Overridepublic int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {//自定义分区规则,默认全部发送到0号分区return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

在application.properties中配置自定义分区器,配置的值就是分区器类的全路径名

# 自定义分区器
spring.kafka.producer.properties.partitioner.class=pers.zhang.config.CustomizePartitioner
2.4、事务提交

如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务:

@GetMapping("/kafka/transaction/{message}")
public void sendTransactionMessage(@PathVariable("message") String message) {//声明事务:后面报错消息不会发出去kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {@Overridepublic Object doInOperations(KafkaOperations<String, Object> operations) {operations.send("sb_topic", message + " test executeInTransaction");throw new RuntimeException("fail");}});// //不声明事务:后面报错但前面消息已经发送成功了// kafkaTemplate.send("sb_topic", message + " test executeInNoTransaction");// throw new RuntimeException("fail");
}

注意:如果声明了事务,需要在application.yml中指定:

spring:kafka:producer:transaction-id-prefix: tx_ #事务id前缀

3、消费者

3.1、指定topic、partition、offset消费

前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供

spring:kafka:listener:type: batch #设置批量消费consumer:max-poll-records: 50 #每次最多消费多少条消息

属性解释:

  • id:消费者ID
  • groupId:消费组ID
  • topics:监听的topic,可监听多个
  • topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听,手动分区。
//批量消费
@KafkaListener(id = "consumer2", topics = {"sb_topic"}, groupId = "sb_group")
public void onBatchMessage(List<ConsumerRecord<String, Object>> records) {System.out.println(">>> 批量消费一次,recoreds.size()=" + records.size());for (ConsumerRecord<String, Object> record : records) {System.out.println(record.value());}
}
>>> 批量消费一次,recoreds.size()=4
hello
hello
hello
hello
>>> 批量消费一次,recoreds.size()=2
hello
hello
3.2、异常处理

ConsumerAwareListenerErrorHandler 异常处理器,新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。
 

//异常处理器
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {return new ConsumerAwareListenerErrorHandler() {@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {System.out.println("消费异常:" + message.getPayload());return null;}};
}// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"sb_topic"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {throw new Exception("简单消费-模拟异常");
}// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "sb_topic",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {System.out.println("批量消费一次...");throw new Exception("批量消费-模拟异常");
}
批量消费一次...
消费异常:[ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 19, CreateTime = 1692604586558, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello), ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 20, CreateTime = 1692604587164, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello), ConsumerRecord(topic = sb_topic, partition = 0, leaderEpoch = 0, offset = 21, CreateTime = 1692604587790, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)]
3.3、消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
 

@Autowired
ConsumerFactory consumerFactory;//消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);//被过滤的消息将被丢弃factory.setAckDiscarded(true);//消息过滤策略factory.setRecordFilterStrategy(new RecordFilterStrategy() {@Overridepublic boolean filter(ConsumerRecord consumerRecord) {if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {return false;}return true;}});return factory;
}//消息过滤监听
@KafkaListener(topics = {"sb_topic"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {System.out.println(record.value());
}

上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic发送0-9总共10条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数:

3.4、消息转发

在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下:
 

//消息转发 从sb_topic转发到sb_topic2
@KafkaListener(topics = {"sb_topic"})
@SendTo("sb_topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {return record.value()+"-forward message";
}@KafkaListener(topics = {"sb_topic2"})
public void onMessage8(ConsumerRecord<?, ?> record) {System.out.println("收到sb_topic转发过来的消息:" + record.value());
}
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
收到sb_topic转发过来的消息:hello-forward message
3.5、定时启动、停止

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

1、禁止监听器自启动;
2、创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;
新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在Spring中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动
 

@EnableScheduling
@Component
public class CronTimer {/*** @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,* 而是会被注册在KafkaListenerEndpointRegistry中,* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean**/@Autowiredprivate KafkaListenerEndpointRegistry registry;@Autowiredprivate ConsumerFactory consumerFactory;
​// 监听器容器工厂(设置禁止KafkaListener自启动)@Beanpublic ConcurrentKafkaListenerContainerFactory delayContainerFactory() {ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();container.setConsumerFactory(consumerFactory);//禁止KafkaListener自启动container.setAutoStartup(false);return container;}// 监听器@KafkaListener(id="timingConsumer",topics = "sb_topic",containerFactory = "delayContainerFactory")public void onMessage1(ConsumerRecord<?, ?> record){System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());}// 定时启动监听器@Scheduled(cron = "0 42 11 * * ? ")public void startListener() {System.out.println("启动监听器...");// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器if (!registry.getListenerContainer("timingConsumer").isRunning()) {registry.getListenerContainer("timingConsumer").start();}//registry.getListenerContainer("timingConsumer").resume();}
​// 定时停止监听器@Scheduled(cron = "0 45 11 * * ? ")public void shutDownListener() {System.out.println("关闭监听器...");registry.getListenerContainer("timingConsumer").pause();}
}

启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作,

11:42分监听器启动开始工作,消费消息

11:45分监听器停止工作:

3.6、手动确认消息

默认情况下Kafka的消费者是自动确认消息的,通常情况下我们需要在业务处理成功之后手动触发消息的签收,否则可能会出现:消息消费到一半消费者异常,消息并未消费成功但是消息已经自动被确认,也不会再投递给消费者,也就导致消息丢失了。

当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式;
 

public enum AckMode {// 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交RECORD,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交BATCH,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交TIME,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交COUNT,// TIME | COUNT 有一个条件满足时提交COUNT_TIME,// 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交MANUAL,// 手动调用Acknowledgment.acknowledge()后立即提交MANUAL_IMMEDIATE,
}

如果设置AckMode模式为MANUAL或者MANUAL_IMMEDIATE,则需要对监听消息的方法中,引入Acknowledgment对象参数,并调用acknowledge()方法进行手动提交;

第一步:添加kafka配置,把 spring.kafka.listener.ack-mode = manual 设置为手动
 

spring:kafka:listener:ack-mode: manual consumer:enable-auto-commit: false

第二步;消费消息的时候,给方法添加Acknowledgment参数签收消息:

@KafkaListener(topics = {"sb_topic"})
public void onMessage9(ConsumerRecord<String, Object> record, Acknowledgment ack) {System.out.println("收到消息:" + record.value());//确认消息ack.acknowledge();
}

4、配置详解

4.1、生产者yml方式
server:port: 8081
spring:kafka:producer:# Kafka服务器bootstrap-servers: 175.24.228.202:9092# 开启事务,必须在开启了事务的方法中发送,否则报错transaction-id-prefix: kafkaTx-# 发生错误后,消息重发的次数,开启事务必须设置大于0。retries: 3# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。# 开启事务时,必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
4.2、生产者Config方式
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;
import java.util.Map;/*** kafka配置,也可以写在yml,这个文件会覆盖yml*/
@SpringBootConfiguration
public class KafkaProviderConfig {@Value("${spring.kafka.producer.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.transaction-id-prefix}")private String transactionIdPrefix;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.buffer-memory}")private String bufferMemory;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>(16);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。//acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。//acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。//开启事务必须设为allprops.put(ProducerConfig.ACKS_CONFIG, acks);//发生错误后,消息重发的次数,开启事务必须大于0props.put(ProducerConfig.RETRIES_CONFIG, retries);//当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送//批次的大小可以通过batch.size 参数设置.默认是16KB//较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。//比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟//实测batchSize这个参数没有用props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);//有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,//即使数据没达到16KB,也将这个批次发送出去props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");//生产者内存缓冲区的大小props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);//反序列化,和生产者的序列化方式对应props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return props;}@Beanpublic ProducerFactory<Object, Object> producerFactory() {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());//开启事务,会导致 LINGER_MS_CONFIG 配置失效factory.setTransactionIdPrefix(transactionIdPrefix);return factory;}@Beanpublic KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic KafkaTemplate<Object, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
4.3、消费者yml方式
server:port: 8082
spring:kafka:consumer:# Kafka服务器bootstrap-servers: 175.24.228.202:9092group-id: firstGroup# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D#auto-commit-interval: 2s# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式#key-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要properties:spring:json:trusted:packages: "*"# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况max-poll-records: 3properties:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 自动提交关闭,需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000
4.4、消费者Config方式
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;/*** kafka配置,也可以写在yml,这个文件会覆盖yml*/
@SpringBootConfiguration
public class KafkaConsumerConfig {@Value("${spring.kafka.consumer.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.properties.session.timeout.ms}")private String sessionTimeout;@Value("${spring.kafka.properties.max.poll.interval.ms}")private String maxPollIntervalTime;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.listener.concurrency}")private Integer concurrency;@Value("${spring.kafka.listener.missing-topics-fatal}")private boolean missingTopicsFatal;@Value("${spring.kafka.listener.poll-timeout}")private long pollTimeout;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>(16);propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);//自动提交的时间间隔,自动提交开启时生效propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");//该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理://earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)//none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);//两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepropsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);//这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。//这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,//如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,//然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。//要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数//注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10spropsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return propsMap;}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {//配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要try(JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {deserializer.trustedPackages("*");return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);}}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//在侦听器容器中运行的线程数,一般设置为 机器数*分区数factory.setConcurrency(concurrency);//消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误factory.setMissingTopicsFatal(missingTopicsFatal);//自动提交关闭,需要设置手动消息确认factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setPollTimeout(pollTimeout);//设置为批量监听,需要用List接收//factory.setBatchListener(true);return factory;}
}

5、注解消费示例

5.1、简单消费
    /*** 指定一个消费者组,一个主题主题。* @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)public void simpleConsumer(ConsumerRecord<String, String> record) {System.out.println("进入simpleConsumer方法");System.out.printf("分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.2、监听多个主题
    /*** 指定多个主题。** @param record*/@KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)public void topics(ConsumerRecord<String, String> record) {System.out.println("进入topics方法");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.3、监听一个主题,指定分区消费
    /*** 监听一个主题,且指定消费主题的哪些分区。* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})},concurrency = "2")public void consumeByPattern(ConsumerRecord<String, String> record) {System.out.println("consumeByPattern");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.4、指定多个分区,指定起始偏移量,多线程消费
    /*** 指定多个分区从哪个偏移量开始消费。* 10个线程,也就是10个消费者*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPAD_TOPIC,partitions = {"0","1"},partitionOffsets = {@PartitionOffset(partition = "2", initialOffset = "10"),@PartitionOffset(partition = "3", initialOffset = "0"),})},concurrency = "10")public void consumeByPartitionOffsets(ConsumerRecord<String, String> record) {System.out.println("consumeByPartitionOffsets");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.5、监听多个主题,指定多个分区,指定起始偏移量
    /*** 指定多个主题。参数详解如上面的方法。* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "4")public void topics2(ConsumerRecord<String, String> record) {System.out.println("topics2");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}
5.6、指定多个监听器
    /*** 指定多个消费者组。参数详解如上面的方法。** @param record*/@KafkaListeners({@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3"),@KafkaListener(groupId = XM_GROUP,topicPartitions = {@TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = XMPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3")})public void groupIds(ConsumerRecord<String, String> record) {System.out.println("groupIds");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());}
5.7、手动提交偏移量
    /*** 设置手动提交偏移量** @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP,//3个消费者concurrency = "3")public void setCommitType(ConsumerRecord<String, String> record, Acknowledgment ack) {System.out.println("setCommitType");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());ack.acknowledge();}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/189219.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

php中RESTful API使用

1、RESTful AP是什么 RESTful API是一种软件架构风格 RESTful API基于HTTP协议&#xff0c;并遵循一系列约定和原则。它的设计理念是将资源&#xff08;Resource&#xff09;作为核心概念&#xff0c;并通过一组统一的接口对资源进行操作。API的资源通常通过URL进行标识&…

vue + antd 动态增加表单并进行表单校验

<template><a-modalv-model:visible="visible":title="formData.id ? 编辑渠道 : 添加渠道":width="850":mask-closable="false":destroy-on-close="true"@ok="onSubmit"@cancel="onClose"&g…

<Linux>(极简关键、省时省力)《Linux操作系统原理分析之Linux 进程管理 4》(8)

《Linux操作系统原理分析之Linux 进程管理 4》&#xff08;8&#xff09; 4 Linux 进程管理4.4 Linux 进程的创建和撤销4.4.1 Linux 进程的族亲关系4.4.2 Linux 进程的创建4.4.3 Linux 进程创建的过程4.4.4 Linux 进程的执行4.4.5 Linux 进程的终止和撤销 4 Linux 进程管理 4.…

关于CSDN右上角的消息数显示

最近一段时间CSDN总是出一下小问题&#xff0c;要么网页访问无响应宕机&#xff0c;要么已发表的文章今天写的时候又出现在草稿箱里&#xff0c;待编辑页面里。今天发现右上角的未读消息也是消除不了。 越点越多。 按道理&#xff0c;CSDN这么大的公司&#xff0c;中国最大的程…

dataGridView 嵌套ComboBox对单元格精准绑定数据

1&#xff0c;数据准备并绑定数据 List<P> list new List<P>();for (int i 0; i < 3; i){P data new P();data.Idx i 1;data.Name "名称" i;list.Add(data);}dataGridView1.DataSource list;dataGridView1.Refresh(); 2&#xff0c;对单元格…

248: vue+openlayers 以静态图片作为底图,并在上面绘制矢量多边形

第248个 点击查看专栏目录 本示例是演示如何在vue+openlayers项目中以静态图片作为底图,并在上面绘制矢量多边形。这里主要通过pixels的坐标作为投射,将静态图片作为底图,然后通过正常的方式在地图上显示多边形。注意的是左下角为[0,0]。 直接复制下面的 vue+openlayers源代…

获取用户详细信息

pojo.user&#xff1a;JsonIgnore注解作用忽略密码属性&#xff0c;返回给用户的信息不能有敏感属性密码 package com.lin.springboot01.pojo;import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data;import java.time.LocalDateTime;Data public class Use…

vim模式用法总结

0.前言 我们用gcc编译文件的时候&#xff0c;如果发生了下面的错误&#xff0c;那么如何用vim打开的时候就定位到&#xff1f; 我们可以知道&#xff0c;这是第6行出现了错误&#xff1b; 所以我们使用vim打开的时候多输入个这个&#xff0c;我们就可以快速定位了 vim test.c 6…

【观察】华为:数智世界“一触即达”,应对数智化转型“千变万化”

毫无疑问&#xff0c;数智化既是这个时代前进所趋&#xff0c;也是国家战略所指&#xff0c;更是所有企业未来发展进程中达成的高度共识。 但也要看到&#xff0c;由于大量新兴技术的出现&#xff0c;技术热点不停的轮转&#xff0c;加上市场环境的快速变化&#xff0c;让数智化…

使用vant list实现订单列表,支持下拉加载更多

在公司项目开发时&#xff0c;有一个需求是实现可以分页的订单列表&#xff0c;由于是移动端项目&#xff0c;所以最好的解决方法是做下拉加载更多。 1.在页面中使用vant组件 <van-listv-model"loading":finished"finished"finished-text"没有更…

Android 当中的 Fragment 协作解耦方式

Android 当中的 Fragment 协作解耦方式 文章目录 Android 当中的 Fragment 协作解耦方式第一章 前言介绍第01节 遇到的问题第02节 绘图说明 第二章 核心代码第01节 代理人接口第02节 中间人 Activity第03节 开发者A第04节 开发者B第05节 测试类 第一章 前言介绍 第01节 遇到的…

使用 React Flow 构建一个思维导图应用

思维导图是围绕共同主题或问题将思想、概念、信息或任务分组的视觉表示。思维导图应用是一种软件应用&#xff0c;允许您创建、可视化和组织您的思想、想法和信息作为思维导图。本文将向您展示如何实现自己的思维导图应用程序。 在我们开始之前&#xff0c;我想向您展示一下我们…