KafkaStream:Springboot中集成

1、在kafka-demo中创建配置类

        配置kafka参数

package com.heima.kafkademo.config;import lombok.Data;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Data
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

2、在application.yml中配置上面配置类需要的参数

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

3、新增配置类,创建KStream对象,进行聚合

package com.heima.kafkademo.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}

4、启动kafka-demo服务测试

        使用生产者发送消息可以看到控制台接收成功

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/69596.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

shell脚本之正则表达式

目录 一.常见的管道命令1.1sort命令1.2uniq命令1.3tr命令1.4cut命令1.5实例1.5.1统计当前主机连接状态1.5.2统计当前主机数 二.正则表达式2.1正则表达式的定义2.2常见元字符&#xff08;支持的工具&#xff1a;find&#xff0c;grep&#xff0c;egrep&#xff0c;sed和awk&…

C语言入门 Day_5 四则运算

目录 前言 1.四则运算 2.其他运算 3.易错点 4.思维导图 前言 图为世界上第一台通用计算机ENIAC,于1946年2月14日在美国宾夕法尼亚大学诞生。发明人是美国人莫克利&#xff08;JohnW.Mauchly&#xff09;和艾克特&#xff08;J.PresperEckert&#xff09;。 计算机的最开始…

【变形金刚01】attention和transformer所有信息

图1.来源&#xff1a;Arseny Togulev在Unsplash上的照片 一、说明 这是一篇 长文 &#xff0c;几乎讨论了人们需要了解的有关注意力机制的所有信息&#xff0c;包括自我注意、查询、键、值、多头注意力、屏蔽多头注意力和转换器&#xff0c;包括有关 BERT 和 GPT 的一些细节。因…

Spring-Cloud-Loadblancer详细分析_3

前两篇文章介绍了加载过程&#xff0c;本文从Feign的入口开始分析执行过程&#xff0c;还是从FeignBlockingLoadBalancerClient.execute来入手 public class FeignBlockingLoadBalancerClient implements Client {private static final Log LOG LogFactory.getLog(FeignBlock…

SQL- 每日一题【1327. 列出指定时间段内所有的下单产品】

题目 表: Products 表: Orders 写一个解决方案&#xff0c;要求获取在 2020 年 2 月份下单的数量不少于 100 的产品的名字和数目。 返回结果表单的 顺序无要求 。 查询结果的格式如下。 示例 1: 解题思路 1.题目要求我们获取在 2020 年 2 月份下单的数量不少于 100 的产品的…

冉冉升起的星火,再度升级迎来2.0时代!

文章目录 前言权威性评测结果 星火大模型多模态功能插件功能简历生成文档问答PPT生成 代码能力 福利 前言 前几天从技术群里看到大家都在谈论《人工智能大模型体验报告2.0》里边的内容&#xff0c;抱着好奇和学习的态度把报告看了一遍。看完之后瞬间被里边提到的科大讯飞的星火…

Python爬虫:js逆向调式操作及调式中遇到debugger问题

Python爬虫:js逆向调式操作及调式中遇到debugger问题 1. 前言2. js逆向调式操作2.1 DOM事件断点2.2 XHR/提取断点(用于请求接口参数加密处理)2.3 请求返回的数据是加密的2.4 hook定位参数 3. 调式中遇到debugger问题3.1 解决方式(一律不在此处暂停)3.2 问题&#xff1a;点击一律…

Flink CDC系列之:TiDB CDC 导入 Elasticsearch

Flink CDC系列之&#xff1a;TiDB CDC 导入 Elasticsearch 一、通过docker 来启动 TiDB 集群二、下载 Flink 和所需要的依赖包三、在TiDB数据库中创建表和准备数据四、启动Flink 集群&#xff0c;再启动 SQL CLI五、在 Flink SQL CLI 中使用 Flink DDL 创建表六、Kibana查看Ela…

2023/8/16总结

这几天完成了私信的功能点&#xff0c;用websocket做的。 这是大概的界面&#xff0c;参考的是微信 用户可以搜索好友&#xff1a; 如果不存在是下面这样&#xff0c;存在就会在左边的聊天里面显示有这个人选项 发送消息 接下来需要把推荐算法给做了

asp.net core webapi如何执行周期性任务

使用Api执行周期性任务 第一种&#xff0c;无图形化界面1.新建类&#xff0c;继承IJob&#xff0c;在实现的方法种书写需要周期性执行的事件。2.编写方法类&#xff0c;定义事件执行方式3.在启动方法中&#xff0c;进行设置&#xff0c;.net 6中在program.cs的Main方法中&#…

Unity C# 之 Http 获取网页的 html 数据,并去掉 html 格式等相关信息

Unity C# 之 Http 获取网页的 html 数据&#xff0c;并去掉 html 格式等相关信息 目录 Unity C# 之 Http 获取网页的 html 数据&#xff0c;并去掉 html 格式等相关信息 一、简单介绍 二、实现原理 三、注意事项 四、效果预览 五、关键代码 一、简单介绍 Unity中的一些知…

(JavaScript笔记摘要)一次性搞定原型和原型链

一、前言 学完JavaScript ES5基础语法&#xff08;核心语法&#xff09;后&#xff0c;发现根本不够用&#xff0c;于是选择继续精进JavaScript功底 学到原型和原型链时&#xff0c;发现有点卡壳儿&#xff0c;于是投入了一定精力&#xff0c;进行了汇总整理和吸收&#xff0c…