前言:基于Windows环境下的Kafka搭建(scal+zookeeper+Kafka+可视化工具)、以及使用.NET6.0进行简单的生产者与消费者的演示
一、环境部署
Kafka是使用Java语言和Scala语言开发的,所以需要有对应的Java环境,以及Scala语言环境。
【JAVA环境配置】
安装java jdk环境,下载地址:https://www.oracle.com/java/technologies/downloads/#java21
下载完成以后运行,直接默认都是下一步进行安装
安装完成以后,打开CMD,输入java和javac,如果能够弹出一堆操作提示符,就代表安装成功
接下来就是配置Java环境变量了。Jdk11以上的版本,默认没有Jre文件夹,需要手动通过Jlink来生成有关的Jre目录。
通过Jlink来创建Jre文件目录方法:使用管理员权限,打开windows powershell命令提示符,然后定位到java安装目录下。
然后使用命令:.\bin\jlink.exe --module-path jmods --add-modules java.desktop --output jre
执行以后,就可以生成jre文件夹了
然后是配置环境变量,右键计算机图标 -> 属性 -> 高级系统设置 ->环境变量,新建一个系统变量,叫 JAVA_HOME,变量值是安装的路径,例如我的安装地址是如下图所示的地址。
再新建一个CLASSPATH,变量值为:
.;%JAVA_HOME%\bin;%JAVA_HOME%\lib;%JAVA_HOME%\jre\lib
注意:该变量值开头是句点加分号 .; 这个不能漏掉了。
最后,在Ptah系统变量里面,新增两个变量值:
%JAVA_HOME%\bin
%JAVA_HOME%\jre\bin
命令行执行 java -version查看是否安装成功
出现了安装的java jdk版本就是安装成功
【Scala环境安装】
Scala环境安装,需要先下载Scala语言包,下载地址:https://www.scala-lang.org/download/all.html
要选择Binaries版本的环境,否则需要自己编译
二进制版本的下载现在都是在github,下拉到页面最底部
下载完后解压
新建一个系统变量
新建Path系统变量
创建完后,控制台窗口输入:scala -version
如果提示类似如下有关版本信息,则代表安装成功。
【安装Zookeeper环境】
Kafka基于Zookeeper环境运行,zookeeper提供给kafka一系列的功能支持,所以还需要安装Zookeeper有关的环境。
下载地址:https://zookeeper.apache.org/releases.html#download
同样,Zookeeper也需要下载带bin 的链接,没有带bin的链接,可能是源码,需要自己编译
先解压下载的包,然后在解压后的目录下新增data文件夹
然后复制data文件夹的绝对路径,备用。在conf文件夹下,编辑cfg文件
在cfg文件内,修改dataDir指定为上面新建的data文件夹的绝对路径。注意路径是斜杠/,如果要使用 \ 反斜杆,需要写双反斜杠 \
也要更改cfg格式的文件名称为 zoo.cfg 否则zookeeper无法识别配置文件。Zoo.cfg文件是zookeeper启动时候自动关联的默认配置文件名称。
然后新建环境变量 ZOOKEEPER_HOME
新建Path环境变量
启动zookeeper,直接任意打开控制台,输入 zkServer
如果都没有报错,一般是启动成功了的。再次验证下,可以任意开个控制台,输入JPS进行查看,如下图所示,有JPS、也有QuorumPeerMain,代表zookeeper启动成功了。
注意使用kafka的时候不要关闭该窗口!
【Kafka环境】
下载地址:https://kafka.apache.org/downloads.html
注意:下载Kafka的时候尽量选择低版本,因为高版本的Kafka对windows环境不太友好,可能会启动失败,这里我下载的是3.8版本,同样需要选择下载binary版本,然后根据scala的版本选择对应的版本
先解压,然后在解压后的目录下,新增logs文件夹
然后在Config文件夹下,修改 server.properties 文件,修改 log.dirs 的值为 新增的logs文件夹的绝对路径
进入到解压后的kafka目录下,在路径栏输入cmd,快速打开当前文件夹下的控制台窗口:
输入命令:
.\bin\windows\kafka-server-start.bat .\config\server.properties
进行启动Kafka服务
没有提示错误,根据提示信息,代表是启动成功了。任意打开控制台,再输入JPS查看下,可以看到Kafka,确认是启动OK了。
注意:可能会报错输入行太长。 命令语法不正确,就是目录层级太深,或者文件夹太长
【可视化工具】
然后是要一款Kafka可视化工具,此处我选择使用offset explorer (原来是叫kafka tools,如下载地址所示)
下载地址:https://www.kafkatool.com/download.html
二、代码开发与测试
新建类库项目,当做Kafka服务类库
此处选择标准库2.1,用于可以给多种.net core版本使用,方便兼容
引用 Confluent.Kafka 包。
新增 发布服务类 和 订阅服务类以及对应的接口
点击查看代码
namespace Ysq.Kafka.Service
{public interface IPublishService{Task PublishAsync<TMessage>(string broker, string topicName, TMessage message) where TMessage : class;}
}
点击查看代码
/// <summary>
/// Kafka生产者发布服务
/// </summary>
public class PublishServcie : IPublishService
{public async Task PublishAsync<TMessage>(string broker,string topicName,TMessage message) where TMessage : class {var config = new ProducerConfig{BootstrapServers = broker,//Kafka服务集群,例如"192.168.0.1:9092,192.168.0.2:9092" 或者单机"192.168.0.1:9092"Acks = Acks.All,//生产者会等待 所有同步副本 都确认接收到消息之后,才认为消息发送成功//消息写入过程:生产者将消息发送给 Topic 的分区 Leader。//Leader 副本将消息写入自身的日志。//同步副本(ISR)从 Leader 副本拉取消息并写入自己的日志。//Leader 收到所有同步副本的确认(即消息被复制成功)后,向生产者返回成功的响应。MessageSendMaxRetries = 3,//发送失败重试的次数};using (var producer=new ProducerBuilder<string,string>(config).Build()){try{string data=JsonConvert.SerializeObject(message);var sendData = new Message<string, string> { Key = Guid.NewGuid().ToString("N"),Value=data };var report=await producer.ProduceAsync(topicName, sendData);Console.WriteLine($"消息>>>>>:{data}\r\n发送到:{report.TopicPartitionOffset}");}catch (ProduceException<string,string> ex){Console.WriteLine($"消息发送失败>>>>>:\r\n Code= {ex.Error.Code} >>> \r\nError= {ex.Message}");}}}
}
点击查看代码
namespace Ysq.Kafka.Service
{public interface ISubscribeService{Task SubscribeAsync<TMessage>(ConsumerConfig config, IEnumerable<string> topics, Action<TMessage> func, CancellationToken cancellationToken) where TMessage : class;}
}
点击查看代码
namespace Ysq.Kafka.Service
{/// <summary>/// Kafka消费者订阅服务/// </summary>public class SubscribeService : ISubscribeService{/// <summary>/// 消费者服务核心代码/// </summary>/// <typeparam name="TMessage"></typeparam>/// <param name="config">消费者配置信息</param>/// <param name="topics">主题集合</param>/// <param name="func"></param>/// <param name="cancellationToken"></param>/// <returns></returns>public async Task SubscribeAsync<TMessage>(ConsumerConfig config, IEnumerable<string> topics, Action<TMessage> func, CancellationToken cancellationToken) where TMessage : class{const int commitPeriod = 1;using (var consumer = new ConsumerBuilder<Ignore, string>(config)//Ignore: 表示 Kafka 消息的键被忽略。string: 表示 Kafka 消息的值是字符串。//注册一个错误处理回调,当消费过程中出现错误时会调用.SetErrorHandler((_, e) =>{Console.WriteLine($"消费错误 >>>>>: {e.Reason}");})// 注册一个统计信息处理回调,用于捕获 Kafka 消费者的统计信息(JSON 格式)。.SetStatisticsHandler((_, json) =>{Console.WriteLine($"************************************************");})//分区分配事件处理器。当消费者分配到新的分区时触发.SetPartitionsAssignedHandler((c, partitionList) =>{string partitions = string.Join(", ", partitionList);Console.WriteLine($"分配的分区 >>>>> : {partitions}");})//分区回收事件处理器。当消费者失去分区分配时触发.SetPartitionsRevokedHandler((c, partitionList) =>{string partitions = string.Join(", ", partitionList);Console.WriteLine($"回收的分区 >>>>> : {partitions}");}).Build())// 构建消费者实例{consumer.Subscribe(topics);//订阅主题try{while (true)//通过一个无限循环,不断消费消息{try{var consumeResult = consumer.Consume(cancellationToken);//拉取一条消息。如果有消息到达,consumeResult 包含消息内容和元数据(如分区、偏移量)。如果没有消息到达(或者超时),可能抛出 ConsumeException。if (consumeResult.IsPartitionEOF){continue;//当到达分区末尾时(没有新消息可消费),跳过当前循环。}if (consumeResult?.Offset % commitPeriod == 0){try{var result = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message?.Value);func(result); // 消费消息}catch (Exception ex){Console.WriteLine($"消费业务处理失败: {ex.Message}");}try{consumer.Commit(consumeResult); // 手动提交Console.WriteLine($"消费者消费完成,已提交 ");}catch (KafkaException e){Console.WriteLine($"提交错误 >>>>> : {e.Error.Reason}");}}}catch (ConsumeException e){Console.WriteLine($"消费错误>>>>> : {e.Error.Reason}");}}}catch (Exception e){Console.WriteLine($"其他错误 >>>>> :{e.Message}");consumer.Close();}}await Task.CompletedTask;}}
}
新建一个控制台项目,用来当作消费者端的测试,并且新增一个方法,用来当作消费者接收到消息以后的业务处理方法体。此处控制台环境版本为.NET 6
消费客户端代码如下。其中,BootstrapServers也可以提供集群地址,例如 ip1:port,ip2:port…… 服务之间以半角逗号隔开。
再新增一个webapi项目,用来当作生产者的客户端进行发送数据。以及对kafka服务类部分进行依赖注入注册,此处使用单例。该webapi此处使用.NET 6环境,带有控制器的模式。
新增的控制器里面,进行生产者的注入与实现。注意:topicName参数对应上边的topic-wesky,通过主题绑定,否则消费者不认识就没办法消费到了。
测试