kafka 磁盘扩容与数据均衡实在操作讲解

文章目录

    • 一、概述
      • 1)Kafka 磁盘扩容概述
      • 2)Kafka 数据均衡概述
    • 二、K8s 集群部署
    • 三、kafka on k8s 环境部署
      • 1)安装 helm
      • 2)安装 zookeeper
        • 1、添加源并下载部署包
        • 2、修改配置
        • 3、开始安装 zookeeper
        • 4、测试验证
        • 5、卸载
      • 3)安装 kafka
        • 1、添加源并下载部署包
        • 2、修改配置
        • 3、开始安装
        • 4、测试验证
        • 5、卸载
    • 四、kafka 分区与副本
      • 1)分区(Partitions):
      • 2)副本(Replicas):
    • 五、kafka 磁盘扩容
    • 六、数据均衡(分区迁移)
      • 1)查看topic分区情况
      • 2)查看分区大小
      • 3)编写 move-json-file.json,生成执行计划
        • 【示例一】分区迁移
      • 4)开始迁移
        • 【示例二】磁盘间、不同路径分区迁移

一、概述

Kafka 的磁盘扩容和数据均衡是与保证Kafka集群可用性和性能相关的两个重要方面。在 Kafka 中,分区数据的存储和平衡对集群的运行至关重要。以下是有关Kafka磁盘扩容和数据均衡的一些建议:

1)Kafka 磁盘扩容概述

  1. 添加新磁盘:在服务器上添加新的磁盘,确保磁盘有足够的容量,并且其性能符合集群的需求。

  2. 修改 Kafka 配置:在Kafka的配置文件(server.properties)中更新log.dirs属性,将新磁盘路径添加到现有的路径中。

log.dirs=/path/to/old/disk,/path/to/new/disk
  1. 重新启动 Kafka 节点:重新启动 Kafka 节点,确保新的配置生效。在进行重启之前,确保已经备份了关键的配置文件和数据。

2)Kafka 数据均衡概述

  • 分区重新平衡:在 Kafka 中,分区数据的均衡很重要,以确保每个节点的负载相对均匀。您可以使用 Kafka 提供的工具或 API 来重新平衡分区,确保每个节点负责处理相似数量的分区和数据。

  • 监控分区状态:使用Kafka的监控工具,例如Kafka ManagerBurrow 等,来监控分区的状态和分布情况。确保没有分区处于不平衡的状态。

  • 手动干预:在某些情况下,可能需要手动干预来解决数据均衡问题。这可能包括手动重新分配分区或手动调整分区的副本分布。

  • 考虑工作负载变化:在Kafka集群上部署新的生产者或消费者时,要考虑工作负载的变化。新的生产者可能导致更多的数据写入,而新的消费者可能导致更多的数据读取。

  • 分区数量和副本:考虑适当的分区数量和副本数量。分区数太多可能导致管理和维护的困难,而分区数太少可能导致单个节点的负载过重。

  • 使用Kafka工具:Kafka提供了一些工具,如 kafka-reassign-partitions.sh 用于手动重新分配分区,以及 kafka-preferred-replica-election.sh 用于执行首选副本选举。

在进行磁盘扩容和数据均衡时,请确保在生产环境中小心操作,并在非生产环境中进行测试和模拟。细心的规划和执行可以确保Kafka集群的可用性和性能。

在这里插入图片描述

二、K8s 集群部署

k8s 环境安装之前写过很多文档,可以参考我以下几篇文章:

  • 【云原生】k8s 离线部署讲解和实战操作
  • 【云原生】k8s 环境快速部署(一小时以内部署完)

三、kafka on k8s 环境部署

这里为了快速演示,选择了 on k8s 部署方式,当然也可以选择物理机部署方式。以前也写过很多关于 kafka的文章,可以参考一下:

  • Kafka原理介绍+安装+基本操作(kafka on k8s)
  • 大数据Hadoop之——Kafka鉴权认证
  • 大数据Hadoop之——Kafka安全机制(Kafka SSL认证实现)
  • 大数据Hadoop之——Kafka Streams原理介绍与简单应用示例
  • 大数据Hadoop之——Kafka 图形化工具 EFAK(EFAK环境部署)
  • 大数据Hadoop之——EFAK安全认证实现(kafka+zookeeper)
  • 【云原生】zookeeper + kafka on k8s 环境部署
  • 【中间件】通过 docker-compose 快速部署 Kafka 保姆级教程

1)安装 helm

# 下载包
wget https://get.helm.sh/helm-v3.7.1-linux-amd64.tar.gz -O /tmp/helm-v3.7.1-linux-amd64.tar.gz
# 解压压缩包
tar -xf /tmp/helm-v3.7.1-linux-amd64.tar.gz -C /root/
# 软链
ln -s /root/linux-amd64/helm /usr/local/bin/helm

2)安装 zookeeper

在这里插入图片描述

1、添加源并下载部署包
helm repo add bitnami https://charts.bitnami.com/bitnami
helm pull bitnami/zookeeper --version 10.2.1
tar -xf  zookeeper-10.2.1.tgz
2、修改配置
  • 修改zookeeper/values.yaml
image:registry: registry.cn-hangzhou.aliyuncs.comrepository: bigdata_cloudnative/zookeepertag: 3.8.0-debian-11-r36
...replicaCount: 3...service:type: NodePortnodePorts:#NodePort 默认范围是 30000-32767client: "32181"tls: "32182"...persistence:storageClass: "zookeeper-local-storage"size: "10Gi"# 目录需要提前在宿主机上创建local:- name: zookeeper-0host: "local-168-182-110"path: "/opt/bigdata/servers/zookeeper/data/data1"- name: zookeeper-1host: "local-168-182-111"path: "/opt/bigdata/servers/zookeeper/data/data1"- name: zookeeper-2host: "local-168-182-112"path: "/opt/bigdata/servers/zookeeper/data/data1"...# Enable Prometheus to access ZooKeeper metrics endpoint
metrics:enabled: true
  • 添加zookeeper/templates/pv.yaml
{{- range .Values.persistence.local }}
---
apiVersion: v1
kind: PersistentVolume
metadata:name: {{ .name }}labels:name: {{ .name }}
spec:storageClassName: {{ $.Values.persistence.storageClass }}capacity:storage: {{ $.Values.persistence.size }}accessModes:- ReadWriteOncelocal:path: {{ .path }}nodeAffinity:required:nodeSelectorTerms:- matchExpressions:- key: kubernetes.io/hostnameoperator: Invalues:- {{ .host }}
---
{{- end }}
  • 添加zookeeper/templates/storage-class.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:name: {{ .Values.persistence.storageClass }}
provisioner: kubernetes.io/no-provisioner
  • 设置时区,zookeeper/templates/statefulset.yaml
  env:- name: TZvalue: Asia/Shanghai
3、开始安装 zookeeper
docker pull docker.io/bitnami/zookeeper:3.8.0-debian-11-r36# 为了方便下次快速拉取镜像,将镜像推送到阿里云上
docker tag docker.io/bitnami/zookeeper:3.8.0-debian-11-r36 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/zookeeper:3.8.0-debian-11-r36
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/zookeeper:3.8.0-debian-11-r36# 开始安装
helm install zookeeper ./zookeeper -n zookeeper --create-namespace

NOTES

NAME: zookeeper
LAST DEPLOYED: Sun Nov 12 22:39:36 2023
NAMESPACE: zookeeper
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: zookeeper
CHART VERSION: 10.2.1
APP VERSION: 3.8.0** Please be patient while the chart is being deployed **ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:zookeeper.zookeeper.svc.cluster.localTo connect to your ZooKeeper server run the following commands:export POD_NAME=$(kubectl get pods --namespace zookeeper -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")kubectl exec -it $POD_NAME -- zkCli.shTo connect to your ZooKeeper server from outside the cluster execute the following commands:export NODE_IP=$(kubectl get nodes --namespace zookeeper -o jsonpath="{.items[0].status.addresses[0].address}")export NODE_PORT=$(kubectl get --namespace zookeeper -o jsonpath="{.spec.ports[0].nodePort}" services zookeeper)zkCli.sh $NODE_IP:$NODE_PORT

查看pod状态

kubectl get pods,svc -n zookeeper -owide
4、测试验证
# 登录zookeeper pod
kubectl exec -it zookeeper-0 -n zookeeper -- zkServer.sh status
kubectl exec -it zookeeper-1 -n zookeeper -- zkServer.sh status
kubectl exec -it zookeeper-2 -n zookeeper -- zkServer.sh statuskubectl exec -it zookeeper-0 -n zookeeper -- bash
5、卸载
helm uninstall zookeeper -n zookeeperkubectl delete pod -n zookeeper `kubectl get pod -n zookeeper|awk 'NR>1{print $1}'` --force
kubectl patch ns zookeeper -p '{"metadata":{"finalizers":null}}'
kubectl delete ns zookeeper --force

3)安装 kafka

1、添加源并下载部署包
helm repo add bitnami https://charts.bitnami.com/bitnami
helm pull bitnami/kafka --version 18.4.2
tar -xf kafka-18.4.2.tgz
2、修改配置
  • 修改kafka/values.yaml
image:registry: registry.cn-hangzhou.aliyuncs.comrepository: bigdata_cloudnative/kafkatag: 3.2.1-debian-11-r16...replicaCount: 3...service:type: NodePortnodePorts:client: "30092"external: "30094"...externalAccessenabled: trueservice:type: NodePortnodePorts:- 30001- 30002- 30003useHostIPs: true...persistence:storageClass: "kafka-local-storage"size: "10Gi"# 目录需要提前在宿主机上创建local:- name: kafka-0host: "local-168-182-110"path: "/opt/bigdata/servers/kafka/data/data1"- name: kafka-1host: "local-168-182-111"path: "/opt/bigdata/servers/kafka/data/data1"- name: kafka-2host: "local-168-182-112"path: "/opt/bigdata/servers/kafka/data/data1"...metrics:kafka:enabled: trueimage:registry: registry.cn-hangzhou.aliyuncs.comrepository: bigdata_cloudnative/kafka-exportertag: 1.6.0-debian-11-r8jmx:enabled: trueimage:registry: registry.cn-hangzhou.aliyuncs.comrepository: bigdata_cloudnative/jmx-exportertag: 0.17.1-debian-11-r1annotations:prometheus.io/path: "/metrics"...zookeeper:enabled: false...externalZookeeperservers:- zookeeper-0.zookeeper-headless.zookeeper- zookeeper-1.zookeeper-headless.zookeeper- zookeeper-2.zookeeper-headless.zookeeper
  • 添加kafka/templates/pv.yaml
{{- range .Values.persistence.local }}
---
apiVersion: v1
kind: PersistentVolume
metadata:name: {{ .name }}labels:name: {{ .name }}
spec:storageClassName: {{ $.Values.persistence.storageClass }}capacity:storage: {{ $.Values.persistence.size }}accessModes:- ReadWriteOncelocal:path: {{ .path }}nodeAffinity:required:nodeSelectorTerms:- matchExpressions:- key: kubernetes.io/hostnameoperator: Invalues:- {{ .host }}
---
{{- end }}
  • 添加kafka/templates/storage-class.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:name: {{ .Values.persistence.storageClass }}
provisioner: kubernetes.io/no-provisioner
  • 设置时区,kafka/templates/statefulset.yaml
  env:- name: TZvalue: Asia/Shanghai
3、开始安装
docker pull docker.io/bitnami/kafka:3.2.1-debian-11-r16# 为了方便下次快速拉取镜像,将镜像推送到阿里云上
docker tag docker.io/bitnami/kafka:3.2.1-debian-11-r16 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:3.2.1-debian-11-r16
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:3.2.1-debian-11-r16# node-export
docker pull docker.io/bitnami/kafka-exporter:1.6.0-debian-11-r8
docker tag docker.io/bitnami/kafka-exporter:1.6.0-debian-11-r8 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka-exporter:1.6.0-debian-11-r8
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka-exporter:1.6.0-debian-11-r8# JXM
docker pull docker.io/bitnami/jmx-exporter:0.17.1-debian-11-r1
docker tag docker.io/bitnami/jmx-exporter:0.17.1-debian-11-r1 registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/jmx-exporter:0.17.1-debian-11-r1
docker push registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/jmx-exporter:0.17.1-debian-11-r1#开始安装
helm install kafka ./kafka -n kafka --create-namespace

NOTES

NAME: kafka
LAST DEPLOYED: Sun Nov 12 23:32:49 2023
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 18.4.2
APP VERSION: 3.2.1** Please be patient while the chart is being deployed **Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:kafka.kafka.svc.cluster.localEach Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:kafka-0.kafka-headless.kafka.svc.cluster.local:9092kafka-1.kafka-headless.kafka.svc.cluster.local:9092kafka-2.kafka-headless.kafka.svc.cluster.local:9092To create a pod that you can use as a Kafka client run the following commands:kubectl run kafka-client --restart='Never' --image registry.cn-hangzhou.aliyuncs.com/bigdata_cloudnative/kafka:3.2.1-debian-11-r16 --namespace kafka --command -- sleep infinitykubectl exec --tty -i kafka-client --namespace kafka -- bashPRODUCER:kafka-console-producer.sh \--broker-list kafka-0.kafka-headless.kafka.svc.cluster.local:9092,kafka-1.kafka-headless.kafka.svc.cluster.local:9092,kafka-2.kafka-headless.kafka.svc.cluster.local:9092 \--topic testCONSUMER:kafka-console-consumer.sh \--bootstrap-server kafka.kafka.svc.cluster.local:9092 \--topic test \--from-beginning

查看pod状态

kubectl get pods,svc -n kafka -owide
4、测试验证
# 登录zookeeper pod
kubectl exec -it kafka-0 -n kafka -- bash# 1、创建分区
kafka-topics.sh --create --topic test001 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1
# 查看
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092  --topic test001

问题处理:Error: Exception thrown by the agent : java.rmi.server.ExportException: Port already in use: 5555; nested exception is:

在这里插入图片描述
修改 /opt/bitnami/kafka/bin/kafka-run-class.sh 脚本,修改内容如下:

# 增加
ISKAFKASERVER="false"
if [[ "$*" =~ "kafka.Kafka" ]]; thenISKAFKASERVER="true"
fi# 修改
# if [  $JMX_PORT ];then
if [  $JMX_PORT ] && [ -z "$ISKAFKASERVER" ]; then

修改后的完整脚本

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.if [ $# -lt 1 ];
thenecho "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"exit 1
fi# CYGWIN == 1 if Cygwin is detected, else 0.
if [[ $(uname -a) =~ "CYGWIN" ]]; thenCYGWIN=1
elseCYGWIN=0
fiif [ -z "$INCLUDE_TEST_JARS" ]; thenINCLUDE_TEST_JARS=false
fi# Exclude jars not necessary for running commands.
regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$"
should_include_file() {if [ "$INCLUDE_TEST_JARS" = true ]; thenreturn 0fifile=$1if [ -z "$(echo "$file" | egrep "$regex")" ] ; thenreturn 0elsereturn 1fi
}ISKAFKASERVER="false"
if [[ "$*" =~ "kafka.Kafka" ]]; thenISKAFKASERVER="true"
fibase_dir=$(dirname $0)/..if [ -z "$SCALA_VERSION" ]; thenSCALA_VERSION=2.13.6if [[ -f "$base_dir/gradle.properties" ]]; thenSCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`fi
fiif [ -z "$SCALA_BINARY_VERSION" ]; thenSCALA_BINARY_VERSION=$(echo $SCALA_VERSION | cut -f 1-2 -d '.')
fi# run ./gradlew copyDependantLibs to get all dependant jars in a local dir
shopt -s nullglob
if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; thenfor dir in "$base_dir"/core/build/dependant-libs-${SCALA_VERSION}*;doCLASSPATH="$CLASSPATH:$dir/*"done
fifor file in "$base_dir"/examples/build/libs/kafka-examples*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
doneif [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; thenclients_lib_dir=$(dirname $0)/../clients/build/libsstreams_lib_dir=$(dirname $0)/../streams/build/libsstreams_dependant_clients_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
elseclients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libsstreams_lib_dir=$clients_lib_dirstreams_dependant_clients_lib_dir=$streams_lib_dir
fifor file in "$clients_lib_dir"/kafka-clients*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
donefor file in "$streams_lib_dir"/kafka-streams*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
doneif [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; thenfor file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fidone
elseVERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix numberfor file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;doif should_include_file "$file"; thenCLASSPATH="$file":"$CLASSPATH"fidoneif [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; thenCLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"fiif [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; thenCLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"fi
fifor file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
doCLASSPATH="$CLASSPATH":"$file"
donefor file in "$streams_dependant_clients_lib_dir"/*hamcrest*.jar;
doCLASSPATH="$CLASSPATH":"$file"
donefor file in "$base_dir"/shell/build/libs/kafka-shell*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
donefor dir in "$base_dir"/shell/build/dependant-libs-${SCALA_VERSION}*;
doCLASSPATH="$CLASSPATH:$dir/*"
donefor file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
donefor dir in "$base_dir"/tools/build/dependant-libs-${SCALA_VERSION}*;
doCLASSPATH="$CLASSPATH:$dir/*"
donefor file in "$base_dir"/trogdor/build/libs/trogdor-*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
donefor dir in "$base_dir"/trogdor/build/dependant-libs-${SCALA_VERSION}*;
doCLASSPATH="$CLASSPATH:$dir/*"
donefor cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
dofor file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fidoneif [ -d "$base_dir/connect/${cc_pkg}/build/dependant-libs" ] ; thenCLASSPATH="$CLASSPATH:$base_dir/connect/${cc_pkg}/build/dependant-libs/*"fi
done# classpath addition for release
for file in "$base_dir"/libs/*;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
donefor file in "$base_dir"/core/build/libs/kafka_${SCALA_BINARY_VERSION}*.jar;
doif should_include_file "$file"; thenCLASSPATH="$CLASSPATH":"$file"fi
done
shopt -u nullglobif [ -z "$CLASSPATH" ] ; thenecho "Classpath is empty. Please build the project first e.g. by running './gradlew jar -PscalaVersion=$SCALA_VERSION'"exit 1
fi# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; thenKAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
fi# JMX port to use
if [  $JMX_PORT ] && [ -z "$ISKAFKASERVER" ]; thenKAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi# Log directory to use
if [ "x$LOG_DIR" = "x" ]; thenLOG_DIR="$base_dir/logs"
fi# Log4j settings
if [ -z "$KAFKA_LOG4J_OPTS" ]; then# Log to console. This is a tool.LOG4J_DIR="$base_dir/config/tools-log4j.properties"# If Cygwin is detected, LOG4J_DIR is converted to Windows format.(( CYGWIN )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
else# create logs directoryif [ ! -d "$LOG_DIR" ]; thenmkdir -p "$LOG_DIR"fi
fi# If Cygwin is detected, LOG_DIR is converted to Windows format.
(( CYGWIN )) && LOG_DIR=$(cygpath --path --mixed "${LOG_DIR}")
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; thenKAFKA_OPTS=""
fi# Set Debug options if enabled
if [ "x$KAFKA_DEBUG" != "x" ]; then# Use default portsDEFAULT_JAVA_DEBUG_PORT="5005"if [ -z "$JAVA_DEBUG_PORT" ]; thenJAVA_DEBUG_PORT="$DEFAULT_JAVA_DEBUG_PORT"fi# Use the defaults if JAVA_DEBUG_OPTS was not setDEFAULT_JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=${DEBUG_SUSPEND_FLAG:-n},address=$JAVA_DEBUG_PORT"if [ -z "$JAVA_DEBUG_OPTS" ]; thenJAVA_DEBUG_OPTS="$DEFAULT_JAVA_DEBUG_OPTS"fiecho "Enabling Java debug options: $JAVA_DEBUG_OPTS"KAFKA_OPTS="$JAVA_DEBUG_OPTS $KAFKA_OPTS"
fi# Which java to use
if [ -z "$JAVA_HOME" ]; thenJAVA="java"
elseJAVA="$JAVA_HOME/bin/java"
fi# Memory options
if [ -z "$KAFKA_HEAP_OPTS" ]; thenKAFKA_HEAP_OPTS="-Xmx256M"
fi# JVM performance options
# MaxInlineLevel=15 is the default since JDK 14 and can be removed once older JDKs are no longer supported
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; thenKAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true"
fiwhile [ $# -gt 0 ]; doCOMMAND=$1case $COMMAND in-name)DAEMON_NAME=$2CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.outshift 2;;-loggc)if [ -z "$KAFKA_GC_LOG_OPTS" ]; thenGC_LOG_ENABLED="true"fishift;;-daemon)DAEMON_MODE="true"shift;;*)break;;esac
done# GC options
GC_FILE_SUFFIX='-gc.log'
GC_LOG_FILE_NAME=''
if [ "x$GC_LOG_ENABLED" = "xtrue" ]; thenGC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX# The first segment of the version number, which is '1' for releases before Java 9# it then becomes '9', '10', ...# Some examples of the first line of `java --version`:# 8 -> java version "1.8.0_152"# 9.0.4 -> java version "9.0.4"# 10 -> java version "10" 2018-03-20# 10.0.1 -> java version "10.0.1" 2018-04-17# We need to match to the end of the line to prevent sed from printing the characters that do not matchJAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; thenKAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"elseKAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"fi
fi# Remove a possible colon prefix from the classpath (happens at lines like `CLASSPATH="$CLASSPATH:$file"` when CLASSPATH is blank)
# Syntax used on the right side is native Bash string manipulation; for more details see
# http://tldp.org/LDP/abs/html/string-manipulation.html, specifically the section titled "Substring Removal"
CLASSPATH=${CLASSPATH#:}# If Cygwin is detected, classpath is converted to Windows format.
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; thennohup "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
elseexec "$JAVA" $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp "$CLASSPATH" $KAFKA_OPTS "$@"
fi

将脚本覆盖容器里的

kubectl cp kafka-run-class.sh kafka-0:/opt/bitnami/kafka/bin/kafka-run-class.sh -n kafka

再执行创建topic

# 登录zookeeper pod
kubectl exec -it kafka-0 -n kafka -- bash# 参数解释:
# --create: 指定创建topic动作# --topic:指定新建topic的名称#--bootstrap-server: 指定kafka连接地址#--config:指定当前topic上有效的参数值,参数列表参考文档为: Topic-level configuration#--partitions:指定当前创建的kafka分区数量,默认为1个# --replication-factor:指定每个分区的副本数,默认1个# 1、创建topic,三分区,三副本,设置数据过期时间72小时(-1表示不过期,默认是永久保存的,不会自动过期),单位ms,72*3600*1000=259200000
kafka-topics.sh --create --topic test001 --bootstrap-server kafka.kafka:9092 --partitions 3 --replication-factor 3 --config retention.ms=259200000
# 查看
kafka-topics.sh --list --bootstrap-server kafka.kafka:9092
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092  --topic test001

在这里插入图片描述

生产者/消费者测试

# 【生产者】
kafka-console-producer.sh --broker-list kafka.kafka:9092 --topic test001
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}# 【消费者】
# 从头开始消费
kafka-console-consumer.sh --bootstrap-server kafka.kafka:9092 --topic test001 --from-beginning
# 指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区
kafka-console-consumer.sh --bootstrap-server kafka.kafka:9092 --topic test001 --partition 0 --offset 100 --group test001# 查看数据积压
kafka-consumer-groups.sh --bootstrap-server kafka.kafka:9092 --describe --group test001

删除 topic

# 删除topic,默认是没有启用删除topic的
kafka-topics.sh --delete --topic test001 --bootstrap-server kafka.kafka:9092# 配置启用可以删除topic,topic 配置文件里,delete.topic.enable=true;k8s helm chat包里开启这个参数:
deleteTopicEnable: true
5、卸载
helm uninstall kafka -n kafkakubectl delete pod -n kafka `kubectl get pod -n kafka|awk 'NR>1{print $1}'` --force
kubectl patch ns kafka  -p '{"metadata":{"finalizers":null}}'
kubectl delete ns kafka  --force

四、kafka 分区与副本

Kafka中的分区(Partitions)和副本(Replicas)是关键的概念,它们有助于实现高可用性、容错性和扩展性。下面是有关Kafka分区和副本的基本概念:

1)分区(Partitions):

定义:分区是Kafka中用于存储消息的基本单元。每个主题(Topic)都可以被划分为一个或多个分区。分区中的每条消息都会被分配到一个特定的分区中。

1、作用:

  • 水平扩展:通过将主题划分为多个分区,Kafka可以水平扩展,允许消息的并行处理和更好的性能。
  • 顺序保证:每个分区中的消息保持有序。在同一分区中,消息的写入和读取顺序是严格有序的。

2、分区的属性:

  • 编号:每个分区都有一个唯一的编号(从0开始),用于标识分区。
  • 持久化:分区的数据是持久化的,可以在多个节点之间复制以提高可用性和容错性。
  • 副本数量:每个分区可以有一个或多个副本。

3、生产者和消费者:

  • 生产者可以指定消息发送到特定的分区。
  • 消费者订阅主题时,会消费所有分区中的消息。

2)副本(Replicas):

定义:副本是分区的复制。每个分区可以配置多个副本,这些副本分布在Kafka集群的不同节点上。

1、作用:

  • 高可用性:副本提供了故障恢复和高可用性。当某个节点或分区不可用时,仍然可以从其他节点或副本读取数据。
  • 容错性:通过在多个节点上存储相同的数据,即使某个节点发生故障,数据仍然可用。

2、副本的属性:

  • 同步复制:副本之间可以配置为同步或异步复制。同步复制确保写入操作在所有副本上都完成后才返回成功。
  • 领导者和追随者:每个分区都有一个领导者(Leader)和零个或多个追随者(Follower)。生产者和消费者通常与分区的领导者进行交互。

3、ISR(In-Sync Replicas)

  • ISR 是指与分区领导者保持同步的副本集合。只有ISR中的副本才能成为新的领导者。当某个副本无法保持同步时,它将从ISR中移除。

生产者和消费者与分区和副本的关系:

  • 生产者可以选择将消息发送到特定的分区,也可以根据分区键选择。
  • 消费者订阅主题时,会消费分区中的消息,与分区中的领导者和追随者进行交互。

总体而言,Kafka的分区和副本机制提供了高度的可伸缩性、高可用性和容错性,使其成为处理大规模实时数据流的强大平台。

五、kafka 磁盘扩容

场景:可能因为数据量上涨,就得靠谱扩容磁盘了,这里每个节点增加一块磁盘,如果不新增topic的情况下,是不会写到对应新磁盘的。kafka配置文件log.dirs增加了几个目录。

# log.dirs用来配置多个根目录(以逗号分隔)
log.dirs=/data1,/data2# 修改完配置重启kafka即可

六、数据均衡(分区迁移)

场景:kafka配置文件log.dirs增加了几个目录,但是新目录没有分区数据写入,所以打算进行重分区一下。

1)查看topic分区情况

# 登录zookeeper pod
kubectl exec -it kafka-0 -n kafka -- bash# 为了测试这里多建几个topic
kafka-topics.sh --create --topic test002 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
kafka-topics.sh --create --topic test003 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
kafka-topics.sh --create --topic test004 --bootstrap-server kafka.kafka:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000kafka-topics.sh --create --topic test005 --bootstrap-server kafka.kafka:9092 --partitions 2 --replication-factor 2 --config retention.ms=259200000
kafka-topics.sh --create --topic test006 --bootstrap-server kafka.kafka:9092 --partitions 2 --replication-factor 2 --config retention.ms=259200000
kafka-topics.sh --create --topic test007 --bootstrap-server kafka.kafka:9092 --partitions 2 --replication-factor 2 --config retention.ms=259200000# 查看分区情况
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092  --topic test001

在这里插入图片描述

2)查看分区大小

# 显示所有的topic详情
kafka-log-dirs.sh --describe --bootstrap-server kafka.kafka:9092
# 只显示test001信息
kafka-log-dirs.sh --describe --bootstrap-server kafka.kafka:9092 --topic-list test001

在这里插入图片描述
数据格式化:

{"version": 1,"brokers": [{"broker": 2,"logDirs": [{"logDir": "/bitnami/kafka/data","error": null,"partitions": [{"partition": "test001-0","size": 380,"offsetLag": 0,"isFuture": false},{"partition": "test001-2","size": 198,"offsetLag": 0,"isFuture": false},{"partition": "test001-1","size": 190,"offsetLag": 0,"isFuture": false}]}]},{"broker": 1,"logDirs": [{"logDir": "/bitnami/kafka/data","error": null,"partitions": [{"partition": "test001-0","size": 380,"offsetLag": 0,"isFuture": false},{"partition": "test001-2","size": 198,"offsetLag": 0,"isFuture": false},{"partition": "test001-1","size": 190,"offsetLag": 0,"isFuture": false}]}]},{"broker": 0,"logDirs": [{"logDir": "/bitnami/kafka/data","error": null,"partitions": [{"partition": "test001-0","size": 380,"offsetLag": 0,"isFuture": false},{"partition": "test001-2","size": 198,"offsetLag": 0,"isFuture": false},{"partition": "test001-1","size": 190,"offsetLag": 0,"isFuture": false}]}]}]
}

3)编写 move-json-file.json,生成执行计划

move-json-file.json 这个文件就是告知想对哪些Topic进行重新分配的计算。

【示例一】分区迁移
{
"topics": [{
"topic": "test002"
}],
"version": 1
}
# 查看分区
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092  --topic test002
# 查看分区大小
kafka-log-dirs.sh --describe --bootstrap-server kafka.kafka:9092 --topic-list test002

开始执行

# 当前topic在,0节点,迁移到1节点
kafka-reassign-partitions.sh --bootstrap-server kafka.kafka:9092 --topics-to-move-json-file /tmp/move-json-file.json --broker-list "1" --generate# 输出信息:生成了两条信息,第一条为现在的分配情况,第二条为计划更改的内容
# 当前:Current partition replica assignment
{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[0],"log_dirs":["any"]}]}
# 迁移:Proposed partition reassignment configuration
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[1],"log_dirs":["any"]}]}

把计划修改的结果复制,放在第二个json文件中,这里取名为reassignment-json-file.json

【注意】现在还没真正迁移,只是输出迁移信息。可以执行查看就知道了。
kafka-topics.sh --describe --bootstrap-server kafka.kafka:9092 --topic test002

{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[1],"log_dirs":["any"]}]}

【温馨提示】–broker-list “1”:扩容后的所有机器的broker.id。

4)开始迁移

运行kafka-reassign-partition.sh命令根据上述执行计划生成的结果进行分配,命令如下:

echo '{"version":1,"partitions":[{"topic":"test002","partition":0,"replicas":[1],"log_dirs":["any"]}]}' >/tmp/reassignment-json-file.jsonkafka-reassign-partitions.sh --bootstrap-server kafka.kafka:9092 -reassignment-json-file /tmp/reassignment-json-file.json -execute

在这里插入图片描述

【示例二】磁盘间、不同路径分区迁移
{"version": 1,"partitions": [{"topic": "test01","partition": 2,"replicas": [0],"log_dirs": ["/data1"]}, {"topic": "test01","partition": 1,"replicas": [0],"log_dirs": ["/data2"]}]
}

version:固定值 1

开始执行迁移

kafka-reassign-partitions.sh --zookeeper --bootstrap-server kafka.kafka:9092 --reassignment-json-file config/move-json-file.json --execute --bootstrap-server
kafka.kafka:9092 --execute --replica-alter-log-dirs-throttle 10000 --throttle 50000000

参数讲解:

  • --replica-alter-log-dirs-throttle:需要注意的是,如果你迁移的时候包含 副本跨路径迁移(同一个Broker多个路径)那么这个限流措施不会生效,你需要再加上 --replica-alter-log-dirs-throttle 这个限流参数,它限制的是同一个Broker不同路径直接迁移的限流。
  • --throttle 50000000:那么执行移动分区的时候,会被限制流量在50000000 B/s

kafka 磁盘扩容与数据均衡实在操作讲解就先到这里了,有任何疑问也可关注我公众号:大数据与云原生技术分享,进行技术交流,如本篇文章对您有所帮助,麻烦帮忙一键三连(点赞、转发、收藏)~

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/194690.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言基础】分享近期学习到的volatile关键字、__NOP__()函数以及# #if 1 #endif

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…

IIC 实验

IIC 简介 IIC(Inter-Integrated Circuit)总线是一种由 PHILIPS 公司开发的两线式串行总线&#xff0c;用于连接微 控制器以及其外围设备。它是由数据线 SDA 和时钟线 SCL 构成的串行总线&#xff0c;可发送和接收数 据&#xff0c;在 CPU 与被控 IC 之间、IC 与 IC 之间进行双…

记录基于scapy构造ClientHello报文的尝试

最近有个需求就是用scapy构造https的client hello报文&#xff0c;由用户指定servername构造对应的报文。网上对于此的资料甚少&#xff0c;有的也是怎么去解析https报文&#xff0c;但是对于如果构造基本上没有找到相关的资料。 一直觉得最好的老师就是Python的help功能和dir功…

AOT:一个.Net 8最牛逼和最受欢迎关注的功能!

这次.Net 8发布&#xff0c;更新了诸多功能&#xff0c;但从各个编程社区看到大家讨论和交流最多的&#xff0c;还是AOT这个功能。 AOT本身在.Net 7就开始引入了&#xff0c;但这次.Net 8做了诸多更新&#xff1a; 1、增加了macOS 平台的 x64 和 Arm64 体系结构的支持&#x…

02.接口隔离原则(Interface Segregation Principle)

一言 客户端不应该依赖它不需要的接口&#xff0c;即一个类对另一个类的依赖应该建立在最小的接口上。 为什么要有接口隔离原则 反例设计 反例代码 public class Segregation1 { }interface Interface1 {void operation1();void operation2();void operation3();void opera…

VBA之Word应用:文档(Document)的书签

《VBA之Word应用》&#xff08;版权10178982&#xff09;&#xff0c;是我推出第八套教程&#xff0c;教程是专门讲解VBA在Word中的应用&#xff0c;围绕“面向对象编程”讲解&#xff0c;首先让大家认识Word中VBA的对象&#xff0c;以及对象的属性、方法&#xff0c;然后通过实…

Java入门篇 之 抽象类接口

本篇碎碎念&#xff1a;个人认为压力是一种前进的动力&#xff0c;但是不要有太多压力&#xff0c;不然会使心情烦躁&#xff0c;会控制不住自己的情绪&#xff0c;会在一个临界值爆发&#xff0c;一旦爆发&#xff0c;将迟迟不能消散 今日份励志文案: 努力的背后必有加倍的赏赐…

【算法每日一练]-分块(保姆级教程 篇1)POJ3648

插讲一下分块 题目&#xff1a;&#xff08;POJ 3648&#xff09; 一个简单的整数问题 前缀和往往用于静态的不会修改的区间和。遇到经常修改的区间问题&#xff0c;就要用分块或线段树来维护了。 分块算法是优化后的暴力&#xff0c;分块算法有时可以维护一些线段树维护不了的…

mysql的行列互转

mysql的行列互转 多行转多列思路实现代码 多列转多行思路代码 多行转多列 多行转多列&#xff0c;就是数据库中存在的多条具有一定相同值的行数据&#xff0c;通过提取相同值在列头展示来实现行数据转为列数据&#xff0c;一般提取的值为枚举值。 思路 转换前表样式 -> 转…

《洛谷深入浅出基础篇》 P5250 木材仓库————集合应用实例

上链接&#xff1a; P5250 【深基17.例5】木材仓库 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn)https://www.luogu.com.cn/problem/P5250上题干&#xff1a; 题目描述 博艾市有一个木材仓库&#xff0c;里面可以存储各种长度的木材&#xff0c;但是保证没有两个木材的长度是…

YOLOv8改进 | 2023 | InnerIoU、InnerSIoU、InnerWIoU、FoucsIoU等损失函数

论文地址&#xff1a;官方Inner-IoU论文地址点击即可跳转 官方代码地址&#xff1a;官方代码地址-官方只放出了两种结合方式CIoU、SIoU 本位改进地址&#xff1a; 文末提供完整代码块-包括InnerEIoU、InnerCIoU、InnerDIoU等七种结合方式和其Focus变种 一、本文介绍 本文给…

LeetCode【13】罗马数字转整数

题目&#xff1a; 思路&#xff1a; 第十二题的逆运算&#xff0c;方法同理。需要注意的是IV、IX、XL、XC、CD、CM这六种特殊的情况。正常情况下每个字符找到对应的数值累加&#xff0c;这六种特殊字符都是左边的数值比右边的数值小。 这里以IV举例&#xff0c;IV对应数字是1和…