在Go项目中二次封装Kafka客户端功能

1.摘要

在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。

在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafka Go客户端库, 该开源库地址为:GitHub - IBM/sarama: Sarama is a Go library for Apache Kafka.。

2.功能结构组织

为了能在项目中快速使用, 我在项目目录中专门新建了一个名为kafka的文件夹,在该文件夹下新建了四个文件,分别为:

kafka (目录)|----- consumer.go   (消费者方法实现)|----- producer.go   (生产者方法实现)|----- kafka.go      (定义接口)|----- kafka_test.go (单元功能测试)

为方便项目使用,在此基础上做了二次封装。

3.消费者实现

第一步首先定义了一个结构体, 里面包含了Kafka的主机、topic、接收通道和消费者对象信息:

type KafkaConsumer struct {Hosts    string          // Kafka主机IP:端口,例如:192.168.201.206:9092Ctopic   string          // topic名称Kchan    chan string     // 接收信息通道Consumer sarama.Consumer // 消费者对象
}

接下来是消费者初始化函数:

func (k *KafkaConsumer) kafkaInit() {// 定义配置选项 config := sarama.NewConfig()config.Consumer.Return.Errors = trueconfig.Version = sarama.V0_10_2_0// 初始化一个消费对象consumer, err := sarama.NewConsumer(k.Hosts, config)if err != nil {err = errors.New("NewConsumer错误,原因:" + err.Error())fmt.Println(err.Error())return}// 获取所有Topictopics, err := consumer.Topics()if err != nil {fmt.Println(err.Error())return}// 判断是否有自定义的Topicvar topicsName = ""for _, e := range topics {if e == k.Ctopic {topicsName = ebreak}}// 没有自定义的Topic则报错if topicsName == "" {err = errors.New("找不到topics内容")fmt.Println(err.Error())return}// 将消费对象保存到结构体以备后面使用k.Consumer = consumer
}

在上面的初始化函数中, 首先初始化一个消费对象, 然后获取所有的Topic名称,并判断了在这些Topic名称中是否有我自定义的名称,获取成功后则将消费对象保存到我们绑定的结构体中。

接下来是消费监控函数实现,代码如下:

func (k *KafkaConsumer) kafkaProcess() {var wg sync.WaitGroup// 遍历指定Topic分区持续监控消息Partitions, _ := k.Consumer.Partitions(k.Ctopic)for _, subPartitions := range Partitions {pc, err := k.Consumer.ConsumePartition(k.Ctopic, subPartitions, sarama.OffsetNewest)if err != nil {continue}wg.Add(1)go func() {defer wg.Done()// 这里进入另一个函数可以过滤消息内容k.processPartition(pc)}()}wg.Wait()
}

函数processPartition()的实现代码如下:

func (k *KafkaConsumer) processPartition(pc sarama.PartitionConsumer) {defer pc.AsyncClose()for msg := range pc.Messages() {// 这里可以过滤不需要的Topic的信息if strings.Contains(string(msg.Value), "group_state2") {continue}// 这里将获取到的Topic信息发送到通道k.Kchan <- string(msg.Value)}
}

4.生产者实现

为了跟消费者代码配套,这里也同步实现了生产者代码,主要功能是完成工作后,给指定Topic的生产方返回一个指定消息。

定义生产者的结构体如下:

type KafkaProducer struct {hosts         string               // Kafka主机sendmsg       string               // 消费方返回给生产方的消息ptopic        string               // TopicAsyncProducer sarama.AsyncProducer // Kafka生产者接口对象
}

对应的生产者初始化函数实现如下:

func (k *KafkaProducer) kafkaInit() {// 定义配置参数config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 5config.Producer.Return.Successes = trueconfig.Version = sarama.V0_10_2_0// 初始化一个生产者对象producer, err := sarama.NewAsyncProducer(k.hosts, config)if err != nil {err = errors.New("NewAsyncProducer错误,原因:" + err.Error())fmt.Println(err.Error())return}// 保存对象到结构体k.AsyncProducer = producer
}

给生产者回复信息的函数实现如下:

func (k *KafkaProducer) kafkaProcess() {msg := &sarama.ProducerMessage{Topic: k.ptopic,}// 信息编码msg.Value = sarama.ByteEncoder(k.sendmsg)// 将信息发送给通道k.AsyncProducer.Input() <- msg
}

5.接口定义实现

首先对于生产者和消费者,都有对应的初始化和执行操作,因此定义接口函数如下:

// Kafka方法接口
type IKafkaMethod interface {kafkaInit()     // 初始化方法kafkaProcess()  // 执行方法
}

为了方便管理接口的赋值操作, 这里定义了一个接口管理方法, 并用Set()函数进行接口类型赋值, Run()函数负责运行对应的成员函数:

// 接口管理结构体
type KafkaManager struct {kafkaMethod IKafkaMethod  // 接口对象
}// 定义实现Set方法
func (km *KafkaManager) Set(m IKafkaMethod) {km.kafkaMethod = m  // 将指定的方法赋给接口
}// 定义实现Run方法
func (km *KafkaManager) Run() {km.kafkaMethod.kafkaInit()go km.kafkaMethod.kafkaProcess()
}

最后一部分是供外部调用的函数,首先定义一个结构体,该结构体中保存了Kafka的基础信息和三个对象指针:

type KafkaMessager struct {KafkaManager  *KafkaManager   // 接口管理对象指针KafkaProducer *KafkaProducer  // 生产者对象指针KafkaConsumer *KafkaConsumer  // 消费者对象指针Hosts         string          // Kafka主机topic         string          // topic
}// 供外部调用初始化的函数,传入Kafka主机IP和Topic,返回操作对象指针,并初始化结构体成员变量
func NewKafkaMessager(hosts, topic string) *KafkaMessager {km := &KafkaMessager{KafkaManager:  new(KafkaManager),KafkaProducer: new(KafkaProducer),KafkaConsumer: new(KafkaConsumer),Hosts:         hosts,topic:         topic,}return km
}

6.功能调用和验证

在Kafka_test.go文件中,定义一个用于单元测试的函数,格式如下:

func TestKafka(t *testing.T) {....
}

使用单元测试函数的好处是可以单独调试, 专注核心功能本身。

我使用的编辑器是Goland, 在TestKafka函数前面有个三角形小图标,点击可以选择各种调试选项,如图:

下面是我模拟用户调用的客户端代码片段:

// 这里选择我自己搭建的Kafka所在服务器,Topic为test123
// 注意:这里的hosts格式是IP:端口的格式,例如:192.168.201.206:9092
hosts := "192.168.201.206:9092"
topic := "test123"// 调用初始化函数,并将上面的内容作为参数传进去
nkm := NewKafkaMessager(hosts, topic)// 初始化消费者,当生产者发出消息,消费者自动消费
nkm.KafkaConsumer.Hosts = hosts             // 消费者host赋值
nkm.KafkaConsumer.Ctopic = topic            // 消费者topic赋值
nkm.KafkaConsumer.Kchan = make(chan string) // 初始化消息通道
nkm.KafkaManager.Set(nkm.KafkaConsumer)     // 接口赋值,设置成操作消费者方法
nkm.KafkaManager.Run()                  // 执行消费者初始化方法// 监听通道,接收生产客户端发过来的消息
recv := <- nkm.KafkaConsumer.Kchan
fmt.Println(recv)  // 打印接收到的消息

现在我们可以选择直接运行程序了,然后在Kafka的生产者控制台中输入字符:Hello,Goland发送:

可以看到,我们的程序成功接收到Kafka生产者发送过来的信息。

--- END --

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/151910.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MySQL】并发事务产生的问题及事务隔离级别

先来复习一下事务的四大特性&#xff1a; 原子性&#xff08;Atomicity&#xff09;&#xff1a;事务是不可分割的最小操作单位&#xff0c;事务中的所有操作要么全部执行成功&#xff0c;要么全部失败回滚&#xff0c;不能只执行其中一部分操作。一致性&#xff08;Consisten…

排序算法-堆积树排序法(HeapSort)

目录 排序算法-堆积树排序法&#xff08;HeapSort&#xff09; 1、说明 2、算法分析 3、C代码 排序算法-堆积树排序法&#xff08;HeapSort&#xff09; 1、说明 堆积树排序法是选择排序法的改进版&#xff0c;可以减少在选择排序法中的比较次数&#xff0c;进而减少排序…

hdlbits系列verilog解答(模块按位置)-21

文章目录 一、问题描述二、verilog源码三、仿真结果 一、问题描述 此问题类似于上一个&#xff08;模块&#xff09;。您将获得一个名为的 mod_a 模块&#xff0c;该模块按此顺序具有 2 个输出和 4 个输入。您必须按位置将 6 个端口按该顺序连接到顶级模块的端口 out1 、 out2…

pycharm 2023.2.3设置conda虚拟环境

分两步&#xff1a; &#xff08;1&#xff09;设置Virtualenv Environment &#xff08;2&#xff09;设值Conda Executable 加载conda环境&#xff0c;然后选择conda环境

仓库管理系统源代码集合,带图片展示和网站演示

目录 1、ModernWMS2、GreaterWMS3、kopSoftWMS4、SwebWMS5、若依wms6、jeewms 1、ModernWMS 体验地址&#xff1a;https://wmsonline.ikeyly.com 简易完整的仓库管理系统 该库存管理系统是&#xff0c;我们从多年ERP系统研发中总结出来的一套针对小型物流仓储供应链流程。 简…

JavaScript基础知识18——逻辑运算符之短路运算

哈喽&#xff0c;大家好&#xff0c;我是雷工。 本节学习JavaScript基础知识——逻辑运算符中的短路运算&#xff0c;以下为学习笔记。 规则&#xff1a; 1、如果是&&运算&#xff0c;只要遇到false&#xff0c;就立即短路&#xff0c;不会再执行了&#xff0c;直接返回…

GPT的广泛应用会对互联网公司造成挑战吗?——探讨GPT在实际使用中的应用和影响

文章目录 前言GPT 技术的背景和发展历程GPT 技术对互联网行业的影响GPT 技术在互联网行业中的应用GPT 技术对于用户隐私和数据安全的威胁GPT 技术对于人类工作岗位的影响加强 AI 伦理和监管加强 AI 安全性和隐私保护推动 AI 创新和发展&#xff0c;避免过度依赖 AIGPT 技术是一…

Android12之#pragma clang diagnostic ignored总结(一百六十八)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

基于物联网云平台的分布式光伏监控系统的设计与实现

贾丽丽 安科瑞电气股份有限公司 上海嘉定 201801 摘要&#xff1a;针对国内光伏发电监控系统的研究现状&#xff0c;文中提出了基于云平台的光伏发电监控体系。构建基于B/S架构的数据实时采集与推送&#xff0c;以SSH(strutsspringhibernate)作为Web开发框架&#xff0c;开发基…

windows PC virtualBox 配置

效果&#xff1a; oracle vitualbox 可以访问通PC主机&#xff0c;可以访问外网: 注意&#xff0c;如果docker0网络地址&#xff0c;和PC主机的网络地址冲突了&#xff0c;需要变更docker的网络地址&#xff1a; root/home/mysqlPcap/anti-tamper $ cat /etc/docker/daemon.js…

docker 中给命令起别名

docker 的有些命令特别复杂&#xff0c;我们可以给它设置别名简化输入&#xff0c;就不用每次都输入那么多了&#xff01;&#xff01;&#xff01; 1. 进入 .bashrc 中修改配置&#xff08; .bashrc 是root下的隐藏文件&#xff09; cd /rootvim .bashrc2. 在 .bashrc 中加入…

iOS iGameGuardian修改器检测方案

一直以来&#xff0c;iOS 系统的安全性、稳定性都是其与安卓竞争的主力卖点。这要归功于 iOS 系统独特的闭源生态&#xff0c;应用软件上架会经过严格审核与测试。所以&#xff0c;iOS端的作弊手段&#xff0c;总是在尝试绕过 App Store 的审查。 常见的 iOS 游戏作弊&#xf…