【shell-10】shell实现的各种kafka脚本

kafka-shell工具

    • 背景
    • 日志 log
    • 一.启动kafka->(start-kafka)
    • 二.停止kafka->(stop-kafka)
    • 三.创建topic->(create-topic)
    • 四.删除topic->(delete-topic)
    • 五.获取topic列表->(list-topic)
    • 六. 将文件数据 录入到kafka->(file-to-kafka)
    • 七.将kafka数据 下载到文件->(kafka-to-file)
    • 八. 查看topic的groupID消费情况->(list-group)

背景

注意:我用的kafka版本是 3.2.1 其他版本kafka提供的 命令行可能有细微区别。
因为经常要用kafka环境参与测试,所以写了不少脚本。在很多时候可以大大提高测试的效率。
主要包含如下功能:
topic的管理【创建,删除】
topic信息查看【topic列表,topic groupid 消费情况】
topic数据传输【file数据录入到topic,topic数据下载到本地文件】
脚本中做了各种检查,日志的输出做了颜色区分,用起来没啥问题。

日志 log

此文件是个额外的日志文件主要用于打印日志,该文件会被下面的shell文件引用

#!/bin/bash
#日志级别 debug-1, info-2, warn-3, error-4, always-5
LOG_LEVEL=2#调试日志
function log_debug(){content="$(date '+%Y-%m-%d %H:%M:%S') [DEBUG]: $@"[ $LOG_LEVEL -le 1  ] && echo -e "\033[32m"  ${content}  "\033[0m"
}
#信息日志
function log_info(){content="$(date '+%Y-%m-%d %H:%M:%S') [INGO]: $@"[ $LOG_LEVEL -le 2  ] && echo -e "\033[32m"  ${content} "\033[0m"
}
#警告日志
function log_warn(){content="$(date '+%Y-%m-%d %H:%M:%S') [WARN] $@"[ $LOG_LEVEL -le 3  ] && echo -e "\033[33m" ${content} "\033[0m"
}
#错误日志
function log_err(){content="$(date '+%Y-%m-%d %H:%M:%S') [ERROR]: $@"[ $LOG_LEVEL -le 4  ] && echo -e "\033[31m" ${content} "\033[0m"
}
~  

一.启动kafka->(start-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/logpid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
log_info "Start checking kafka process"
if [ -z $pid ]; thenlog_info "The kafka process does not exist, startting.........................................................................................."
elselog_warn "The kafka process exists and does not need to be started"exit 1
fi
nohup kafka-server-start.sh /home/kafka/kafka_2.12-3.2.1/config/server.properties >>/home/kafka/kafka.log 2>&1 &
# 日志的路径是安装kafka的时候指定的,也要替换成自己的路径
tail -f 20 /home/kafka/kafka.log

二.停止kafka->(stop-kafka)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "Start checking kafka process"
pid=`ps -aux | grep /home/kafka/kafka_2.12-3.2.1/bin/ | grep -v grep |awk '{print$2}'`
if [ -z $pid ]; thenlog_warn "The kafka process does not exist and does not need to be stopped"exit 1
elselog_info "The kafka process alive, stopping.............................................................................................................."
fi
kafka-server-stop.sh
log_info "Stop kafka success"

三.创建topic->(create-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bash
source /home/shell/log
log_info "脚本功能: 创建topic"
log_info "脚本参数: topic名称(必选)"
if [ $# -ne 1 ]; thenlog_err "错误:请传入topic名称"exit 1
fi
#TOPIC名称
TOPIC_NAME=$1
#KAFKA地址
KAFKA_BROKER=ip:9092
# 检查Kafka主题是否存在, 若已存在则放弃创建
if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$TOPIC_NAME$";thenlog_warn "$TOPIC_NAME 已经存在,放弃创建"
else# 默认1副本,3分区kafka-topics.sh --create --bootstrap-server $KAFKA_BROKER --replication-factor 1 --partitions 3 --topic $TOPIC_NAMElog_info "请执行topic-list检查创建是否成功"
fi
~     

在这里插入图片描述

四.删除topic->(delete-topic)

下面代码中的路径你要替换成自己的路径

#!/bin/bashsource /home/shell/log
log_info "脚本作用:删除topic"
log_info "脚本参数: 支持多个topic,用空格分开,可以批量删除"
KAFKA_BROKER=ip:9092
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0  # 返回true  elselog_warn "$local_topic_name 不存在->false"return 1  # return falsefi
}# 逐个删除topic
for topic in "$@"
doif ! check_kafka_topic $topic; thenlog_info "tpoic->$topic 不存在,跳过删除行为"continueelselog_info "topic->$topic 执行删除"kafka-topics.sh --delete --bootstrap-server $KAFKA_BROKER --topic $topiclog_info "topic->$topic 删除成功"fi
done

在这里插入图片描述

五.获取topic列表->(list-topic)

#!/bin/bash
source /home/shell/log
KAFKA_BROKER=ip:9092  
log_info "脚本作用: 列出topic信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic信息)"
if [ $# -eq 1 ]; thenlog_info "目标$1 详情如下"kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets" | grep $1
elselog_info "所有topic 列表如下:"kafka-topics.sh --describe  --bootstrap-server $KAFKA_BROKER | grep -v "__consumer_offsets"
fi

在这里插入图片描述

六. 将文件数据 录入到kafka->(file-to-kafka)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将文件中的数据录入指定topic"
log_info "脚本参数: 1.文件路劲(必选) 2.topic(必选)"
log_info "参数校验"
log_info "执行条件检查.........................................................................................................."
if [ $# -ne 2 ]; thenlog_err "必须传入两个参数: 1.文件路劲(必选) 2.topic(必选)"exit 1
fiif ! [ -f $1 ]; thenlog_err "$1不是一个有效的数据文件"exit 1
fiFILE_PATH=$1
TOPIC_NAME=$2
KAFKA_BROKER=ip:9092  #检查topic是否存在
function check_kafka_topic() {local local_KAFKA_BROKER=$1if kafka-topics.sh --bootstrap-server $KAFKA_BROKER  --list | grep -q "^$local_KAFKA_BROKER$";thenreturn 0  # 返回true  elsereturn 1  # return falsefi
}#将文件数据推送到kafka
function send_to_kafka(){local local_path=$1local count=0while IFS= read -r line; do  kafka-console-producer.sh --broker-list $KAFKA_BROKER --topic $TOPIC_NAME <<< "$line"  count=$((count+1))done < "$local_path"echo $count
}        if ! check_kafka_topic $TOPIC_NAME;thenlog_err "条件检查不通过, 原因: topic->$TOPIC_NAME不存在, 请先创建topic"exit 1
filog_info "参数检查通过.........................................................................................................."
start_time=`date "+%Y-%m-%d %H:%M:%S"`
start_seconds=$(date -d "$start_time" +%s)log_info "开始录入数............................................................................................................"
count=$(send_to_kafka $FILE_PATH)end_time=`date "+%Y-%m-%d %H:%M:%S"`
end_seconds=$(date -d "$end_time" +%s)
time_diff=$((end_seconds - start_seconds))  log_info "录入条数: $count"
log_info "花费时间:$time_diff 秒"
log_info "录入完成.............................................................................................................."

在这里插入图片描述

七.将kafka数据 下载到文件->(kafka-to-file)

#!/bin/bash
source /home/shell/log
log_info "脚本作用: 将kafka指定topic的数据消费到指定文件中"
log_info "脚本参数:1.数据文件路径(必选) 2.topic名称(必选) 3.groupID(可选->不存在则从头消费,存在则从grooupID offset 开始消费)"
log_info "group-list 脚本可以查看当前的"
# Kafka的bin目录  
KAFKA_BIN_DIR=/path/to/kafka/bin#kafka 地址  
KAFKA_SERVER=ip:9092 # Kafka的配置文件目录  
KAFKA_CONFIG_DIR=/home/kafka/kafka_2.12-3.2.1/config# Kafka消费者配置文件  
CONSUMER_CONFIG=$KAFKA_CONFIG_DIR/consumer.properties# 指定要消费的主题  
TOPIC_NAME=your_topic_name# 指定要写入的文件 
FILE_PATH=$1
TOPIC_NAME=$2
GROUP_ID=$3log_info "执行检察............................................................................................................................"function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $KAFKA_SERVER  --list | grep -q "^$local_topic_name$";thenreturn 0  # 返回true  elsereturn 1  # return falsefi
}if ! check_kafka_topic $TOPIC_NAME;thenlog_err "topic->$TOPIC_NAME 未找到"exit 1
fi
log_info "检查通过............................................................................................................................"log_info "当前topic,所有groupID的消费情况如下>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"
while IFS= read -r line; doif [[ $line == *"PARTITION"* ]]; thencontent="$(date '+%Y-%m-%d %H:%M:%S') [INFO] $line"echo -e "\033[45m" ${content} "\033[0m"else  log_info "$line"fi
done< <(kafka-consumer-groups.sh --bootstrap-server $KAFKA_SERVER --describe  --all-groups | grep -v '__consumer_offsets' | grep "$TOPIC_NAME\|PARTITION")log_info "当前topic,所有groupID的消费情况输出完成>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"log_info "消费进程运行中( CTRL+C 可退出消费 )................................................................................................."
# 运行消费者脚本并将输出重定向到文件  
if [ $# -eq 2 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning > $FILE_PATH
fi
if [ $# -eq 3 ]; thenkafka-console-consumer.sh --bootstrap-server $KAFKA_SERVER --topic $TOPIC_NAME --from-beginning --group $GROUP_ID > $FILE_PATH
fi

在这里插入图片描述

八. 查看topic的groupID消费情况->(list-group)

#!/bin/bash
kafka_broker=ip:9092
source /home/shell/log
log_info "脚本功能: 查看topic的groupID信息"
log_info "脚本参数: topic名称(可选->未指定topic则列出所有topic的groupID信息)"
function check_kafka_topic() {local local_topic_name=$1if kafka-topics.sh --bootstrap-server $kafka_broker  --list | grep -q "^$local_topic_name$";thenlog_info "$local_topic_name存在->true"return 0  # 返回true  elselog_warn "$local_topic_name 不存在->false"return 1  # return falsefi
}if [ $# -eq 1 ]; thenif ! check_kafka_topic $1; then#topic 不存在则直接退出程序log_warn "topic=$1, 不存在"exit 1filog_info "topic_name=$1 的gruoupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep $1 | grep -v __consumer_offsets
elselog_info "所有groupID信息如下:"kafka-consumer-groups.sh --bootstrap-server $kafka_broker --describe --all-groups | grep -v __consumer_offsets
fi

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/433754.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s-基础知识(Service,NodePort,CusterIP,无头服务,NameSpace,资源限制)

Node Node 是 Pod 真正运行的主机&#xff0c;可以是物理机&#xff0c;也可以是虚拟机。 Annotations 原文链接 Annotations 是 key/value 形式附加于对象的注解。不同于 Labels 用于标志和选择对象&#xff0c;Annotations 则是用来记录一些附加信息&#xff0c;用来辅助应…

Docker部署思维导图工具SimpleMindMap并实现公网远程访问

文章目录 1. Docker一键部署思维导图2. 本地访问测试3. Linux安装Cpolar4. 配置公网地址5. 远程访问思维导图6. 固定Cpolar公网地址7. 固定地址访问 SimpleMindMap 是一个可私有部署的web思维导图工具。它提供了丰富的功能和特性&#xff0c;包含插件化架构、多种结构类型&…

如何实现无公网IP实现远程访问MongoDB文件数据库

&#x1f4d1;前言 本文主要是如何实现无公网IP实现远程访问MongoDB文件数据库的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x…

基于Matlab/Simulink直驱式风电储能制氢仿真模型

接着还是以直驱式风电为DG中的研究对象&#xff0c;上篇博客考虑的风电并网惯性的问题&#xff0c;这边博客主要讨论功率消纳的问题。 考虑到风速是随机变化的&#xff0c;导致风电输出功率的波动性和间歇性问题突出&#xff1b;随着其应用规模的不断扩大以及风电在电网中渗透率…

uniapp小程序:内存超过2mb解决方法(简单)message:Error: 上传失败:网络请求错误 代码包大小超过限制。

分析&#xff1a;这种情况是代码文件内存超过2mb无法进行预览上传 解决方法&#xff1a; 1、Hbuilder中点击运行-->运行到小程序模拟器--->运行时是否压缩代码 2、在微信小程序中点击详情--->本地设置&#xff1a; 3、点击预览即可运行了

Elment UI的el-table-column表头旁边有点击按钮类似的操作

Elment UI的el-table-column表头旁边有点击按钮类似的操作 <el-table-column fixed"right" label"操作" ><!-- 表头 --> {{-- <template slot"header" header"scope">--}} {{-- <span…

【机组】单元模块的软件简介和安装

​&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《机组 | 模块单元实验》⏰诗赋清音&#xff1a;云生高巅梦远游&#xff0c; 星光点缀碧海愁。 山川深邃情难晤&#xff0c; 剑气凌云志自修。 目录 【软件简介和安装】 1 性能特…

Spring Boot 中 Service 层依赖注入问题

目录 问题描述 产生错误 问题原因 解决方法 手动注入方法 1、使用工具集 hutool&#xff0c;引入 Maven 依赖 2、编写 SpringUtil 工具类 问题描述 Controller 层方法为 static 静态&#xff0c;引入 Service 层时使用 Autowired 注解自动装配&#xff0c;Controller层方…

通过LiveNVR实现海康大华华为宇视等监控摄像头在服务器上录像存储,并web无插件直播和回放

支持云端录像服务器上面集中录像存储在部署LiveNVR的服务器上面 1、流媒体服务软件2、配置开启录像(云端录像)3、录像回看(云端录像)3.1、查看录像3.1.1、时间轴视图3.1.2、列表视图 4、云端录像相关接口5、如何分享时间轴录像回看&#xff1f;6、iframe集成示例7、RTSP/HLS/FL…

CPU,内存和硬盘之间的关系

计算机三大件&#xff1a;CPU&#xff0c;内存&#xff0c;硬盘。从运算速度来看&#xff0c;CPU>内存>固态硬盘>机械硬盘。 电脑卡顿怎么解决&#xff1f; 1、清理垃圾&#xff1b; 2、释放C盘空间&#xff0c;因为系统需要C盘空间当作虚拟内存&#xff1b; 3、增…

AF700 NHS 酯,AF 700 Succinimidyl Ester,一种明亮且具有光稳定性的近红外染料

AF700 NHS 酯&#xff0c;AF 700 Succinimidyl Ester&#xff0c;一种明亮且具有光稳定性的近红外染料&#xff0c;AF700-NHS-酯&#xff0c;具有水溶性和 pH 值不敏感性 您好&#xff0c;欢迎来到新研之家 文章关键词&#xff1a;AF700 NHS 酯&#xff0c;AF 700 Succinimid…

Genome-wide association studies in R

全基因组关联&#xff08;GWA&#xff09;研究扫描整个物种基因组&#xff0c;寻找多达数百万个SNPs与特定感兴趣特征之间的关联。值得注意的是&#xff0c;感兴趣的性状实际上可以是归因于群体的任何类型的表型&#xff0c;无论是定性的&#xff08;例如疾病状态&#xff09;还…