Kafka-TopicPartition

Kafka主题与分区

主题与分区

topic & partition,是Kafka两个核心的概念,也是Kafka的基本组织单元。 主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水平扩展性、容错性等优势。 分区可以有一个至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等

主题的管理

主题的管理

  • 创建主题

  • 查看主题信息

  • 修改主题

  • 删除主题

上述操作可以采用Kafka提供的kafka-topics.sh脚本来完成,也可以采用Kafka提供的AdminClient来完成。 该脚本位于¥KAFKA_HOME/bin目录下 image

创建主题

创建主题的命令格式如下:

kafka-topics.sh --bootstrap-server <server:port> \--create --topic <topic> \--partitions <numPartitions> \--replication-factor <replicationFactor>

创建一个分区数为4、副本因子为2的主题

kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2

创建一个分区数为4、副本因子为2的主题,并且指定主题的配置信息

kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create \--partitions 4 \--replication-factor 2 \--config max.message.bytes=128000

通过describe指令来查看分区副本的分配细节

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

使用replica-assignment参数手动指定分区副本的分配方案

使用这种方式根据分区号的数值大小按照从小到大的顺序进行排列

例如:0:1:2,0:1:2,0:1:2,0:1:2

  • 分区与分区之间用逗号分隔

  • 分区与副本之间用冒号分隔

kafka-topics.sh --bootstrap-server localhost:9092 \--create --topic topic-create-same \--replica-assignment 0:1:2,0:1:2,0:1:2,0:1:2

注意:

  • 同一个分区内的副本不能有重复,比如0:0,1:1这样,就会报出AdminCommandFailedException异常

  • 分区之间所指定的副本数不同,比如0:0,1:1这样,就会报出AdminOperationException异常

主题命名规范

  • 主题名称只能包含ASCII字母、数字、点、减号和下划线

  • 主题名称长度不能超过249个字符

  • 主题名称不能以点开头

  • 不能以__开头,这是Kafka内部使用的主题前缀

  • 不能包含空格、单引号、双引号、逗号、分号、冒号和NULL字符

  • 主题名称应该全部小写,因为Kafka在区分主题名称时是不区分大小写的

  • 主题名称不能与Kafka保留的名称冲突,比如__consumer_offsets

  • 主题名称不能与已经存在的消费者组名称冲突

  • 主题名称不能与已经存在的主题名称冲突

查看主题信息

通过list指令来查看当前Kafka集群中所有可用的主题

kafka-topics.sh --bootstrap-server localhost:9092 --list

image

通过describe指令来查看主题的详细信息

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

image

修改主题

当主题被创建之后,依然允许我们对其做一定的修改,比如修改分区数、修改副本因子、修改配置等。 通过alter指令来修改主题的配置信息

# 修改主题的最大消息字节数,配置值从10000修改为20000kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--config max.message.bytes=20000

通过alter指令来修改主题的分区数

kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-create \--partitions 6

删除主题

通过delete指令来删除主题

kafka-topics.sh --bootstrap-server localhost:9092 \--delete --topic topic-delete

通过delete-config参数来删除之前设置的配置信息

kafka-topics.sh --bootstrap-server localhost:9092 \--alter --topic topic-config \--delete-config max.message.bytes

手动删除主题

  • 主题中的元数据存储在Zookeeper中的/brokers/topics和/config/topics路径下

  • 主题中的消息数据存储在log.dir或log.dirs配置的路径下,只需要手动删除这些地方的数据即可。

配置管理

kafka-configs.sh脚本用于管理Kafka的配置信息,该脚本位于$KAFKA_HOME/bin目录下 主要包含变更配置alter和查看配置describe两个指令

# 变更主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes=128000# 添加主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--alter --entity-type topics --entity-name topic-config \--add-config max.message.bytes=128000# 查看主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \--describe --entity-type topics --entity-name topic-config    

KafkaAdminClient

KafkaAdminClient是Kafka提供的一个管理客户端,用于管理Kafka集群中的资源,比如主题、分区、消费者组等。

TopicCommand基本使用

使用KafkaAdminClient来完成TopicCommand的基本操作

查看主题信息

public class demo{public static void describeTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--describe","--topic", "topic-create"};kafka.admin.TopicCommand.main(options);}
}

创建主题

public class demo{public static void createTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--create","--replication-factor", "1","--partitions", "1","--topic", "topic-create-api"};kafka.admin.TopicCommand.main(options);}
}

查看所有可用主题

public class demo{public static void listTopic(){String[ ] options = new String[ ]{"--bootstrap-server localhost:9092","--list"};kafka.admin.TopicCommand.main(options);}
}

KafkaAdminClient基本使用

KafkaAdminClient可以用来管理broker、配置和ACL(Access Control List),以及管理主题、分区和消费者组等。 KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient,提供了一系列的API来管理Kafka集群中的资源。

AdminClient常见的方法

  • createTopics:创建主题

    • CreateTopicsResult createTopics(Collection newTopics)
  • deleteTopics:删除主题

    • DeleteTopicsResult deleteTopics(Collection topics)
  • listTopics:列出所有可用的主题

    • ListTopicsResult listTopics()
  • describeTopics:查看主题的详细信息

    • DescribeTopicsResult describeTopics(Collection topicNames)
  • describeCluster:查看集群的详细信息

    • DescribeClusterResult describeCluster()
  • describeConfigs:查看配置的详细信息

    • DescribeConfigsResult describeConfigs(Collection resources)
  • alterConfigs:修改配置信息

    • AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs)
  • describeConsumerGroups:查看消费者组的详细信息

    • DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds)
  • listConsumerGroups:列出所有可用的消费者组

    • ListConsumerGroupsResult listConsumerGroups()
  • createPartitions:创建分区

    • CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)
使用KafkaAdminClient创建主题
public class KafkaAdminClientCreateTopic {/*** 使用AdminClient创建Topic** 创建完成之后使用如下脚本进行检查* 进入KAFKA_HOME/bin* 执行 ./kafka-topics.sh --bootstrap-server localhost:9092 --list*/public static void createTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);NewTopic newTopic = new NewTopic("topic-create-api", 1, (short) 1);// 创建主题的方法内部是通过发送CreateTopicRequest请求来完成的CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {createTopic();}
}
使用KafkaAdminClient查看主题信息
public class KafkaAdminClientDescribeTopic {/*** 使用AdminClient查看Topic信息*/public static void describeTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("topic-create-api"));try {Map<String, TopicDescription> map = result.all().get();for (Map.Entry<String, TopicDescription> entry : map.entrySet()) {System.out.println(entry.getKey() + " : " + entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {describeTopic();}
}
使用KafkaAdminClient查看所有可用的主题
public class KafkaAdminClientListTopic {/*** 使用AdminClient查看所有可用的Topic*/public static void listTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ListTopicsResult result = adminClient.listTopics();try {Set<String> set = result.names().get();for (String s : set) {System.out.println(s);}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {listTopic();}
}
使用KafkaAdminClient创建分区
public class KafkaAdminClientCreatePartition {/*** 使用AdminClient创建分区*/public static void createPartition(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);Map<String, NewPartitions> map = new HashMap<>();NewPartitions newPartitions = NewPartitions.increaseTo(2);map.put("topic-create-api", newPartitions);CreatePartitionsResult result = adminClient.createPartitions(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {createPartition();}
}
使用KafkaAdminClient删除主题
public class KafkaAdminClientDeleteTopic {/*** 使用AdminClient删除Topic*/public static void deleteTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("topic-create-api"));try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {deleteTopic();}
}
使用KafkaAdminClient修改主题配置
public class KafkaAdminClientAlterTopic {/*** 使用AdminClient修改Topic配置*/public static void alterTopic(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ConfigEntry configEntry = new ConfigEntry("max.message.bytes", "128000");Config config = new Config(Arrays.asList(configEntry));Map<ConfigResource, Config> map = new HashMap<>();ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");map.put(configResource, config);AlterConfigsResult result = adminClient.alterConfigs(map);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {alterTopic();}
}
使用KafkaAdminClient查看主题配置
public class KafkaAdminClientDescribeTopicConfig {/*** 使用AdminClient查看Topic配置*/public static void describeTopicConfig(){Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(props);ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");DescribeConfigsResult result = adminClient.describeConfigs(Arrays.asList(configResource));try {Map<ConfigResource, Config> map = result.all().get();for (Map.Entry<ConfigResource, Config> entry : map.entrySet()) {System.out.println(entry.getKey() + " : " + entry.getValue());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}// 使用完之后需要关闭AdminClient,释放资源adminClient.close();}public static void main(String[ ] args) {describeTopicConfig();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/219230.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Web】Ctfshow Thinkphp3.2.3代码审计(1)

目录 ①web569 ②web570 ③web571 ④web572 ①web569 基础考察 /index.php/Admin/Login/ctfshowLogin ②web570 提示找路由 查看附件源码 (config.php) 发现定义了一个可执行命令的路由规则 /index.php/ctfshow/assert/eval($_POST[1]) 1system(tac /f*); ③web571 提…

设备树是什么?

设备树&#xff1a; 设备树DTS(Device Tree Source) 描述设备信息的独立的文件。 为什么要引入设备树&#xff1f; 随着芯片的发展&#xff0c;Linux内核中就包含着越来越多这些描述设备的代码&#xff0c;导致Linux内核代码会很臃肿。因此引入了设备树文件&#xff0c;从…

手摸手Element-Plus组件化开发

前端环境准备 编码工具: VSCode 依赖管理:NPM 项目构建: Vuecli NPM的全称是Node Package Manager&#xff0c;是一个NodeJS包管理和分发工具&#xff0c;已经成为了非官方的发布Node模块&#xff08;包&#xff09;的标准。2020年3月17日&#xff0c;Github宣布收购npm&am…

大数据项目--学习笔记

新零售项目介绍 1&#xff0c;行业背景介绍 一&#xff0c;百货商店 百货商店是世界商业史上第一个实行新销售方法的现代大量销售组织。其新型销售方法有&#xff1a; 1&#xff0e;顾客可以毫无顾忌地、自由自在地进出商店&#xff1b; 2&#xff0e;商品销售实行“明码标价…

【brpc学习实践八】bvar及其应用

什么是bvar bvar是多线程环境下的计数器类库&#xff0c;支持单维度bvar和多维度mbvar&#xff0c;方便记录和查看用户程序中的各类数值&#xff0c;它利用了thread local存储减少了cache bouncing&#xff0c;相比UbMonitor(百度内的老计数器库)几乎不会给程序增加性能开销&a…

HarmonyOS安装三方库遇到的问题

使用开发电脑系统为&#xff1a;MacOS, 开发工具为&#xff1a;DevEco-Studio版本号3.1.1 Release。在控制栏使用终端工具输入命令&#xff1a;ohpm install ohos/lottie遇到的第一个问题如下图。 解决方案&#xff1a; 1、在首选项中找到ohpm的安装路径。 2、打开bash_profil…

java爱心代码,脱单必备

package com.example.test;import java.awt.Color;import java.awt.Font;import java.awt.Graphics;import java.awt.Image;import java.awt.Toolkit; import java.util.jar.JarOutputStream;import javax.swing.JFrame;class Cardioid extends JFrame {//定义窗口大小private …

⑩【Redis Java客户端】:Jedis、SpringDataRedis、StringRedisTemplate

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ Jedis、SpringDataRedis、StringRedisTemplate…

Linux 命令vim(编辑器)

(一)vim编辑器的介绍 vim是文件编辑器&#xff0c;是vi的升级版本&#xff0c;兼容vi的所有指令&#xff0c;同时做了优化和延伸。vim有多种模式&#xff0c;其中常用的模式有命令模式、插入模式、末行模式&#xff1a;。 (二)vim编辑器基本操作 1 进入vim编辑文件 1 vim …

死磕Nacos系列:Nacos事件发布订阅模型

前言 在Nacos源码中&#xff0c;你是否也经常看到NotifyCenter.publishEvent这样的代码块&#xff1f; 这个事件发布出去后&#xff0c;有哪些类接收到通知并进行了逻辑处理呢&#xff1f; 这里面的实现逻辑是什么呢&#xff1f; 如果你不太清楚&#xff0c;那我们一起来梳理…

2024年天津天狮学院食品质量与安全专业《普通化学》考试大纲

2024年天津天狮学院食品质量与安全专业高职升本入学考试《普通化学》考试大纲 一、考试性质 《普通化学》专业课程考试是天津天狮学院食品质量与安全专业高职升本入学考试 的必考科目之一&#xff0c;其性质是考核学生是否达到了升入本科继续学习的要求而进行的选拔性考试。《…

大数据面试大厂真题【附答案详细解析】

1.Java基础篇&#xff08;阿里、蚂蚁、字节、携程、快手、杭州银行等&#xff09; 问题&#xff1a;HashMap的底层实现原理 答案&#xff1a; 在jdk1.8之前&#xff0c;hashmap由 数组-链表数据结构组成&#xff0c;在jdk1.8之后hashmap由 数组-链表-红黑树数据结构组成&…