kafka 命令脚本说明以及在java中使用

一、命令行使用

1.1、topic 命令

1、关于topic,这里用window 来示例

bin\windows\kafka-topics.bat

在这里插入图片描述

2、创建 first topic,五个分区,1个副本

bin\windows\kafka-topics.bat  --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 --topic first

在这里插入图片描述
3、查看当前服务器中的所有 topic

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

在这里插入图片描述

4、查看 first 主题的详情

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic first

在这里插入图片描述
5、修改分区数**(注意:分区数只能增加,不能减少)**

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic first --partitions 6

在这里插入图片描述

6、删除 topic,该操作在winodw,会出现文件授权问题,日志可以在kafka的启动命令窗口中查看,只需要修改文件权限即可,如果出现这个问题,我们需要清空之前配置的 datakafka-logs 这两个文件中的内容,再次重新启动即可。

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic first

在这里插入图片描述

1.2、生产者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-producer.bat

在这里插入图片描述

2、发送消息,这里发送了2次的数据,第一次是hello,第二次是world

.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic first

在这里插入图片描述

1.3、消费者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-consumer.bat

在这里插入图片描述
在这里插入图片描述

2、接受消息,因为前面我们在发送消息的时候,消费者没有启动,所以第一次发的数据这里是收不到的,并没有存储到topic中

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first

在这里插入图片描述

在这里插入图片描述

3、把主题中所有的数据都读取出来(包括历史数据),可以看到我们获取到了从消费者没有上线之前到上线之后的所有数据,一共6条。

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic first

在这里插入图片描述

1.4、脚本说明

项目Value
connect-standalone.sh用于启动单节点的Standalone模式的Kafka Connect组件。
connect-distributed.sh用于启动多节点的Distributed模式的Kafka Connect组件。
kafka-acls.sh脚本用于设置Kafka权限,比如设置哪些用户可以访问Kafka的哪些TOPIC的权限。
kafka-delegation-tokens.sh用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。
kafka-topics.sh用于管理所有TOPIC。
kafka-console-producer.sh用于生产消息。
kafka-console-consumer.sh用于消费消息。
kafka-producer-perf-test.sh用于生产者性能测试
kafka-consumer-perf-test.sh用于消费者性能测试
kafka-delete-records.sh用于删除Kafka的分区消息,由于Kafka有自己的自动消息删除策略,使用率不高。
kafka-dump-log.sh用于查看Kafka消息文件的内容,包括消息的各种元数据信息、消息体数据。
kafka-log-dirs.sh用于查询各个Broker上的各个日志路径的磁盘占用情况。
kafka-mirror-maker.sh用于在Kafka集群间实现数据镜像。
kafka-preferred-replica-election.sh用于执行Preferred Leader选举,可以为指定的主题执行更换Leader的操作。
kafka-reassign-partitions.sh用于执行分区副本迁移以及副本文件路径迁移。
kafka-run-class.sh用于执行任何带main方法的Kafka类。
kafka-server-start.sh用于启动Broker进程。
kafka-server-stop.sh用于停止Broker进程。
kafka-streams-application-reset.sh用于给Kafka Streams应用程序重设位移,以便重新消费数据。
kafka-verifiable-producer.sh用于测试验证生产者的功能。
kafka-verifiable-consumer.sh用于测试验证消费者功能。
trogdor.sh是Kafka的测试框架,用于执行各种基准测试和负载测试。
kafka-broker-api-versions.sh脚本主要用于验证不同Kafka版本之间服务器和客户端的适配性

1.5、关闭kafka

1、一定要先关闭 kafka,再关闭zookeeper,否则容易出现数据错乱

如果出现数据错错乱,最简单的方法就是清空data和kafka-logs 这两个文件下的内容,重新启动即可

2、关闭

.\bin\windows\kafka-server-stop.bat
.\bin\windows\zookeeper-server-stop.bat

在这里插入图片描述

1.6、选择分区数及kafka性能测试

1、主要工具是 kafka-producer-perf-test.batkafka-consumer-perf-test.bat 两个脚本,可以参考 kafka如何选择分区数及kafka性能测试

二、java 使用

2.1、使用原生客户端

1、依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>

2、发送和消费消息,具体代码如下:

public class KafkaConfig {public static void main(String[] args) {// 声明主题String topic = "first";// 创建消费者Properties consumerConfig = new Properties();consumerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"boot-kafka");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);// 订阅主题并循环拉取消息kafkaConsumer.subscribe(Arrays.asList(topic));new Thread(new Runnable() {@Overridepublic void run() {while (true){ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));for(ConsumerRecord<String, String> record:records){System.out.println(record.value());}}}}).start();// 创建生产者Properties producerConfig = new Properties();producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG,"boot-kafka-client");producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer<>(producerConfig);// 给主题发送消息producer.send(new ProducerRecord<>(topic, "hello,"+System.currentTimeMillis()));}
}

2.2、使用springBoot

1、依赖

 <!-- 不使用kafka的原始客户端,使用spring集成的,这样比较方便  --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><!-- 可以不用指定,springBoot 会帮我们选择,如果有特殊需求,可以更改 --><!--            <version>3.0.2</version>--></dependency>

2、配置文件

server:port: 7280servlet:context-path: /thermal-emqx2kafkashutdown: gracefulspring:application:name: thermal-api-demonstration-tdenginelifecycle:timeout-per-shutdown-phase: 30smvc:pathmatch:matching-strategy: ant_path_matcher  # 不然spring boot 2.6以后的版本 和 swagger 会出现 问题,可以参考 https://blog.csdn.net/qq_41027259/article/details/125747298kafka:bootstrap-servers: 127.0.0.1:9092  # 192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094  连接的 Kafka Broker 主机名称和端口号#properties.key-serializer: # 用于配置客户端的附加属性,对于生产者和消费者都是通用的,。 org.apache.kafka.common.serialization.StringSerializerproducer: # 生产者retries: 3 # 重试次数#acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)#batch-size: 16384 # 一次最多发送数据量#buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # 消费者group-id: test-consumer-group #默认的消费组ID,在Kafka的/config/consumer.properties中查看和修改#enable-auto-commit: true # 是否自动提交offset#auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)#auto-offset-reset: latest  #earliest,latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、发送消息

package cn.jt.thermalemqx2kafka.kafka.controller;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;/*** @author GXM* @version 1.0.0* @Description TODO* @createTime 2023年08月17日*/
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/mock")public String sendKafkaMessage() {Map<String, Object> data = new HashMap<>(2);data.put("id", 1);data.put("name", "gkj");kafkaTemplate.send("first", JSON.toJSONString(data));return "ok";}
}

4、接受消息

package cn.jt.thermalemqx2kafka.kafka.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author GXM* @version 1.0.0* @Description TODO* @createTime 2023年08月17日*/
@Slf4j
@Component
public class KafkaListener {@org.springframework.kafka.annotation.KafkaListener(topics = "first")private void handler(String content) {log.info("consumer received: {} ", content);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/102623.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis从基础到进阶篇(三)----架构原理与集群演变

目录 一、缓存淘汰策略 1.1 LRU原理 1.2 案例分析 1.3 Redis缓存淘汰策略 1.3.1 设置最⼤缓存 1.3.2 淘汰策略 二、Redis事务 2.1 Redis事务典型应⽤—Redis乐观锁 2.2 Redis事务介绍 2.3 事务命令 2.3.1 MULTI 2.3.2 EXEC 2.3.3 DISCARD 2.3.4 WATCH 2.3.5 UNW…

实时操作系统Freertos开坑学习笔记:(八):信号量、事件标志组、任务通知机制

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、信号量的简介1.信号量与队列的区别&#xff1f; 二、二值信号量及其实例1.什么是二值信号量2.二值信号量相关API函数3.二值信号量实例 三、计数型信号量四、…

vscode宏键绑定

开发语言php 实现输入[ 得到 [];的效果 [win]ctrlp,[mac]superp 输入>keyboard 选择 在json文件里增加(目前有缺陷,sublime的设置是比较完美的.或者phpstorm默认不需要配置): {"key": "[","command": "editor.action.insertSnippet&…

Spark2x原理剖析(二)

一、概述 基于社区已有的JDBCServer基础上&#xff0c;采用多主实例模式实现了其高可用性方案。集群中支持同时共存多个JDBCServer服务&#xff0c;通过客户端可以随机连接其中的任意一个服务进行业务操作。即使集群中一个或多个JDBCServer服务停止工作&#xff0c;也不影响用…

【设计模式】一、设计模式七大原则

文章目录 设计模式概述设计模式七大原则设计模式的目的设计模式七大原则1. 单一职责原则2. 接口隔离原则3. 依赖倒转(倒置)原则4. 里氏替换原则5. 开闭原则&#xff08;Open-Closed Principle简称OCP原则&#xff09;6. 迪米特法则7. 合成复用原则&#xff08;Composite Reuse …

【教程】安防监控/视频存储/视频汇聚平台EasyCVR接入智能分析网关V4的操作步骤

TSINGSEE青犀AI边缘计算网关硬件 —— 智能分析网关目前有5个版本&#xff1a;V1、V2、V3、V4、V5&#xff0c;每个版本都能实现对监控视频的智能识别和分析&#xff0c;支持抓拍、记录、告警等&#xff0c;每个版本在算法模型及性能配置上略有不同。硬件可实现的AI检测包括&am…

【Two Stream network (Tsn)】(二) 阅读笔记

贡献 将深度神经网络应用于视频动作识别的难点&#xff0c;是如何同时利用好静止图像上的 appearance information以及物体之间的运动信息motion information。本文主要有三点贡献&#xff1a; 1.提出了一种融合时间流和空间流的双流网络&#xff1b; 2.证明了直接在光流上训…

生物通路数据库收录1600+整合的经典通路

生物通路数据库为科学家提供了关于生物通路的大量信息和资源&#xff0c;特别是在数据整合、信息检索、数据可视化分析、数据交互、生物学研究等方面&#xff0c;积极推动了生物学研究和科学的发展。 世界各地正在创建各种类型的通路数据库&#xff0c;每个数据库都反映了其创…

C++将派生类赋值给基类

在 C/C++ 中经常会发生数据类型的转换,例如将 int 类型的数据赋值给 float 类型的变量时,编译器会先把 int 类型的数据转换为 float 类型再赋值;反过来,float 类型的数据在经过类型转换后也可以赋值给 int 类型的变量。 数据类型转换的前提是,编译器知道如何对数据进行取舍…

bean的管理-bean的获取

获取bean 默认情况下&#xff0c;在Spring项目启动时&#xff0c;会把bean都创建好&#xff08;但是还会受到作用域及延迟初始化的影响&#xff09;放在IOC容器中&#xff0c;如果想主动获取这些bean&#xff0c;可以通过如下方式 根据name获取bean Object getBean&#xff08…

Text-to-SQL小白入门(四)指令进化大模型WizardLM

摘要 本文主要对大模型WizardLM的基本信息进行了简单介绍&#xff0c;展示了WizardLM取得的优秀性能&#xff0c;分析了论文的核心——指令进化方法。 论文概述 基本信息 英文标题&#xff1a;WizardLM: Empowering Large Language Models to Follow Complex Instructions中…