Kafka是什么,以及如何使用SpringBoot对接Kafka

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle
架构必备能力——kafka的选型对比及应用场景
Kafka存取原理与实现分析,打破面试难关
防止消息丢失与消息重复——Kafka可靠性分析及优化实践



在这里插入图片描述
继上一次教大家手把手安装kafka后,今天我们直接来到入门实操教程,也就是使用SpringBoot该怎么对接和使用kafka。当然,在一开始我们也会比较细致的介绍一下kafka本身。那么话不多说,马上开始今天的学习吧

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

在这里插入图片描述

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

在这里插入图片描述

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("test_topic", message);}
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。


5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaLis {@KafkaListener(topics = "test_topic", groupId = "test_group")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {@Autowiredprivate KafkaService kafkaService;@GetMapping("/send/{message}")public String sendMessage(@PathVariable String message) {kafkaService.sendMessage(message);return "Message sent successfully";}
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

在这里插入图片描述

三、启动与验证

首先自然是启动 Kafka ,怎么启动可参考 《上手第一关,手把手教你安装kafka与可视化工具kafka-eagle》,然后是启动我们的Spring Boot项目

在这里插入图片描述

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

在这里插入图片描述

最后检查我们的项目日志:

在这里插入图片描述

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

  • producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
  • defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
  • messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

  • send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
  • send(String topic, V data):向指定的 Kafka 主题发送一条消息。
  • send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
  • execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
  • inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

在这里插入图片描述
在这里插入图片描述

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClassConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结

今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/539746.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql 主从延迟分析

一、如何分析主从延迟 分析主从延迟一般会采集以下三类信息。 从库服务器的负载情况 为什么要首先查看服务器的负载情况呢&#xff1f;因为软件层面的所有操作都需要系统资源来支撑。 常见的系统资源有四类&#xff1a;CPU、内存、IO、网络。对于主从延迟&#xff0c;一般会…

Gitlab光速发起Merge Request

前言 在我们日常开发过程中需要经常使用到Merge Request&#xff0c;在使用过程中我们需要来回在开发工具和UI界面之前来回切换&#xff0c;十分麻烦。那有没有一种办法可以时间直接开发开工具中直接发起Merge Request呢&#xff1f; 答案是有的。 使用 Git 命令方式创建 Me…

3dmax导入模型渲染过亮---模大狮模型网

在3ds Max中导入模型后渲染过亮可能是由于以下原因导致的&#xff1a; 材质和贴图设置&#xff1a; 检查导入的模型的材质和贴图设置&#xff0c;确保它们没有过度亮度或反射。调整材质的Diffuse(漫反射)颜色和Specular(高光)属性&#xff0c;以使渲染看起来更加自然。 光源设…

操作系统——中断

目录 前置知识 ​编辑 基本概念 1.中断特点 2.PSW&#xff08;程序状态字&#xff0c;Program statement word&#xff09; 中断的作用 中断的类型 中断嵌套、中断优先级、中断屏蔽 中断响应过程 前置知识 内核程序 &#xff1a;内核是操作系统的核心部分&#xff…

封装、继承、多态

1.封装 1.1 概念 举个例子解释,我们使用计算机时,并不关心内部核心部件,而是只需要知道如何开机,如何通过键盘鼠标与计算机进行交互即可.因此厂家在出厂计算机时,在外部套上壳子将内部实现细节隐藏起来,仅仅对外提供开机等,让用户可以与计算机进行交互即可. 封装: 将数据和…

14.WEB渗透测试--Kali Linux(二)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;13.WEB渗透测试--Kali Linux&#xff08;一&#xff09;-CSDN博客 netcat简介内容:13.WE…

log4cplus在Qt linux中的应用与问题解决

log4cplus在Qt linux中的应用与问题解决 背景log4cplus下载遇到问题&#xff1a;libm.so.6:undefined reference to __strtof128_nanGLIBC_PRIVATE‘解决方案编译生成在Qt工程里面添加对应依赖编译运行成功 背景 最近工作中需要用到log4cplus的日志做一些记录&#xff0c;用了…

R语言深度学习-3-过拟合问题(无监督正则化/Lasso回归/岭回归/集成和平均算法)

本教程参考《RDeepLearningEssential》 我们从上一个教程看到&#xff0c;我们看到在我们训练迭代或者训练更大神经网络的时候&#xff0c;往往会产生过拟合&#xff0c;而且越来越严重&#xff0c;它可能会把训练它的数据拟合的很好&#xff0c;但是未必能把新数据做的很好。…

通过路由器监控,优化网络效率

路由器是网络的基本连接组件&#xff0c;路由器监控涉及将路由器网络作为一个整体进行管理&#xff0c;其中持续监控路由器的性能、运行状况、安全性和可用性&#xff0c;以确保更好的操作和最短的停机时间&#xff0c;因此监控路由器至关重要。 为什么路由器监控对组织很重要…

vue3动态组件未渲染问题

渲染问题 component动态组件写法与vue2写法一致&#xff0c;代码如下&#xff1a; <component :is"componentName"/><script setup>import { ref } from vueimport account from ./user/account.vue// 组件名称const componentName ref(account)// 点击…

JVM探究

JVM探究 请你谈谈你对JVM的理解&#xff1f;java -> class -> jvm java 8虚拟机和之前的变化更新OOM 内存溢出。栈溢出 StackOverFlowError > 怎么分析JVM的常用调优参数 &#xff1f; 扩大内存内存快照如何抓取&#xff0c;怎么分析Dump文件&#xff1f;知道吗&…

用Stable Diffusion生成同角色不同pose的人脸

随着技术的不断发展&#xff0c;我们现在可以使用稳定扩散技术&#xff08;Stable Diffusion&#xff09;来生成同一角色但不同姿势的人脸图片。本文将介绍这一方法的具体步骤&#xff0c;以及如何通过合理的提示语和模型选择来生成出更加真实和多样化的人脸图像。 博客首发地…