kafka 集群 ZooKeeper 模式搭建

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:Apache Kafka

关于ZooKeeper的弃用

根据 Kafka官网信息,随着Apache Kafka 3.5版本的发布,Zookeeper现已被标记为已弃用。未来计划在Apache Kafka(4.0版)的下一个主要版本中删除ZooKeeper,该版本最快将于2024年4月发布。在弃用阶段,ZooKeeper仍然支持用于Kafka集群元数据的管理,但不建议用于新的部署。新的部署方式使用 KRaft 模式,KRaft 模式部署可以看笔者的文章《kafka 集群 KRaft 模式搭建》,考虑到一些公司仍然在使用老版本的 Kafka,故笔者写这篇文章记录 Kafka 集群Zookeeper 模式搭建

官网信息截图

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、启动 Kafka 集群

4、关闭 Kafka 集群

5、使用Kafka 可视化工具查看

6、测试Kafka集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

3.6.0 版本需要至少 java8 及以上版本,笔者使用的是 java8 版本

关于 linux 安装 java,没安装过的朋友可以参考《linux 系统安装 jdk》

下载完成

将 kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config

编辑配置文件 server.properties

vi server.properties

配置 broker.id,advertised.listeners,zookeeper.connect

broker.id 每个节点的id

advertised.listeners 本机的外网访问地址

zookeeper.connect zookeeper 地址

192.168.3.232 节点配置

advertised.listeners 笔者配置为本机地址

192.168.2.90 节点

192.168.2.11 节点

笔者zookeeper 地址是 192.168.2.130:2181

zookeeper 版本是3.8.3

关于zookeeper单机安装和集群安装可以参考:《Linux环境 安装 zookeeper》《windows环境 安装 zookeeper》《linux 使用 nginx 搭建 zookeeper 集群》

3、启动 Kafka 集群

首先启动 zookeeper

然后在3台机器上依次启动 Kafka

进入 kafka 目录

cd /usr/local/kafka/kafka_2.13-3.6.0

下面2个命令皆可

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh -daemon config/server.properties

4、关闭 Kafka 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

5、使用Kafka 可视化工具查看

下载地址:https://www.kafkatool.com/download.html

运行效果

6、测试Kafka集群

新建 maven 项目,添加 Kafka 依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>kafka-learn</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>11</source><target>11</target></configuration></plugin></plugins></build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** Demo** @author wsjz* @date 2023/11/24*/
public class ProducerDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();//配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");//配置序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(properties);//topic 名称是demo_topicProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");RecordMetadata recordMetadata = producer.send(producerRecord).get();System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** ConsumerDemo** @author wsjz* @date 2023/11/24*/
public class ConsumerDemo {public static void main(String[] args) {Properties properties = new Properties();// 配置集群节点信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");// 消费分组名properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");// 序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 消费者订阅主题consumer.subscribe(Arrays.asList("demo_topic"));while (true) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String,String> record:records) {System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),record.offset(),record.key(),record.value());}}}
}

运行测试

效果图

至此完

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/238778.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[二分查找]LeetCode1964:找出到每个位置为止最长的有效障碍赛跑路线

本文涉及的基础知识点 二分查找算法合集 作者推荐 动态规划LeetCode2552&#xff1a;优化了6版的1324模式 题目 你打算构建一些障碍赛跑路线。给你一个 下标从 0 开始 的整数数组 obstacles &#xff0c;数组长度为 n &#xff0c;其中 obstacles[i] 表示第 i 个障碍的高度…

【矩阵论】Chapter 2—内积空间知识点总结复习

文章目录 内积空间1 内积空间2 标准正交向量集3 Gram-Schmidt正交化方法4 正交子空间5 最小二乘问题6 正交矩阵和酉矩阵 内积空间 1 内积空间 内积空间定义 设 V V V是在数域 F F F上的向量空间&#xff0c;则 V V V到 F F F的一个代数运算记为 ( α , β ) (\alpha,\beta) (α…

春秋云镜:CVE-2022-28512

靶标介绍&#xff1a; Fantastic Blog (CMS)是一个绝对出色的博客/文章网络内容管理系统。它使您可以轻松地管理您的网站或博客&#xff0c;它为您提供了广泛的功能来定制您的博客以满足您的需求。它具有强大的功能&#xff0c;您无需接触任何代码即可启动并运行您的博客。 该…

【Vulnhub 靶场】【Funbox: Lunchbreaker】【简单】【20210522】

1、环境介绍 靶场介绍&#xff1a;https://www.vulnhub.com/entry/funbox-lunchbreaker,700/ 靶场下载&#xff1a;https://download.vulnhub.com/funbox/FunboxLunchbreaker.ova 靶场难度&#xff1a;简单 发布日期&#xff1a;2021年05月22日 文件大小&#xff1a;1.6 GB 靶…

关于前端的学习思考-父子盒子溢出问题

先摆图片 很明显&#xff0c;大盒子高度设置400px&#xff0c;小盒子都是高度设置成300px&#xff0c;明显400px<600px&#xff0c;这时候子盒子就会溢出。如何解决溢出问题&#xff1f; 这个时候我把子盒子换成50%&#xff0c;50%。发现并不会溢出&#xff0c;因为相当于两…

localStorage 和sessionStorage

localStorage 和 sessionStorage 是浏览器提供的两种客户端存储数据的方式&#xff1a; 生命周期&#xff1a; localStorage&#xff1a; 存储在 localStorage 中的数据在浏览器关闭后仍然保留&#xff0c;直到被显式删除或浏览器清除缓存。sessionStorage&#xff1a; 存储在 …

【科技素养】蓝桥杯STEMA 科技素养组模拟练习试卷14

单选题 1、下列现象中有化学变化发生的是 A、蜡烛融化 B、冰块融化 C、电磁炉烧开水 D、铁生锈 答案&#xff1a;D 2、把左边的图形用剪刀剪开&#xff0c;拼成右边的正方形&#xff0c;至少剪几刀 A、1 B、2 C、3 D、4 答案&#xff1a;B 3、能够检验土壤中有沙和粘…

macOS本地调试k8s源码

目录 准备工作创建集群注意点1. kubeconfig未正常加载2. container runtime is not running3. The connection to the server 172.16.190.132:6443 was refused - did you specify the right host or port?4. 集群重置5.加入子节点 代码调试 准备工作 apple m1芯片 安装vmwa…

计算机网络——数据链路层-封装成帧(帧定界、透明传输-字节填充,比特填充、MTU)

目录 介绍 帧定界 PPP帧 以太网帧 透明传输 字节填充&#xff08;字符填充&#xff09; 比特填充 比特填充习题 MTU 介绍 所谓封装成帧&#xff0c;就是指数据链路层给上层交付下来的协议数据单元添加帧头和帧尾&#xff0c;使之成为帧。 例如下图所示&#xff1a; …

Unity Image - 镜像

1、为什么要使用镜像 在游戏开发过程中&#xff0c;我们经常会为了节省 美术图片资源大小&#xff0c;美术会将两边相同的图片进行切一半来处理。如下所示一个按钮 需要 400 * 236&#xff0c;然而美术只需要切一张 74*236的大小就可以了。这样一来图集就可以容纳更多的图片。…

[WP] ISCTF2023 Web 部分题解

圣杯战争!!! 反序列化伪协议读取 where_is_the_flag 环境变量根目录当前目录 绕进你的心里 利用正则最大回溯绕过 easy_website or select 用双写绕过 空格用/**/绕&#xff0c;报错注入 wafr codesystem(ca\t /f*) webinclude 扫描得到index.bak备份文件打开为加密的代码 写…

java游戏攻略资讯网站的设计与实现springboot+vue

游戏攻略网站分为管理员与用户两种角色。 管理员的功能包括登录&#xff0c;用户管理&#xff0c;游戏分类管理&#xff0c;游戏攻略管理&#xff0c;游戏资讯管理等。 登录功能&#xff1a;管理员需要登录进入系统后台。 用户管理&#xff1a;实现用户信息的查询&#xff0c;修…