搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符

系列文章目录


文章目录

  • 系列文章目录
  • 前言
      • 一、本文要点
      • 二、开发环境
      • 三、原项目
      • 四、修改项目
      • 五、测试一下
      • 五、小结


前言

本插件稳定运行上百个kafka项目,每天处理上亿级的数据的精简小插件,快速上手。

<dependency><groupId>io.github.vipjoey</groupId><artifactId>multi-kafka-consumer-starter</artifactId><version>最新版本号</version>
</dependency>

例如下面这样简单的配置就完成SpringBoot和kafka的整合,我们只需要关心com.mmc.multi.kafka.starter.OneProcessorcom.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。

## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

国籍惯例,先上源码:Github源码

一、本文要点

本文将介绍通过封装一个starter,来实现多kafka数据源的配置,通过通过源码,可以学习以下特性。系列文章完整目录

  • SpringBoot 整合多个kafka数据源
  • SpringBoot 批量消费kafka消息
  • SpringBoot 优雅地启动或停止消费kafka
  • SpringBoot kafka本地单元测试(免集群)
  • SpringBoot 利用map注入多份配置
  • SpringBoot BeanPostProcessor 后置处理器使用方式
  • SpringBoot 将自定义类注册到IOC容器
  • SpringBoot 注入bean到自定义类成员变量
  • Springboot 取消限定符

二、开发环境

  • jdk 1.8
  • maven 3.6.2
  • springboot 2.4.3
  • kafka-client 2.6.6
  • idea 2020

三、原项目

1、接前文,我们开发了一个kafka插件,但在使用过程中发现有些不方便的地方,例如我们所有processor需要继承MmcKafkaKafkaAbastrctProcessor<T extends MmcKafkaMsg> ,其中的T为反序列化的实体类类型。


@Slf4j
@Service
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Resourceprivate DemoService demoService;@Overrideprotected Class<DemoMsg> getEntityClass() {return DemoMsg.class;}@Overrideprotected void dealMessage(List<DemoMsg> datas) {demoService.dealMessage("one", datas.stream().map(x -> (MmcKafkaMsg) x).collect(Collectors.toList()));}}@Slf4j
@Service
public class TwoProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Resourceprivate DemoService demoService;public TwoProcessor() {}@Overrideprotected Class<DemoMsg> getEntityClass() {return DemoMsg.class;}@Overrideprotected void dealMessage(List<DemoMsg> datas) {demoService.dealMessage("two", datas.stream().map(x -> (MmcKafkaMsg) x).collect(Collectors.toList()));}}

2、可以看到这里有两个体验不太好的地方。

  • 自定义实体类DemoMsg 必须要继承 MmcKafkaMsg,很多同学会忘记这个步骤;
  • 需要覆盖getEntityClass()父类方法,用于反序列化指定实体类的类型,这里太冗余;

因此、所以我们要升级和优化。

四、修改项目

1、取消限定符,消息实体类不再强制要求实现MmcKafkaMsg接口,改为可选项,作为候选插件化的能力增强(后文介绍);

@Data
class DemoMsg {private String routekey;private String name;private Long timestamp;}

2、修改MmcKafkaKafkaAbastrctProcessor类,取消限定符并增加类型推断方法。

a、如果实现了MmcKafkaMsg接口,就拥有了单次消费内的batch数据去重能力;

public void onMessage(List<ConsumerRecord<String, String>> records) {if (null == records || CollectionUtils.isEmpty(records)) {log.warn("{} records is null or records.value is empty.", name);return;}Assert.hasText(name, "You must pass the field `name` to the Constructor or invoke the setName() after the class was created.");Assert.notNull(properties, "You must pass the field `properties` to the Constructor or invoke the setProperties() after the class was created.");try {Stream<T> dataStream = records.stream().map(ConsumerRecord::value).flatMap(this::doParse).filter(Objects::nonNull).filter(this::isRightRecord);// 支持配置强制去重或实现了接口能力去重if (properties.isDuplicate() || isSubtypeOfInterface(MmcKafkaMsg.class)) {// 检查是否实现了去重接口if (!isSubtypeOfInterface(MmcKafkaMsg.class)) {throw new RuntimeException("The interface "+ MmcKafkaMsg.class.getName() + " is not implemented if you set the config `spring.kafka.xxx.duplicate=true` .");}dataStream = dataStream.collect(Collectors.groupingBy(this::buildRoutekey)).entrySet().stream().map(this::findLasted).filter(Objects::nonNull);}List<T> datas = dataStream.collect(Collectors.toList());if (CommonUtil.isNotEmpty(datas)) {this.dealMessage(datas);}} catch (Exception e) {log.error(name + "-dealMessage error ", e);}}

b、新增类型推断方法,目的是去掉子类必须实现getEntityClass()的约束;

    protected boolean isSubtypeOfInterface(Class<?> interfaceClass) {if (null == type) {Type superClass = getClass().getGenericSuperclass();if (superClass instanceof ParameterizedType) {ParameterizedType parameterizedType = (ParameterizedType) superClass;Type[] typeArguments = parameterizedType.getActualTypeArguments();if (typeArguments.length > 0 && typeArguments[0] instanceof Class) {//noinspection uncheckedtype = (Class<T>) typeArguments[0];}}}return (null != type) && interfaceClass.isAssignableFrom(type);}protected  Class<T> getEntityClass() {if (null == type) {synchronized(this) {Type superClass = getClass().getGenericSuperclass();if (superClass instanceof ParameterizedType) {ParameterizedType parameterizedType = (ParameterizedType) superClass;Type[] typeArguments = parameterizedType.getActualTypeArguments();if (typeArguments.length > 0 && typeArguments[0] instanceof Class) {//noinspection uncheckedtype = (Class<T>) typeArguments[0];}}}}return type;}

c、修改去重方法,也就是取批次内最新一条消息,不再使用限定符;

    protected T findLasted(Map.Entry<String, List<T>> entry) {try {Optional<T> d = entry.getValue().stream().max(Comparator.comparing(x -> ((PandoKafkaMsg) x).getRoutekey()));if (d.isPresent()) {return d.get();}} catch (Exception e) {String content = JsonUtil.toJsonStr(entry.getValue());log.error("处理消息出错:{}", e.getMessage() + ": " + content, e);}return null;}protected String buildRoutekey(T t) {return ((MmcKafkaMsg) t).getRoutekey();}

3、修改MmcKafkaBeanPostProcessor,取消限定符。

public class MmcKafkaBeanPostProcessor implements BeanPostProcessor {@Getterprivate final Map<String, MmcKafkaKafkaAbastrctProcessor<?>> suitableClass = new ConcurrentHashMap<>();@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof MmcKafkaKafkaAbastrctProcessor) {MmcKafkaKafkaAbastrctProcessor<?> target = (MmcKafkaKafkaAbastrctProcessor<?>) bean;suitableClass.putIfAbsent(beanName, target);suitableClass.putIfAbsent(bean.getClass().getName(), target);}return bean;}
}

4、修改MmcKafkaProcessorFactory,取消限定符。

五、测试一下

1、引入kafka测试需要的jar。参考文章:kafka单元测试

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency>

2、定义一个消息实体和业务处理类。

@Data
class DemoMsg  {private String routekey;private String name;private Long timestamp;}
@Slf4j
@Service
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Resourceprivate DemoService demoService;@Overrideprotected void dealMessage(List<DemoMsg> datas) {datas.forEach(x -> {log.info("dealMessage one: {}", x);});}}

3、配置kafka地址和指定业务处理类。

spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor  // 业务处理类名称
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

4、编写测试类。

@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiConsumerAutoConfiguration.class, DemoService.class, OneProcessor.class})
@TestPropertySource(value = "classpath:application.properties")
@DirtiesContext
@EmbeddedKafka(topics = {"${spring.kafka.one.topic}"})
class AppTest {@Resourceprivate EmbeddedKafkaBroker embeddedKafkaBroker;@Value("${spring.kafka.one.topic}")private String topicOne;@Value("${spring.kafka.two.topic}")private String topicTwo;@Testvoid testDealMessage() throws Exception {// 模拟生产数据produceMessage();Thread.sleep(10 * 1000);}void produceMessage() {Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();for (int i = 0; i < 10; i++) {DemoMsg msg = new DemoMsg();msg.setRoutekey("routekey" + i);msg.setName("name" + i);msg.setTimestamp(System.currentTimeMillis());String json = JsonUtil.toJsonStr(msg);producer.send(new ProducerRecord<>(topicOne, "my-aggregate-id", json));producer.send(new ProducerRecord<>(topicTwo, "my-aggregate-id", json));producer.flush();}}
}

5、运行一下,测试通过。
在这里插入图片描述

五、小结

将本项目代码构建成starter,就可以大大提升我们开发效率,我们只需要关心业务代码的开发,github项目源码:轻触这里。如果对你有用可以打个星星哦。下一篇,升级本starter,在kafka单分区下实现十万级消费处理速度。

加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/660897.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【CTF Reverse】XCTF GFSJ0490 simple-unpack Writeup(UPX壳+脱壳+反汇编)

simple-unpack 菜鸡拿到了一个被加壳的二进制文件 解法 拉进 exeinfope。 检测到是 UPX 打包的 ELF 文件。 NOT Win EXE - .o - ELF [ 64bit obj. Exe file - CPU : AMD x86-64 - OS/ABI: Linux/GNU ]Detected UPX! packer - http://upx.github.io -> try unpack with &…

VS2022 .Net6.0 无法打开窗体设计器

拿Vs2022 建了个Demo&#xff0c;运行环境是net6.0-windows&#xff0c;无论双击或是右键都打不开窗体设计器 打开项目目录下的*.csproj.user <?xml version"1.0" encoding"utf-8"?> <Project ToolsVersion"Current" xmlns"htt…

Github 2024-05-01 开源项目月报Top20

根据Github Trendings的统计,本月(2024-05-01统计)共有20个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目13TypeScript项目5C项目2非开发语言项目1C++项目1JavaScript项目1Rust项目1Go项目1Shell项目1Svelte项目1编程面试大学:成为软件工程…

JENKINS 安装,学习运维从这里开始

Download and deployJenkins – an open source automation server which enables developers around the world to reliably build, test, and deploy their softwarehttps://www.jenkins.io/download/首先点击上面。下载Jenkins 为了学习&#xff0c;从windows开始&#x…

java技术栈快速复习04_javaweb基础总结

javaweb概述 JDBC JDBC&#xff08;Java DataBase Connectivity&#xff0c;Java数据库连接&#xff09;是一种用于执行SQL语句的Java API&#xff0c;可以为多种关系数据库提供统一访问。简单说就是用Java语言来操作数据库。 jdbc原理 早期SUN公司的天才们想编写一套可以连接…

【初识Redis】

初识Redis Redis&#xff08;Remote Dictionary Server&#xff09;是一个开源的内存数据库&#xff0c;它提供了一个高性能的键值存储系统&#xff0c;并且支持多种数据结构&#xff0c;包括字符串、哈希、列表、集合和有序集合等。Redis的特点包括&#xff1a; 内存存储&…

出现 xx has no default (no arg) constructor 解决方法

目录 1. 问题所示2. 原理分析3. 解决方法 1. 问题所示 执行脱敏函数的时候&#xff0c;出现如下问题&#xff1a; Exception in thread "main" com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Class com.example.test.ChineseNameDesensitizatio…

项目中跟踪和提高资源利用率的方法

发展业务和服务是每个组织的首要任务。但是&#xff0c;仅仅把资源分配到项目上并不能确保会有高效率的工作产出。为了达到最高效率&#xff0c;这些资源必须在可以计费的或有战略意义的工作上得到有效利用。 资源利用率是衡量项目是否成功的一个关键指标。通过跟踪资源如何被…

如何提升制造设备文件汇集的可靠性和安全性?

制造设备文件汇集通常指的是将与制造设备相关的各种文档和资料进行整理和归档的过程。这些文件可能包括但不限于&#xff1a; 生产数据&#xff1a;包括生产计划、订单信息、生产进度等。 设计文件&#xff1a;如CAD图纸、设计蓝图、产品模型等。 工艺参数&#xff1a;用于指…

EasyRecovery2024汉化版电脑数据恢复软件下载

EasyRecovery是一款功能强大的数据恢复软件&#xff0c;其主要功能包括但不限于以下几点&#xff1a; 硬盘数据恢复&#xff1a;能够扫描本地计算机中的所有卷&#xff0c;建立丢失和被删除文件的目录树&#xff0c;实现硬盘格式化、重新分区、误删数据、重建RAID等硬盘数据恢…

JavaScript原型链深度剖析

目录 前言 一、原型链 1.原型链的主要组成 原型&#xff08;Prototype&#xff09; 构造函数&#xff08;Constructor&#xff09; 实例&#xff08;Instance&#xff09; 2.原型链的工作原理 前言 在JavaScript的世界中&#xff0c;原型链&#xff08;Prototype Chain&…

C++多态(全)

多态 概念 调用函数的多种形态&#xff0c; 多态构成条件 1&#xff09;父子类完成虚函数的重写&#xff08;三同&#xff1a;函数名&#xff0c;参数&#xff0c;返回值相同&#xff09; 2&#xff09;父类的指针或者引用调用虚函数 虚函数 被virtual修饰的类成员函数 …