实例:NodeJS 操作 Kafka

本人是C#出身的程序员,c#很简单就能实现,有需要的可以加我私聊。但是就目前流行的开发语言,尤其是面向web方向应用的,我感觉就是Nodejs最简单了。下面介绍:

本文将会介绍在windows环境下启动Kafka,并通过nodejs作为客户端,生产并消费消息。

步骤一,Kafka需要java运行时,先安装配置java环境。通过在命令行中输入java -version确认java是否成功安装(可能需要查看windows的环境变量中是否有java)。

步骤二,Kafka官网下载最新版本的压缩包(.tgz格式),并解压。分别在两个命令行里面启动zookeeper、kafka(解压缩路径下)

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

bin\windows\kafka-server-start.bat config\server.properties

说明一下zookeeper和kafka的关系:zookeeper是集群的调度者,kafka才是消息队列。 zookeeper的默认端口:2181,kafka的默认端口:9092
相关配置可以在config文件下的server.properties和zookeeper.properties中找到

用记事本打开就可以编辑

建立data,logs,kafka-logs目录,用于日志,备用。

消费者客户端需要的group.id可以在config->consumer.properties中找到。

步骤三,使用DOS的CMD管理员命令行方式测试生产者生产、消费者消费。
//创建一个topic:test
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

//查看topic
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

//创建生产者主题mytest
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test-nodetopic

//创建消费者消费mytest
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-nodetopic --from-beginning

步骤四,生产者发送消息
在生产者窗口,随意输入一条消息,可以在消费者窗口看到该消息。

最后,使用nodejs访问kafka  首先安装kafkajs

初始化项目

npm init -y

没有安装kafkajs的,在这里可以安装,互联网在线安装。
npm install kafkajs

新建demo2023.js,输入以下代码

const { Kafka } = require('kafkajs')

const kafka = new Kafka({

  clientId: 'my-app',

  brokers: ['localhost:9092']

})

const producer = kafka.producer()

const consumer = kafka.consumer({ groupId: 'test-consumer-group' })

const run = async () => {

  // Producing

  await producer.connect()

  await producer.send({

    topic: 'test-nodetopic',

    messages: [

      { value: ' Hello KafkaJS user,I am producer ! ' },

    ],

  })

  // Consuming

  await consumer.connect()

  await consumer.subscribe({ topic: 'test-nodetopic', fromBeginning: true })

  await consumer.run({

    eachMessage: async ({ topic, partition, message }) => {

      console.log({

        partition,

        offset: message.offset,

        value: message.value.toString(),

      })

    },

  })

}

run().catch(console.error)

最后执行命令
node demo2023.js

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/322092.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Parallels虚拟机启动后,Mac主机无法上网怎么办

文章目录 1.问题2.解决: 1.问题 部分用户在运行Parallels Desktop的Windows 11打开后,Windows上网没有问题 ,但是Mac主机不能访问带域名的网站,而访问带ip的网站没问题,退出parallels虚拟机以后,mac网络恢…

SparkStreaming基础解析(四)

1、 Spark Streaming概述 1.1 Spark Streaming是什么 Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、…

解决在test以外的目录下导入junit无效

以上引用来自src目录下的文件,可以看到,和junit有关的导入都飘红,但明明junit已经被正确导入进了项目中。 再看右侧的Maven的依赖下方,junit的右边有一个很不起眼的(test) 这是因为junit作为测试框架,可能包含仅适用于…

LLM Agent之再谈RAG的召回信息密度和质量

话接上文的召回多样性优化,多路索引的召回方案可以提供更多的潜在候选内容。但候选越多,如何对这些内容进行筛选和排序就变得更加重要。这一章我们唠唠召回的信息密度和质量。同样参考经典搜索和推荐框架,这一章对应排序重排环节,…

ASP.NET Core路由中间件[1]: 终结点与URL的映射

一、路由注册 我们演示的这个ASP.NET Core应用是一个简易版的天气预报站点。如果用户希望获取某个城市在未来N天之内的天气信息,他可以直接利用浏览器发送一个GET请求并将对应城市(采用电话区号表示)和天数设置在URL中。如下图所示&#xff…

Mybatis一级缓存

文章目录 Mybatis一级缓存原理一级缓存特点命中原则生命周期源码解读设计理念Spring集成 Mybatis一级缓存原理 一级缓存特点 自动启用 通过在setting中设置localCacheScope STATEMENT(默认为SESSION)全局禁用一级缓存 在Dao接口方法上添加注解&#xff…

【GO语言卵细胞级别教程】01.GO基础知识

01.GO基础知识 目录 01.GO基础知识1.GO语言的发展历程2.发展历程3.Windowns安装4.VSCode配置5.基础语法5.1 第一段代码5.2 GO执行的流程5.3 语法规则5.4 代码风格5.5 学习网址 1.GO语言的发展历程 Go语言是谷歌公司于2007年开始开发的一种编程语言,由Robert Griese…

Spring之强大的DefaultListableBeanFactory

系列文章目录 如何查看类继承结构参考这里 文章目录 系列文章目录一、DefaultListableBeanFactory的类继承实现结构二、实现接口 一、DefaultListableBeanFactory的类继承实现结构 二、实现接口 AliasRegistry:支持别名功能,一个名字可以对应多个别名B…

Java Arrays.copyOfRange的用法

Arrays.copyOfRange的使用方法: 将一个数组拷贝至另一个数组中 参数: original:第一个参数为要拷贝的数组对象 from:第二个参数为拷贝的开始位置(包含) to:第三个参数为拷贝的结束位置&#x…

数字孪生与边缘计算的结合

数字孪生与边缘计算的结合可以在物理实体附近进行实时数据处理和决策,从而提高响应速度、降低延迟,并有效地利用边缘资源。以下是数字孪生在边缘计算中的一些应用,希望对大家有所帮助。北京木奇移动技术有限公司,专业的软件外包开…

在pycharm中执行 os.makedirs 提示用户名或密码不正确

问题:在pycharm中运行脚本,在 \10.0.21.249\share 共享目录下创建目录提示错误 发现:手动在该目录下创建目录没有问题。 解决方法: 切换到cmd 命令行运行该脚本成功创建 猜测:感觉应该是pycharm中使用的用户名和密码存…

CF1909_C. Heavy Intervals题解

CF1909_C. Heavy Intervals题解 题目传送门(Problem - C - CodeforcesCodeforces. Programming competitions and contests, programming communityhttps://codeforces.com/contest/1909/problem/C)。 题目翻译如下:(图片来源&a…