07、Kafka ------ 消息生产者(演示 发送消息) 和 消息消费者(演示 监听消息)

目录

  • Kafka --- 消息生产者
    • ★ 消息
    • ★ 消息的分发机制
    • ★ 分发到哪个分区
    • ★ 轮询策略(round-robin)
    • ★ 使用命令行工具发送消息
      • 演示添加消息
  • Kafka --- 消息消费者
    • ★ 消息消费者命令
    • ▲ 监听 【指定主题】 的所有消息:
    • ▲ 监听 【指定主题、指定分区】的所有消息:
    • ▲ 监听【指定主题、指定分区】的【新】消息:
    • ▲ 监听【指定主题、指定分区的、指定下标及之后】的所有消息:
    • kafka灵活之处:

Kafka — 消息生产者


★ 消息

简单来说,就是一个数据项。

▲ 消息就是 Kafka 所记录的数据节点,消息在 Kafka 中又被称为记录(record)或事件(event)。

从存储上来看,消息就是存储在分区文件(有点类似于List)中的一个数据项,消息具有 key、value、时间戳 和 可选的元数据头。

▲ 下面是一个示例事件:
key: “fkjava”
value: “publish a new Book”
timestamp: “Feb. 15, 2021 at 2:06 p.m.”


★ 消息的分发机制

【注意:】当程序向主题发送消息时,该消息会被立即分给该主题下某一个领导者分区。

消息生产者向消息主题发送消息,这些消息将会立即分发给该主题下某一个分区(此处说的都是领导者分区)来保存

主题下的每条消息只会保存在一个领导者分区中,而不会在多个领导者分区中保存多份

消息实际上是存在分区中的,往主题发送消息只是一种逻辑说法。
当生产者发送一条消息到一个主题的时候,实际上这个消息马上就会被直接分发到对应的某一个领导者分区当中。

非领导者分区只是领导者分区的后备,也就是备份而已,当领导者分区挂掉的时候,非领导者分区就有可能成为领导者分区。

但是真正能够直接与客户进行交互的,就是直接接收用户的数据,或者让用户来消费数据的只能是领导者分区。

在这里插入图片描述


★ 分发到哪个分区

当生产者发送一条消息时,它会按如下规则来决定该消息被分发到哪个分区:

优先级:

最优先:(1)如果在发送消息时指定了分区,则消息分发到指定的分区。

(2)如果发送消息时没有指定分区,但消息的key不为空,则基于key的hashCode来选择一个分区。

此处暗示了:在一段时间内:同一个key的多条消息,通常会被分发到同一个分区

最次:(3)如果既没有指定分区,且消息的key也是空,则用轮询策略(round-robin)来选择一个分区。


★ 轮询策略(round-robin)

轮询策略(round-robin)就是按顺序来分发消息,

比如下面一个主题有P0、P1、P2三个分区,

那么第一条消息被分发到P0分区,
第二条消息被分发到P1分区,
第三条消息被分发到P2分区,

以此类推,第四条消息又被分发到P0分区。
在这里插入图片描述


★ 使用命令行工具发送消息

Kafka提供了kafka-console-producer.bat(.sh)工具来发送消息,例如执行如下命令:

下面命令发送带key的消息:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2 ^
--property parse.key=true

上面命令指定向test2这个主题发送消息,并通过“parse.key=true”指定发送消息时会解析消息的key,

默认解析规则为:制表符(Tab键)之前的是key,制表符(Tab键)之后的是value。

如果不指定“parse.key=true”属性,则默认不解析消息的key,也就是发送不带key的消息。

下面命令发送不带key的消息:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2


演示添加消息

因为演示过程中,我弄的3个节点老是只有一个能存活,所以把 kafka和zookeeper的存储数据的文件夹都删除了,重新启动,就好了。方便演示。

删除Kaka和zookeeper的存储数据的文件夹,重新启动

现在3个节点就能正常存活了。
在这里插入图片描述

接下来继续演示:
添加 不带key 和带key的消息,演示生产者发送消息:

解释:
1、演示生产者发送消息,消息发送到主题 test2 那里
2、发送没有带 key 的消息,kafka 采用的是轮询的策略,把消息存放到不同的分区里面;如图,test2 主题有4个分区(属于领导者分区),那么这8条消息就会轮询的发送到这4个分区里面
3、发送带key的消息,如图,key 是 ljh,kafka就会计算 ljh 这个key的hashcode 值,然后存放都某一个分区里面,因为key都是一样的,所以这几个key为ljh的消息都会被发到同一个分区里面。
但是具体发送到哪个分区,是无法指定的。
4、发送带key的消息,如图,key 和 value 之间要间隔一个 Tab 键,不要弄成空格键。

命令在上面

在这里插入图片描述

我自己再添加不同key的消息,可以看出新添加的两条消息,是存放在0分区的。

在这里插入图片描述



Kafka — 消息消费者

★ 消息消费者命令

消费者用于从消息主题读取消息,Kafka提供了kafka-console-consumer.bat工具命令从指定主题、甚至指定分区读取消息。
该工具支持如下常用选项:

 --bootstrap-server:指定要连接的Kafka主机和端口。--from-beginning:指定从开始读取消息。--group:指定组ID。--offset <String: consume offset>:指定从特定下标开始读取消息,比如将该选项设为1,表明从第2条消息开始读取;该选项还支持earliest和latest两个字符串值,其中earliest表示从最开始处读取(类似于--from-beginning选项的作用),latest表示从最新处开始读取、即不读取之前的消息,latest是默认值。--partition <Integer: partition>:指定哪个分区。--property:用于指定一些额外属性,比如 print.timestamp=true 指定要输出时间戳,print.key=true表示输出消息key,print.offset=true表示打印消息的下标,print.partition=true表示打印分区信息。--topic:指定哪个主题。 


▲ 监听 【指定主题】 的所有消息:

这个监听命令,运行后是一直存在的,会一直监听,有新消息会马上监听出来的。

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

可以看到消息都成功存放在分区里面。
可以看到指定主题 test2 下的所有消息
但是目前还演示出轮询存放,先不理。

在这里插入图片描述



▲ 监听 【指定主题、指定分区】的所有消息:

看看分区2下的所有消息

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--partition 2^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

在这里插入图片描述

看看分区3下的所有消息

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--partition 3^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

分区3没有消息,所以没有任何显示:正确
在这里插入图片描述




查看分区0的消息:
有两条,正确
在这里插入图片描述


注意点:
–from-beginning:指定从开始读取消息。
添加这个命令,就是一直都会读取到从一开始就添加的消息,这样演示不够真实,因为我们消息消费之后,正常情况下我们不需要再查出来,所以可以用offset这个命令:
–offset <String: consume offset>:指定从特定下标开始读取消息。
下面就来演示这个offset命令:

 --offset <String: consume offset>:指定从特定下标开始读取消息,比如将该选项设为1,表明从第2条消息开始读取;该选项还支持earliest和latest两个字符串值,其中earliest表示从最开始处读取(类似于--from-beginning选项的作用),latest表示从最新处开始读取、即不读取之前的消息,latest是默认值。

▲ 监听【指定主题、指定分区】的【新】消息:

(模拟传统的ActiveMQ、RabbitMQ的消息模型):

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--offset latest ^--partition 0^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

演示 latest 从最新处开始读取、即不读取之前的消息,latest是默认值。

如图:现在从最新开始出监听消息,为了演示能监听最新消息,我们再打开一个命令行小黑窗,来往这个 0 分区发送消息。

因为上面key 为 l 的消息是发送到 0 分区,所以接下来发送的消息key也设置为l

在这里插入图片描述

如图:生产者刚发送消息,消费者这边马上就监听到主题为test2,分区为0 的最新的消息。

在这里插入图片描述



▲ 监听【指定主题、指定分区的、指定下标及之后】的所有消息:

这个 --offset 2 ^ 指定下标,查出来的消息是包括索引下标2 这条消息的。


先查出主题test2,分区0的所有消息,用来对比:

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--partition 0^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

在这里插入图片描述


再查出 【主题test2,分区0,指定索引下标为2及之后】 的所有消息,用来对比:

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--offset 2 ^--partition 0^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

上面有4条消息,所以2及之后的消息,应该有2条,为索引2 nnnnnnnnnn,索引3 mmmmmmmm

在这里插入图片描述



kafka灵活之处:

灵活之处, --offset latest ^ 这个设置,默认就是latest ,就是从最新处开始读取、不读取之前的消息,始终读取最新的消息,以前用过的消息就不会管了。
因为 kafka 内部有一个偏移主题来存储每一个分区里面的消息及消息曾经被读取到哪一条(哪个位置)。

更灵活之处:
我们可以通过 --offset 加上指定的索引下标,非常灵活的读取我们想要读取的哪个位置的消息。
从上面的消息监听可以看出,消息是一直保存在分区当中的,意味着消息被消费之后,并没有立即从分区中被删除,还可以被重复的使用,这就是kafka非常灵活的地方。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/336324.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能化输电线路定位技术:提升电网运行效率的未来发展方向

随着科技的不断发展&#xff0c;电力行业也在逐步引入智能化技术&#xff0c;以提高输电线路的运行效率和安全性。在这篇文章中&#xff0c;恒峰智慧科技将探讨一种新的输电线路定位技术——分布式行波测量技术&#xff0c;它如何帮助我们实现这一目标。 一、分布式故障定位及隐…

Burp Suite拦截信息或返回数据中文乱码,解决不显示中文

1.点击User options->Display。 2.点击change font&#xff0c;选择宋体。Charater sets选择UTF-8&#xff08;如果还是乱码&#xff0c;看一下请求或响应字符集 charserxxx&#xff09;。

oracle角色管理

常用角色 CONNECT,RESOURCE,DBA,EXP_FULL_DATABASE,IMP_FULL_DATABASE 1角色可以自定义&#xff0c;语法与创建用户一样 CREATE role role1 IDENTIFIED by 123; 2授权权限给角色 --自定义角色 CREATE role role1 IDENTIFIED by 123; --授权权限给角色 GRANT create view, …

106短信群发如何发送会员通知

106短信群发可以通过以下步骤发送会员通知&#xff1a; 1.确定通知内容&#xff1a;首先需要确定要发送的通知内容&#xff0c;包括通知的主题、内容、时间、地点等信息。 2.设计通知模板&#xff1a;根据通知内容&#xff0c;设计通知模板。模板应包括短信的格式、语言风格、…

基于Jackson封装的JSON、Properties、XML、YAML 相互转换的通用方法

文章目录 一、概述二、思路三、实现四、测试 一、概述 我们在 yaml转换成JSON、MAP、Properties 通过引入 实现了JSON、Properties、XML、YAML文件的相互转换&#xff0c;具体封装的类、方法如下&#xff1a; 上面的实现&#xff0c;定义了多个类、多个方法&#xff0c;使用…

第2章 监督学习

目录 1. 分类与回归2. 泛化、过拟合与欠拟合模型复杂度与数据集大小的关系 3. 监督学习算法3.1 一些样本数据集3.2 k近邻3.2.1 k近邻分类&#xff08;sklearn.neighbors.KNeighborsClassifier&#xff09;3.2.2 分析 KNeighborsClassifier3.3.3 k近邻回归&#xff08;sklearn.n…

如何用 AI 给科研提速?超长对话记忆 Kimi Chat 体验

我尝试了网络文章总结、学术文献分析&#xff0c;以及长文翻译。 最近经常听小伙伴提及 Kimi Chat 这款国产大语言模型&#xff0c;据说它在上下文记忆能力很有特色。于是我打开官网&#xff08;https://kimi.moonshot.cn/&#xff09;进行了尝试&#xff0c;今天就来给你分享一…

2024年第九届机器学习技术国际会议(ICMLT 2024) 即将召开

2024年第九届机器学习技术国际会议&#xff08;ICMLT 2024&#xff09;将于2024年5月24-26日在挪威奥斯陆举行。ICMLT 2024旨在讨论机器学习技术领域的最新研究技术现状和前沿趋势&#xff0c;为来自世界各地的科学家、工程师、实业家、学者和其他专业人士提供一个互动和交流的…

springboot程序启动慢解决

记springboot程序启动慢解决。 今天将程序发给别人后&#xff0c;别人立马说你这个启动很慢。 查看程序启动耗时分布 <!--启动耗时监测--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator…

【idea】idea插件编写教程,博主原创idea插件 欢迎下载

前言&#xff1a;经常使用Objects.equals(a,b)方法的同学 应该或多或少都会因为粗心而传错参&#xff0c; 例如日常开发中 我们使用Objects.equals去比较 status(入参)&#xff0c;statusEnum(枚举), 很容易忘记statusEnum.getCode() 或 statusEnum.getVaule() &#xff0c;再比…

亚马逊,速卖通,美客多卖家怎么才能安全及有效的积累产品的评论

测评补单对于亚马逊、速卖通等平台卖家来说&#xff0c;是一种重要的运营手段之一&#xff0c;通过测评补单快速增加产品的销量、评论数量&#xff0c;提升排名&#xff0c;从而打造爆款产品。 测评养号的好处包括&#xff1a; 1. 提升店铺信誉&#xff0c;制造爆款&#xff…

Armv8-R AArch32 architecture概念学习

提示 该博客主要为个人学习&#xff0c;通过阅读官网手册整理而来&#xff08;个人觉得阅读官网的英文文档非常有助于理解各个IP特性&#xff09;。若有不对之处请参考参考文档&#xff0c;以官网文档为准。阅读该文章&#xff0c;可以先查看AArch64 Exception Model学习&…