Kafka的安装及接入SpringBoot

环境:windows、jdk1.8、springboot2

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/

1.概述

        Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。

        详细概述见Kafka概述:

1.1 Kafka的作用

  • 发布和订阅记录流
  • 持久存储记录流,Kafka中的数据即使消费后也不会消失
  • 在系统或应用之间构建可靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序
  • Kafka可以处理源源不断产生的数据

1.2 Kafka的一些概念

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

2.Kafka下载安装

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/downloads        选择最新版就可以

2.1 配置kafka

        解压下载的文件,修改 config 文件夹下的 zookeeper.properties

        修改 config 文件夹下的 server.properties

        当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka)

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

 

2.2 启动 zookeeper

        Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

        可以本地访问看一下:http://localhost:2181/ 

2.3 启动Kafka 

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

kafka-server-start.sh ../../config/server.properties

        访问路径: http://localhost:9092/ 

2.4 便捷启动脚本

        两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中

cd bin\windows

zookeeper-server-start.bat ../../config/zookeeper.properties

cd bin\windows

kafka-server-start.bat ../../config/server.properties

3.springboot集成Kafka

3.1 环境搭建

(1)添加pom依赖

<!-- 继承Spring boot工程 -->
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.8.RELEASE</version>
</parent>
<properties><fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>

(2)配置类application.yml

        生产者:

spring:kafka:bootstrap-servers: xxx.xxx.xxx.xxx:9092producer:retries: 0key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

        消费者:

spring:kafka:bootstrap-servers: xxx.xxx.xxx.xxx:9092consumer:group-id: kafka-demo-kafka-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApp {public static void main(String[] args) {SpringApplication.run(KafkaApp.class, args);}
}

3.2 消息生产者

        junit测试,新建消息发送方

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
​
​
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSendTest {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate; //如果这里有红色波浪线,那是假错误
​@Testpublic void sendMsg(){String topic = "spring_test";kafkaTemplate.send(topic,"hello spring boot kafka!");System.out.println("发送成功.");while (true){ //保存加载ioc容器
​}}
}

3.3 消息消费者

        新建监听类:

​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
​
@Component
public class MyKafkaListener {
​//    以下两种方法都行// 指定监听的主题
//    @KafkaListener(topics = "spring_test")
//    public void receiveMsg(String message){
//        System.out.println("接收到的消息:"+message);
//    }
​@KafkaListener(topics = "spring_test")public void handleMessage(ConsumerRecord<String, String> record) {System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/691193.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机毕业设计】springbootBBS论坛系统

本系统为用户而设计制作 BBS论坛系统&#xff0c;旨在实现BBS论坛智能化、现代化管理。本BBS论坛自动化系统的开发和研制的最终目的是将BBS论坛的运作模式从手工记录数据转变为网络信息查询管理&#xff0c;从而为现代管理人员的使用提供更多的便利和条件。使BBS论坛系统数字化…

win11 u盘启动

本文基于 rufus制作U盘启动盘过程_rufus制作windows11启动盘-CSDN博客 做了简单修改 这里讲一讲制作一个windows11启动盘的过程。 1.首先下载windows11的镜像&#xff08;可以从这里下载纯净版&#xff1a;https://msdn.itellyou.cn/&#xff09; 2.从rufus官网下载最新的版…

虚幻五关卡制作学习笔记

1.创建一个移动平台 这个移动平台的功能&#xff1a;从箭头1移动到箭头2来回移动&#xff0c;可移动时发绿光&#xff0c;不可移动时发红光 首先&#xff0c;创建两个材质&#xff0c;发红光和绿光 然后我们创建一个actor蓝图类&#xff0c;添加两个arrow组件&#xff0c;两个…

大数据测试

1、前言 大数据测试是对大数据应用程序的测试过程&#xff0c;以确保大数据应用程序的所有功能按预期工作。大数据测试的目标是确保大数据系统在保持性能和安全性的同时&#xff0c;平稳无差错地运行。 大数据是无法使用传统计算技术处理的大型数据集的集合。这些数据集的测试涉…

【北京迅为】《iTOP-3588从零搭建ubuntu环境手册》-第6章 安装Samba

RK3588是一款低功耗、高性能的处理器&#xff0c;适用于基于arm的PC和Edge计算设备、个人移动互联网设备等数字多媒体应用&#xff0c;RK3588支持8K视频编解码&#xff0c;内置GPU可以完全兼容OpenGLES 1.1、2.0和3.2。RK3588引入了新一代完全基于硬件的最大4800万像素ISP&…

答辩PPT设计缺乏专业感?笔灵AI提供简洁且关键的幻灯片设计

在我原本的认知里面&#xff0c;答辩PPT是要包含论文各个章节的&#xff0c;在答辩时需要方方面面都讲到的&#xff0c;什么摘要、文献综述、实证分析、研究结果样样不落。但是&#xff0c;这大错特错&#xff01; 答辩PPT环节时长一般不超过5分钟&#xff0c;老师想要的答辩P…

PyCharm 集成 Git

目录 1、配置 Git 忽略文件 2、定位Git 3、使用pycharm本地提交 3.1、初始化本地库 3.2、添加到暂存区 3.3、提交到本地库 3.4、切换版本 4、分支操作 4.1、创建分支 4.2、切换分支 4.3、合并分支 5、解决冲突 1、配置 Git 忽略文件 作用&#xff1a;与项目的实际…

乡村旅游指标-最美乡村数、旅游示范县数、旅行社数、景区数、农家乐数(2007-2021年)

01、数据介绍 乡村旅游也是促进乡村经济发展的有效途径。通过发展乡村旅游&#xff0c;可以带动乡村相关产业的发展&#xff0c;提高乡村居民的收入&#xff0c;促进乡村的经济发展和社会进步。此外&#xff0c;乡村旅游还能促进城乡交流&#xff0c;推动城乡统筹发展。 数据…

微信小程序踩坑,skyline模式下,scroll-view下面的一级元素设置margin中的auto无效,具体数据有效

开发工具版本 基础库 开启skyline渲染调试 问题描述 skyline模式下,scroll-view下面的一级元素的margin写auto的值是没有效果的(二级元素margin写auto是有效果的),关闭这个模式就正常显示 演示效果图 父元素的宽度和高度效果(宽度是750rpx,宽度占满的) 一级元素宽度和css效果…

绍兴ISO27001认证:信息安全认证的金钥匙

&#x1f308;&#x1f308;绍兴ISO27001认证&#xff1a;✌️信息安全认证的金钥匙&#x1f511; &#x1f498;随着信息技术的飞速发展&#xff0c;&#x1f481;信息安全问题日益凸显。&#x1f510;为了提升信息安全管理水平&#xff0c;&#x1f46e;保障企业数据资产安全…

java项目之企业OA管理系统源码(springboot+vue+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的企业OA管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 企业OA管理系统的主要使用…

【信号与槽机制】

信号与槽机制 &#x1f31f; 信号函数&#x1f31f; 槽函数&#x1f31f; 连接函数&#x1f338; QObejct::connect函数剖析&#x1f31f; 官方文档中给出的定义&#x1f31f;《Qt 5.9 C开发指南》中的定义 &#x1f31f; 信号函数 信号是一种特殊的成员函数&#xff0c;用于在…