Kafka学习-Java使用Kafka

文章目录

  • 前言
  • 一、Kafka
    • 1、什么是消息队列
      • offset
    • 2、高性能
      • topic
      • partition
    • 3、高扩展
      • broker
    • 4、高可用
      • replicas、leader、follower
    • 5、持久化和过期策略
    • 6、消费者组
    • 7、Zookeeper
    • 8、架构图
  • 二、安装Zookeeper
  • 三、安装Kafka
  • 四、Java中使用Kafka
    • 1、引入依赖
    • 2、生产者
    • 3、消费者
    • 4、运行效果


前言

Kafka消息中间件

一、Kafka

1、什么是消息队列

假设我们有两个服务:生产者A每秒能生产200个消息,消费者B每秒能消费100个消息。

在这里插入图片描述

那么B服务是处理不了A这么多消息的,那么怎么使B不被压垮的同时还能处理A的消息呢,我们引入一个中间件,即Kafka。(当然着并不能使消费者的处理速度上升)

在这里插入图片描述

offset

那么我们可以在B服务中加入一个队列,也就是一个链表,链表的每个节点相当于一条消息,每个节点有一个序号即offset,记录消息的位置。

在这里插入图片描述

在这里插入图片描述

但是这样也会有个问题,还没有处理的消息是存储在内存中的,如果B服务挂掉,那么消息也就丢失了。
所以我们可以把队列移出,变成一个单独的进程,即使B服务挂掉,消息也不会丢失。

在这里插入图片描述

2、高性能

B服务由于性能差,队列中未处理的消息会越来越多,我们可以增加更多的消费者来处理消息,相对的也可以增加更多的生产者来生成消息。

在这里插入图片描述

topic

但是,生产者与消费者会争抢同一个队列,没有抢到就要等待,那么怎么解决呢?
我们可以将消息进行分类,每一类消息是一个topic,生产者按消息的类型投递到不同的topic中,消费者也按照不同的topic进行消费。

在这里插入图片描述

partition

但是单个topic的消息还是有可能过多,我们可以将单个队列拆分,每段是一个partition分区,每个消费者负责一个partition

在这里插入图片描述

3、高扩展

broker

随着partition过多,所有的partition都在同一个机器上,就可能会导致单机的cpu和内存过高,影响性能,那么我们可以使用多台机器,将partition分散部署在不同的机器上。每台机器就代表一个broker
我们可以增加broker来缓解服务器的cpu过高的性能问题。

在这里插入图片描述

4、高可用

replicas、leader、follower

假如某个broker挂了, 那么其中partition中的消息也就都丢失了,那么这个问题怎么解决呢?
我们可以给partition多加几个副本,统称replicas,并将它们分为leaderfollower
leader负责生产者和消费者的读写,follower只负责同步leader的数据。假如leader挂了,也不会影响follower,随后在follower中选出一个leader,保证消息队列的高可用。

在这里插入图片描述

5、持久化和过期策略

在上面讲述了leader挂掉的情况,如果所有的broker都挂了,消息不就都丢失了?
为了解决这个问题,就不能只把数据存在内存中,也要存在磁盘中。
但是如果所有消息一直保存在磁盘中,那磁盘也会被占满,所以引入保留策略。

6、消费者组

如果我想在原有的基础上增加一个消费者,那么它只能跟着最新的offset接着消费,如果我想从某个offset开始消费呢?
我们引入消费者组,实现不同消费者维护自己的消费进度。

在这里插入图片描述

7、Zookeeper

上面介绍了很多的组件,每个组件都有自己的状态信息,那么就需要有一个组件去统一维护这些组件的状态信息,于是引入了Zookeeper组件,它会定期与broker通信,获取Kafka集群的状态,判断哪些broker挂了,消费者组消费到哪了等等。

8、架构图

在这里插入图片描述

二、安装Zookeeper

1、官网地址

https://zookeeper.apache.org/

2、下载

在这里插入图片描述

选择稳定版本下载

在这里插入图片描述

3、解压,修改配置文件

解压后,复制 zoo_sample.cfg,重命名为 zoo.cfg

在这里插入图片描述

修改数据文件目录位置

在这里插入图片描述

4、启动

我们是在windows系统下安装的,运行 bin 目录下的 zkServer.cmd

在这里插入图片描述

三、安装Kafka

1、官网地址

https://kafka.apache.org/

2、下载

在这里插入图片描述

3、解压,修改配置文件

修改 config 目录下 server.properties 文件
修改日志文件位置,其他参数(如zookeeper端口,根据需要修改)

在这里插入图片描述

4、启动

bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述

四、Java中使用Kafka

1、引入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId>
</dependency>

2、生产者

public static void main(String[] args) throws InterruptedException {Properties prop = new Properties();prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.ACKS_CONFIG, "all");prop.put(ProducerConfig.RETRIES_CONFIG, 0);prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);String topic = "hello";KafkaProducer<String, String> producer = new KafkaProducer<>(prop);for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<String, String>(topic, Integer.toString(2), "hello kafka" + i));System.out.println("生产消息:" + i);Thread.sleep(1000);}producer.close();
}

3、消费者

public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.GROUP_ID_CONFIG, "con-1");    // 消费者组prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);    //自动提交偏移量prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);     //自动提交时间KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);ArrayList<String> topics = new ArrayList<>();//可以订阅多个消息topics.add("hello");consumer.subscribe(topics);try {while(true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10));for (TopicPartition topicPartition : poll.partitions()) {//	通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息List<ConsumerRecord<String, String>> partitionRecords = poll.records(topicPartition);//	获取TopicPartition对应的主题名称String topic = topicPartition.topic();//	获取TopicPartition对应的分区位置int partition = topicPartition.partition();//	获取当前TopicPartition下的消息条数int size = partitionRecords.size();System.out.printf("--- 获取topic: %s, 分区位置:%s, 消息总数: %s%n",topic,partition,size);for(int i = 0; i < size; i++) {ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);//	实际的数据内容String key = consumerRecord.key();//	实际的数据内容String value = consumerRecord.value();//	当前获取的消息偏移量long offset = consumerRecord.offset();//	表示下一次从什么位置(offset)拉取消息long commitOffser = offset + 1;System.out.printf("消费消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s%n",key, value, offset, commitOffser);Thread.sleep(1500);}}}} catch (Exception e) {e.printStackTrace();} finally {consumer.close();}
}

4、运行效果

生产消息

在这里插入图片描述

消费消息

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/703871.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenAI 震撼发布:GPT-4o免费,实时语音视频交互开启新纪元

OpenAI 震撼发布&#xff1a;GPT-4o免费&#xff0c;实时语音视频交互开启新纪元 在仅仅问世17个月后&#xff0c;OpenAI 研制出了仿佛科幻片中登场的超级人工智能——GPT-4o&#xff0c;而且所有人都可以完全免费使用&#xff0c;让这个科技界的巨浪让人震撼无比&#xff01;…

【VUE】VUE3绘制箭头组件

效果预览&#xff1a; 长、宽、粗细等等根据情况合理调整即可。 组件&#xff1a; <template><div class"line" :style"props.arrowsColor"></div> </template><script setup> import { defineProps, ref, onMounted } fr…

代码随想录Day29

39.组合总和 题目&#xff1a;39. 组合总和 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a;可以重复选取&#xff0c;这个要怎么回溯&#xff1f;而且是无限制的重复。每次回溯&#xff0c;都还是遍历当前的数组&#xff0c;记住数组中的最小值&#xff0c;一旦发现…

低成本、功能强大!德思特提供一体化WiFi 6E信道测试方案!

​ 作者介绍 一、方案介绍 伴随WiFi 6E与WiFi 7的提出&#xff0c;WIFI划分出一个全新的5.925GHz-7.125GHz 之间的80MHz和160MHz频段。1200MHz的带宽是迄今为止最宽的&#xff0c;是之前2.4GHz和5GHz WiFi 频段可用带宽的数倍。此外WiFi 6E引入了以下技术&#xff1a; ● 多…

Google I/O 2024:探索未来AI技术的无限可能

近日&#xff0c;Google I/O 2024大会圆满落幕&#xff0c;带给我们一场关于人工智能的盛宴。在这场大会上&#xff0c;Google推出了一系列令人激动的AI新功能和工具&#xff0c;让我们得以一窥未来的科技发展。今天&#xff0c;就让我来为大家总结一下这些亮点吧&#xff01; …

实验室无法培养的菌,原来可以这么研究!

厌氧氨氧化&#xff08;anammox&#xff09;细菌在全球氮循环和废水氮去除中发挥着至关重要的作用&#xff0c;由于anammox细菌生长缓慢、难以培养等特点&#xff0c;对其生态学和生物学特性知之甚少。近日&#xff0c;凌恩生物合作客户重庆大学陈猷鹏教授团队在《Science of t…

802.1X认证,梦回网吧的年代。

1、802.1x的原理 &#xff08;1&#xff09;802.1x的产生原因 802.1X协议作为局域网接口的一个普通接入控制机制在以太网中被广泛应用&#xff0c;主要解决以太网内认证和安全方面的问题。802.1X协议是一种基于接口的网络接入控制协议。“基于接口的网络接入控制”是指&#…

LearnOpenGL(十八)之面剔除

一、面剔除 对于一个3D立方体&#xff0c;无论我们从哪个方向&#xff0c;最多只能同时看到3个面。如果我们能够以某种方式丢弃另外几个看不见的面&#xff0c;我们就能省下超过50%的片段着色器执行数&#xff01; 这正是面剔除(Face Culling)所做的。OpenGL能够检查所有面向…

揭秘未来工厂核心:智慧大屏引领可视化管理新潮流

在数字化浪潮席卷全球的今天&#xff0c;智慧工厂已不再是科幻小说中的概念&#xff0c;而是成为了现代工业发展的新引擎。 智慧工厂可视化大屏&#xff0c;不仅仅是一块显示屏&#xff0c;更是工厂运行的“大脑”。通过这块屏幕&#xff0c;我们可以实时掌握工厂的每一个角落、…

根据参考风格进行矢量图绘制

摘要 利用机器学习根据给定的文本描述生成图像的技术已经取得了显著的进步&#xff0c;例如CLIP图像-文本编码器模型的发布&#xff1b;然而&#xff0c;当前的方法缺乏对生成图像风格的艺术控制。我们提出了一种方法&#xff0c;用于为给定的文本描述生成指定风格的绘图&…

华焰天下晋升质量管理三大体系和产品3C认证实力级

华焰天下&#xff0c;作为业界领先的新能源灶具企业&#xff0c;一直以来都致力于追求卓越的质量管理和产品创新。近日&#xff0c;华焰天下成功晋升为质量管理三大体系先进管理&#xff0c;并成功获得了产品3C认证&#xff0c;这标志着我们在质量管理和产品安全方面迈出了坚实…

【文献阅读】李井林等2021ESG促企业绩效的机制研究——基于企业创新的视角

ESG促进企业绩效的机制 摘要 0.引言与文献综述 1.理论分析与研究假设 1.1企业ESG表现与企业绩效 假设1a&#xff1a;企业的环境表现对企业绩效存在正向影响效应。 假设1b&#xff1a;企业的社会表现对企业绩效存在正向影响效应。 假设1c&#xff1a;企业的公司治理表现对企业…