基于Windows环境的Kafka搭建与.NET实战开发案例

news/2025/1/1 8:17:52/文章来源:https://www.cnblogs.com/sq1201/p/18638230

前言:基于Windows环境下的Kafka搭建(scal+zookeeper+Kafka+可视化工具)、以及使用.NET6.0进行简单的生产者与消费者的演示

一、环境部署
Kafka是使用Java语言和Scala语言开发的,所以需要有对应的Java环境,以及Scala语言环境。

【JAVA环境配置】
安装java jdk环境,下载地址:https://www.oracle.com/java/technologies/downloads/#java21
下载完成以后运行,直接默认都是下一步进行安装

安装完成以后,打开CMD,输入java和javac,如果能够弹出一堆操作提示符,就代表安装成功

接下来就是配置Java环境变量了。Jdk11以上的版本,默认没有Jre文件夹,需要手动通过Jlink来生成有关的Jre目录。
通过Jlink来创建Jre文件目录方法:使用管理员权限,打开windows powershell命令提示符,然后定位到java安装目录下。
然后使用命令:.\bin\jlink.exe --module-path jmods --add-modules java.desktop --output jre
执行以后,就可以生成jre文件夹了

然后是配置环境变量,右键计算机图标 -> 属性 -> 高级系统设置 ->环境变量,新建一个系统变量,叫 JAVA_HOME,变量值是安装的路径,例如我的安装地址是如下图所示的地址。

再新建一个CLASSPATH,变量值为:
.;%JAVA_HOME%\bin;%JAVA_HOME%\lib;%JAVA_HOME%\jre\lib

注意:该变量值开头是句点加分号 .; 这个不能漏掉了。
最后,在Ptah系统变量里面,新增两个变量值:
%JAVA_HOME%\bin
%JAVA_HOME%\jre\bin

命令行执行 java -version查看是否安装成功

出现了安装的java jdk版本就是安装成功

【Scala环境安装】
Scala环境安装,需要先下载Scala语言包,下载地址:https://www.scala-lang.org/download/all.html
要选择Binaries版本的环境,否则需要自己编译
二进制版本的下载现在都是在github,下拉到页面最底部

下载完后解压

新建一个系统变量

新建Path系统变量

创建完后,控制台窗口输入:scala -version
如果提示类似如下有关版本信息,则代表安装成功。

【安装Zookeeper环境】
Kafka基于Zookeeper环境运行,zookeeper提供给kafka一系列的功能支持,所以还需要安装Zookeeper有关的环境。
下载地址:https://zookeeper.apache.org/releases.html#download

同样,Zookeeper也需要下载带bin 的链接,没有带bin的链接,可能是源码,需要自己编译

先解压下载的包,然后在解压后的目录下新增data文件夹

然后复制data文件夹的绝对路径,备用。在conf文件夹下,编辑cfg文件

在cfg文件内,修改dataDir指定为上面新建的data文件夹的绝对路径。注意路径是斜杠/,如果要使用 \ 反斜杆,需要写双反斜杠 \

也要更改cfg格式的文件名称为 zoo.cfg 否则zookeeper无法识别配置文件。Zoo.cfg文件是zookeeper启动时候自动关联的默认配置文件名称。

然后新建环境变量 ZOOKEEPER_HOME

新建Path环境变量

启动zookeeper,直接任意打开控制台,输入 zkServer

如果都没有报错,一般是启动成功了的。再次验证下,可以任意开个控制台,输入JPS进行查看,如下图所示,有JPS、也有QuorumPeerMain,代表zookeeper启动成功了。

注意使用kafka的时候不要关闭该窗口!

【Kafka环境】
下载地址:https://kafka.apache.org/downloads.html
注意:下载Kafka的时候尽量选择低版本,因为高版本的Kafka对windows环境不太友好,可能会启动失败,这里我下载的是3.8版本,同样需要选择下载binary版本,然后根据scala的版本选择对应的版本

先解压,然后在解压后的目录下,新增logs文件夹

然后在Config文件夹下,修改 server.properties 文件,修改 log.dirs 的值为 新增的logs文件夹的绝对路径

进入到解压后的kafka目录下,在路径栏输入cmd,快速打开当前文件夹下的控制台窗口:

输入命令:
.\bin\windows\kafka-server-start.bat .\config\server.properties
进行启动Kafka服务

没有提示错误,根据提示信息,代表是启动成功了。任意打开控制台,再输入JPS查看下,可以看到Kafka,确认是启动OK了。

注意:可能会报错输入行太长。 命令语法不正确,就是目录层级太深,或者文件夹太长

【可视化工具】
然后是要一款Kafka可视化工具,此处我选择使用offset explorer (原来是叫kafka tools,如下载地址所示)
下载地址:https://www.kafkatool.com/download.html

二、代码开发与测试
新建类库项目,当做Kafka服务类库

此处选择标准库2.1,用于可以给多种.net core版本使用,方便兼容

引用 Confluent.Kafka 包。

新增 发布服务类 和 订阅服务类以及对应的接口

点击查看代码
namespace Ysq.Kafka.Service
{public interface IPublishService{Task PublishAsync<TMessage>(string broker, string topicName, TMessage message) where TMessage : class;}
}
点击查看代码
/// <summary>
/// Kafka生产者发布服务
/// </summary>
public class PublishServcie : IPublishService
{public async Task PublishAsync<TMessage>(string broker,string topicName,TMessage message) where TMessage : class {var config = new ProducerConfig{BootstrapServers = broker,//Kafka服务集群,例如"192.168.0.1:9092,192.168.0.2:9092" 或者单机"192.168.0.1:9092"Acks = Acks.All,//生产者会等待 所有同步副本 都确认接收到消息之后,才认为消息发送成功//消息写入过程:生产者将消息发送给 Topic 的分区 Leader。//Leader 副本将消息写入自身的日志。//同步副本(ISR)从 Leader 副本拉取消息并写入自己的日志。//Leader 收到所有同步副本的确认(即消息被复制成功)后,向生产者返回成功的响应。MessageSendMaxRetries = 3,//发送失败重试的次数};using (var producer=new ProducerBuilder<string,string>(config).Build()){try{string data=JsonConvert.SerializeObject(message);var sendData = new Message<string, string> { Key = Guid.NewGuid().ToString("N"),Value=data };var report=await producer.ProduceAsync(topicName, sendData);Console.WriteLine($"消息>>>>>:{data}\r\n发送到:{report.TopicPartitionOffset}");}catch (ProduceException<string,string> ex){Console.WriteLine($"消息发送失败>>>>>:\r\n Code= {ex.Error.Code} >>> \r\nError= {ex.Message}");}}}
}
点击查看代码
namespace Ysq.Kafka.Service
{public interface ISubscribeService{Task SubscribeAsync<TMessage>(ConsumerConfig config, IEnumerable<string> topics, Action<TMessage> func, CancellationToken cancellationToken) where TMessage : class;}
}
点击查看代码
namespace Ysq.Kafka.Service
{/// <summary>/// Kafka消费者订阅服务/// </summary>public class SubscribeService : ISubscribeService{/// <summary>/// 消费者服务核心代码/// </summary>/// <typeparam name="TMessage"></typeparam>/// <param name="config">消费者配置信息</param>/// <param name="topics">主题集合</param>/// <param name="func"></param>/// <param name="cancellationToken"></param>/// <returns></returns>public async Task SubscribeAsync<TMessage>(ConsumerConfig config, IEnumerable<string> topics, Action<TMessage> func, CancellationToken cancellationToken) where TMessage : class{const int commitPeriod = 1;using (var consumer = new ConsumerBuilder<Ignore, string>(config)//Ignore: 表示 Kafka 消息的键被忽略。string: 表示 Kafka 消息的值是字符串。//注册一个错误处理回调,当消费过程中出现错误时会调用.SetErrorHandler((_, e) =>{Console.WriteLine($"消费错误 >>>>>: {e.Reason}");})// 注册一个统计信息处理回调,用于捕获 Kafka 消费者的统计信息(JSON 格式)。.SetStatisticsHandler((_, json) =>{Console.WriteLine($"************************************************");})//分区分配事件处理器。当消费者分配到新的分区时触发.SetPartitionsAssignedHandler((c, partitionList) =>{string partitions = string.Join(", ", partitionList);Console.WriteLine($"分配的分区 >>>>> : {partitions}");})//分区回收事件处理器。当消费者失去分区分配时触发.SetPartitionsRevokedHandler((c, partitionList) =>{string partitions = string.Join(", ", partitionList);Console.WriteLine($"回收的分区 >>>>> : {partitions}");}).Build())// 构建消费者实例{consumer.Subscribe(topics);//订阅主题try{while (true)//通过一个无限循环,不断消费消息{try{var consumeResult = consumer.Consume(cancellationToken);//拉取一条消息。如果有消息到达,consumeResult 包含消息内容和元数据(如分区、偏移量)。如果没有消息到达(或者超时),可能抛出 ConsumeException。if (consumeResult.IsPartitionEOF){continue;//当到达分区末尾时(没有新消息可消费),跳过当前循环。}if (consumeResult?.Offset % commitPeriod == 0){try{var result = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message?.Value);func(result); // 消费消息}catch (Exception ex){Console.WriteLine($"消费业务处理失败: {ex.Message}");}try{consumer.Commit(consumeResult);  // 手动提交Console.WriteLine($"消费者消费完成,已提交 ");}catch (KafkaException e){Console.WriteLine($"提交错误 >>>>> : {e.Error.Reason}");}}}catch (ConsumeException e){Console.WriteLine($"消费错误>>>>> : {e.Error.Reason}");}}}catch (Exception e){Console.WriteLine($"其他错误 >>>>> :{e.Message}");consumer.Close();}}await Task.CompletedTask;}}
}

新建一个控制台项目,用来当作消费者端的测试,并且新增一个方法,用来当作消费者接收到消息以后的业务处理方法体。此处控制台环境版本为.NET 6

消费客户端代码如下。其中,BootstrapServers也可以提供集群地址,例如 ip1:port,ip2:port…… 服务之间以半角逗号隔开。

再新增一个webapi项目,用来当作生产者的客户端进行发送数据。以及对kafka服务类部分进行依赖注入注册,此处使用单例。该webapi此处使用.NET 6环境,带有控制器的模式。
新增的控制器里面,进行生产者的注入与实现。注意:topicName参数对应上边的topic-wesky,通过主题绑定,否则消费者不认识就没办法消费到了。

测试


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/860628.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何处理服务器上的同步数据异步存储问题?

问题: 在多台服务器之间进行数据同步时,遇到了异步存储的问题,导致日志文件无法正确保存到目标服务器。请问如何处理服务器上的同步数据异步存储问题? 答案: 在分布式系统或多台服务器环境中,数据同步和异步存储是关键环节之一。当遇到日志文件无法正确保存到目标服务器的…

如何解决网站后台无法正常操作的问题?

当用户尝试登录网站后台时,发现后台频繁闪退,无法正常进行操作。这种情况可能会影响日常管理和维护工作。请问如何解决网站后台无法正常操作的问题? 答案: 网站后台频繁闪退且无法正常操作是一个常见但令人头疼的问题,它可能由多种原因引起,包括但不限于服务器配置、程序…

面向对象程序设计第七、八次作业总结

前言 这篇博客是完成《面向对象程序设计》(java)课程的中三次PTA作业后的总结。主要内容有:家居强电电路模拟程序 - 3 家居强电电路模拟程序 - 4题目分析 家居强电电路模拟程序 - 3 分析 这是第三次的家居强电电路模拟程序,这次的强度变高了,题目有点难。 设备包括:控制设…

服务器重装系统后无法使用初始密码进行远程连接,应如何解决?

当您遇到服务器重装系统后无法使用初始密码进行远程连接的问题时,这通常意味着在重装过程中某些配置没有正确设置或存在其他潜在问题。以下是详细的排查和解决方案:确认密码正确性:首先确保您使用的确实是服务器提供的最新初始密码。有时服务商会在邮件或其他通知中提供新的…

如何解决宝塔面板登录密码忘记的问题?

您好,当您忘记了宝塔面板的登录密码时,可以按照以下步骤进行重置和处理:确认初始密码:如果您从未修改过宝塔面板的登录密码,默认情况下,宝塔面板的初始密码与服务器的初始密码相同。您可以尝试使用服务器的初始密码进行登录。如果您不确定服务器的初始密码,可以在云服务…

如何解决网站无法访问的问题?

您好,当您遇到网站无法访问的问题时,可以按照以下步骤进行排查和解决,确保网站能够正常运行:确认域名解析:首先,检查域名是否正确解析到服务器的IP地址。您可以通过在线DNS查询工具(如(网址))来验证域名解析是否正确。如果解析有问题,请联系域名注册商进行修正。检查服…

如何取消IP直接访问网站?

您好,当您希望取消通过IP地址直接访问您的网站时,可以通过以下步骤进行设置,确保用户只能通过域名访问,而不能通过IP地址直接访问。以下是详细的解决方案:理解原理:默认情况下,Web服务器(如Nginx、Apache等)会监听所有请求,无论这些请求是通过域名还是IP地址发起的。…

如何解决数据库超标需要升级的问题?

您好,当您遇到数据库超出配额并需要升级时,可以按照以下步骤进行处理,确保数据库能够正常运行并满足业务需求:确认当前使用情况:首先,登录到云服务提供商的管理控制台,查看当前数据库的使用情况。了解具体的存储空间、连接数、查询次数等指标,以确定是否确实超出了配额…

软件工程个人总结

这个作业属于哪个课程 https://edu.cnblogs.com/campus/fzu/SE2024这个作业要求在哪里 https://edu.cnblogs.com/campus/fzu/SE2024/homework/13315这个作业的目标 总结软工实践整个过程学号 102202154一、学期回顾 1.1 回顾你对于软件工程课程的想象 软件工程,在我对这门课程…

free version GitHub Copilot All In One

free version GitHub Copilot All In One AI 编程编码助手free version GitHub Copilot All In OneAI 编程编码助手GitHub Copilot freeResponses are limited to 2,000 code completions and 50 chat messages per month. https://github.com/settings/copilot GitHub Copilot…

7~8次作业总结

一、前言家居强电电路模拟程序-3知识点:所考察的知识点与前两次任务相比区别不大,都是主要用到封装,继承和多态等面向对象的基础操作,所用类的结构也没有多大变化,主要是新增了互斥开关以及电路中可能存在多条并联电路,这要求我对电路的相应知识有更多的了解。 题量分析:…

网站被挂马,如何处理?

网站被挂马后,您希望能够尽快清理并防止再次发生。这类问题可能由以下几个原因引起:木马文件植入: 黑客可能通过漏洞植入了木马文件,导致网站被挂马。建议您立即删除所有可疑文件,并进行全面扫描。可以使用专业的杀毒软件或在线扫描工具(如Sucuri SiteCheck)检查网站文件…