【Java】SpringBoot快速整合Kafka

目录

1.什么是Kafka?

主要特点和概念:

主要组成部分:

2.Kafka可以用来做什么?

3.SpringBoot整合Kafka步骤:

1. 添加依赖:

2. 配置 Kafka:

3. 创建 Kafka 生产者:

4. 创建 Kafka 消费者:

5. 发布消息:

6. 使用Postman进行测试:


如果你没有Kafka,可以参考这篇文章进行安装【Docker】手把手教你使用Docker搭建kafka【详细教程】_docker 安装kafka-CSDN博客

1.什么是Kafka?

        Kafka是一个开源的流式平台,用于构建实时数据流应用程序和实时数据管道。Kafka旨在处理大规模的数据流,具有高吞吐量、可扩展性、持久性和容错性的特点。

主要特点和概念:

  1. 发布-订阅模型: Kafka采用发布-订阅模型,数据生产者将消息发布到一个或多个主题(topics),而数据消费者则订阅这些主题以接收消息。

  2. 分布式架构: Kafka是一个分布式系统,允许横向扩展,通过分布式存储和分区机制来实现高吞吐量和可扩展性。

  3. 持久性存储: Kafka使用持久性存储来保留消息,可以在消息发送后保留一定的时间,确保消费者可以在需要时检索历史消息。

  4. 数据分区: 主题被划分为多个分区,每个分区可以在不同的服务器上,以实现并行处理和提高性能。

  5. 流式处理: Kafka提供了流处理功能,允许应用程序实时处理和分析数据流,执行复杂的事件处理操作。

  6. 高可用性: Kafka在集群中的多个节点之间复制数据,提高了系统的容错性和可用性。

  7. 数据保证: Kafka提供了不同级别的数据传递保证,包括至多一次、至少一次和精确一次语义。

  8. 生态系统: Kafka生态系统丰富,包括连接器(Connectors)、Kafka Streams、MirrorMaker等组件,用于与各种外部系统集成和实现各种应用场景。

主要组成部分:

  • Producer(生产者): 负责向Kafka主题发布消息。

  • Broker(代理): Kafka集群中的服务器,负责存储和管理消息。

  • Consumer(消费者): 订阅并处理Kafka主题中的消息。

  • Topic(主题): 消息的类别或标签,生产者将消息发布到主题,而消费者从主题订阅消息。

  • Partition(分区): 主题可以划分为多个分区,每个分区独立存储和处理消息。

2.Kafka可以用来做什么?

  1. 消息队列:

    场景: 在电子商务平台上,订单服务产生订单消息,并将其发布到Kafka主题。支付服务、物流服务等通过订阅相应主题,异步处理订单信息,实现订单处理的解耦和异步通信。

  2. 实时数据流处理:

    场景: 在在线广告平台上,使用Kafka Streams处理实时产生的广告点击数据。可以实时计算点击率、过滤无效点击、将数据与用户信息连接,以实现实时广告效果分析。

  3. 日志收集与分析:

    场景: 在一个大规模的云服务中,使用Kafka收集分布在不同服务器上的应用程序日志。日志分析服务通过消费Kafka主题,实时分析日志以监控系统性能、检测异常和进行故障排除。

  4. 事件溯源(Event Sourcing):

    场景: 在金融领域的交易系统中,使用Kafka追踪交易事件。每笔交易引发一个事件,将其发布到Kafka主题,以便在需要时进行审计、回溯和重新处理。

  5. 数据同步:

    场景: 在企业的分布式系统中,使用Kafka同步用户信息。用户服务在用户数据变更时将事件发布到Kafka主题,其他服务通过消费主题以保持用户数据同步。

  6. 消息广播:

    场景: 在社交媒体应用中,使用Kafka将用户发布的状态更新广播给其关注者。关注者通过订阅用户状态的Kafka主题,实现实时消息广播。

  7. 分布式应用解耦:

    场景: 在电子商务微服务架构中,购物车服务、订单服务、支付服务等通过Kafka进行异步通信。例如,购物车服务可以通过Kafka发布购物车更新的事件,订单服务通过订阅事件来处理相关订单逻辑。

  8. 大数据集成:

    场景: 在一个大数据处理流水线中,使用Kafka将产生的数据传输到Spark进行实时分析。生产者将数据发布到Kafka主题,而Spark应用程序通过订阅主题来接收实时数据。

  9. 实时推荐系统:

    场景: 在在线视频平台上,使用Kafka收集用户观看记录。推荐引擎通过消费Kafka主题,实时更新用户的个性化推荐列表,提高用户体验。

  10. 异步通信:

    场景: 在电商平台中,使用Kafka实现异步订单处理。当订单支付成功时,订单服务通过Kafka发布订单处理完成的消息,而邮件服务通过订阅该主题来异步发送订单确认邮件。

下面就使用SpringBoot整合kafka的发布订阅机制,实现消息的发布和订阅。

3.SpringBoot整合Kafka步骤:

1. 添加依赖:

确保在你的pom.xml文件中包含了Spring Boot和Spring Kafka的依赖。

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Kafka Starter --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>

2. 配置 Kafka:

在application.properties或application.yml中配置 Kafka 连接信息。

spring:kafka:bootstrap-servers: your-kafka-server:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3. 创建 Kafka 生产者:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaMessageProducer {private static final String TOPIC = "admin-messages";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendAdminMessage(String message) {kafkaTemplate.send(TOPIC, message);}
}

4. 创建 Kafka 消费者:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaMessageConsumer {@KafkaListener(topics = "admin-messages", groupId = "user-group")public void receiveAdminMessage(String message) {System.out.println("Received message: " + message);// ...}
}

5. 发布消息:

在管理员需要发布消息的地方调用KafkaMessageProducer的 sendAdminMessage 方法。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/message")
public class AdminController {@Autowiredprivate KafkaMessageProducer kafkaMessageProducer;@GetMapping("/publish")public void publishAdminMessage(@RequestParam("messagemessage") String message) {kafkaMessageProducer.sendAdminMessage(message);}
}

        当调用 publishAdminMessage方法时,所有监听 admin-messages 主题的用户将会接收到相应的消息。

6. 使用Postman进行测试:

控制台输出结果:

这样就使用SpringBoot整合了Kafka并写了一个简单的案例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/299267.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker-compose 安装Sonar并集成gitlab

文章目录 1. 前置条件2. 编写docker-compose-sonar.yml文件3. 集成 gitlab4. Sonar Login with GitLab 1. 前置条件 安装docker-compose 安装docker 创建容器运行的特有网络 创建挂载目录 2. 编写docker-compose-sonar.yml文件 version: "3" services:sonar-postgre…

MSSQL执行查询报错“使用 UNION、INTERSECT 或 EXCEPT 运算符合并的所有查询必须在其目标列表中有相同数目的表达式。”

文章目录 MSSQL执行查询报错“使用 UNION、INTERSECT 或 EXCEPT 运算符合并的所有查询必须在其目标列表中有相同数目的表达式。”报错截图根本原因 MSSQL执行查询报错“使用 UNION、INTERSECT 或 EXCEPT 运算符合并的所有查询必须在其目标列表中有相同数目的表达式。” 报错截…

白龙地铁消费项目(地铁消费系统,包括用户端、管理端)

大一学的C#可视化项目文件&#xff0c;所有功能均可使用。可以直接下载 下方是演示照片

【Midjourney】登录和使用:详细指南!

关于Midjourney 链接: Midjourney Midjourney 是一项使用人工智能自动生成图像的服务。你可以通过向人工智能发出称为“提示”的指令来生成你想要的图像。例如&#xff0c;可以生成风景、人物、建筑等各种图像&#xff0c;还可以调整图像风格&#xff08;绘画、插画风格、摄影风…

【汇编先导】-- 2

汇编先导 6. 寄存器 存储数据&#xff1a;CPU > 内存 > 硬盘(固态、机械) CPU还可分为&#xff1a; 32位CPU 8 16 32 64位CPU 8 16 32 64(增加了寻址能力) 通用寄存器 # 32位的通用寄存器只有8个 # 可以在任意软件的底层看到 # 通用寄存器可以存储任何值存值的范围…

逻辑运算加法器

前言 逻辑门本质上操作的是单个二进制数&#xff0c;通过高低电压或者有无信号来表示&#xff0c;并且&#xff0c;因为二进制数的原因&#xff0c;一个数字&#xff0c;我们可以通过二进制数来表示&#xff0c;整数可以精确表示&#xff0c;浮点数可以近似表示 本篇文章使用逻…

二叉树题目:分裂二叉树的最大乘积

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;分裂二叉树的最大乘积 出处&#xff1a;1339. 分裂二叉树的最大乘积 难度 6 级 题目描述 要求 给定二叉树的根结点 root \texttt{root} root&…

CentOS 7 用户必看SQLite 升级指南:轻松将旧版 3.7.17 升级至3.41.2详细教程

0.背景 编写此文是因为在 Linux 上跑项目时报错&#xff1a; sqlite3.NotSupportedError: deterministicTrue requires SQLite 3.8.3 or highe&#xff08;此时已经安装了 python3&#xff09;。sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) near "(&q…

路径规划最全综述+代码+可视化绘图(Dijkstra算法+A*算法+RRT算法等)

路径规划综述 1. 背景介绍 路径规划是指在给定的环境中找到从起点到终点的最佳路径的过程。它在现实生活中有着广泛的应用&#xff0c;包括无人驾驶、物流配送、机器人导航等领域。随着人工智能和计算机技术的发展&#xff0c;路径规划技术也在不断地得到改进和应用。 路径规划…

【文本处理】正则表达式

一、简介 正则表达式&#xff0c;又称规则表达式,&#xff08;Regular Expression&#xff0c;在代码中常简写为regex、regexp或RE&#xff09;&#xff0c;是一种文本模式&#xff0c;包括普通字符&#xff08;例如&#xff0c;a 到 z 之间的字母&#xff09;和特殊字符&…

2024年软考电子商务设计师如何备考?考什么?

一、电子商务设计师概述&#xff1f; 电子商务设计师属于软考中级资格考试&#xff0c;软考是由国家人力资源和社会保障部&#xff08;原人事部&#xff09;、工业和信息化部&#xff08;原信息产业部&#xff09;领导的国家级考试&#xff0c;其目的是&#xff0c;科学、公正…

Spring 依赖注入概述、使用以及原理解析

前言 源码在我github的guide-spring仓库中&#xff0c;可以克隆下来 直接执行。 我们本文主要来介绍依赖注入的使用示例及其原理 依赖注入 什么是依赖注入 依赖注入&#xff08;Dependency Injection&#xff0c;简称DI&#xff09;是一种设计模式&#xff0c;它用于实现对…