KafkaStream:基本使用

简介:

        kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能

特点:

        KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入门案例:

  1、新建工程kafka-demo

           引入kafkaStream依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--kafkaStream--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency></dependencies>

   2、新建流式处理类

          代码如下

package com.heima.kafkademo.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*
* 流式处理
* */
public class KafkaStreamQuickStart {public static void main(String[] args) {/*创建kafka配置中心并配置参数*/Properties prop = new Properties();//连接地址prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//key序列化prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//value序列化prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//创建id名称prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream构造器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}//流式计算方法private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kafka对象,同时指定从哪个topic获取消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");//处理消息的valuestream.flatMapValues(new ValueMapper<String, Iterable<?>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})      //按照value进行聚合.groupBy((key,value)->value)//时间窗口,每隔10秒更新一次.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}

3、启动消费者类和流式处理类监听消息

        使用生产者类发送消息

       消费者和生产者类代码参考Kafka:安装和配置_Success___的博客-CSDN博客

4、测试

        成功接收到消息

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/69484.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WebRTC音视频通话-WebRTC本地视频通话使用ossrs服务搭建

iOS开发-ossrs服务WebRTC本地视频通话服务搭建 之前开发中使用到了ossrs&#xff0c;这里记录一下ossrs支持的WebRTC本地服务搭建。 一、ossrs是什么&#xff1f; ossrs是什么呢&#xff1f; SRS(Simple Realtime Server)是一个简单高效的实时视频服务器&#xff0c;支持RTM…

ubuntu18.04下配置muduoC++11环境

1.安装muduo依赖的编译工具及库 Cmake sudo apt-get install cmakeBoost sudo apt-get install libboost-dev libboost-test-devcurl、c-ares DNS、google protobuf sudo apt-get install libcurl4-openssl-dev libc-ares-dev sudo apt-get install protobuf-compiler libp…

LeetCode--HOT100题(30)

目录 题目描述&#xff1a;24. 两两交换链表中的节点&#xff08;中等&#xff09;题目接口解题思路代码 PS: 题目描述&#xff1a;24. 两两交换链表中的节点&#xff08;中等&#xff09; 给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节…

【Linux】进程地址空间

目录 一、回顾我们以前学习的地址空间二、进程地址空间三、进程地址空间的作用四、解决一个地址出现两个值的问题 一、回顾我们以前学习的地址空间 这个内存布局真是的我们实实在在的内存嘛&#xff1f; 答案是不是的 下面我们来验证 1 #include<stdio.h>2 #include<a…

数据库操作不再困难,MyBatis动态Sql标签解析

系列文章目录 MyBatis缓存原理 Mybatis的CachingExecutor与二级缓存 Mybatis plugin 的使用及原理 MyBatis四大组件Executor、StatementHandler、ParameterHandler、ResultSetHandler 详解 MyBatisSpringboot 启动到SQL执行全流程 数据库操作不再困难&#xff0c;MyBatis动态S…

YOLOv5复现过程出现的问题(关于数据集路径)dataset not found

YOLOv5复现过程出现的问题&#xff08;关于数据集路径&#xff09; 在复现YOLOv5时&#xff0c;按照唐老师的教程&#xff08; https://www.bilibili.com/video/BV11K41167Ar?t122.1&p63&#xff09;下载好了数据集&#xff08;MaskWearing就是检测口罩的一个&#xff0c;…

AT89C51单片机实现单片机串口互动(中断方式,单片机--单片机,应答)

说一下功能&#xff1a;客户机发送0x01到服务机 2服务单片机应答0xf2到客户机 3客户机接收到0xf2,发送信息153432这6个数字到服务机 4client发送完信息后发送0xaa结束通信 5server接收到0xaa后回复0xaa结束通信&#xff0c;从此老死不相往来 看代码&#xff1a; //发送端…

订货系统怎么选?从这四个方面筛选错不了(一)

选择适合的订货系统对企业来说是一个重要且复杂的决策。一个优秀的订货系统可以提高供应链的运作效率、降低成本&#xff0c;并帮助企业更好地管理库存和订单。如果不知道从那几方面做选择&#xff0c;我们可以简单从四个方面进行筛选&#xff0c;这样一般错不了&#xff0c;今…

list

目录 迭代器 介绍 种类 本质 介绍 模拟实现 注意点 代码 迭代器 介绍 在C中&#xff0c;迭代器&#xff08;Iterators&#xff09;是一种用于遍历容器&#xff08;如数组、vector、list等&#xff09;中元素的工具 无论容器的具体实现细节如何,访问容器中的元素的方…

语聚AI如何通过对话方式让AI助手执行应用软件

1 什么是应用助手&#xff1f; 应用助手可以通过对话的方式让AI助手执行应用软件的操作。 例如&#xff0c;您可以调用“Bing搜索引擎搜索”热门信息&#xff0c;然后根据这些信息总结内容。 也可以使用“抖音”应用&#xff0c;搜索热门抖音视频&#xff0c;并根据热门视频生…

Python爱心光波

文章目录 前言Turtle入门简单案例入门函数 爱心光波程序设计程序分析 尾声 前言 七夕要来啦&#xff0c;博主在闲暇之余创作了一个爱心光波&#xff0c;感兴趣的小伙伴们快来看看吧&#xff01; Turtle入门 Turtle 是一个简单而直观的绘图工具&#xff0c;它可以帮助你通过简…

Stable Diffusion WebUI 从零基础到入门

本文主要介绍Stable Diffusion WebUI的实际操作方法&#xff0c;涵盖prompt推导、lora模型、vae模型和controlNet应用等内容&#xff0c;并给出了可操作的文生图、图生图实战示例。适合对Stable Diffusion感兴趣&#xff0c;但又对Stable Diffusion WebUI使用感到困惑的同学&am…