kafka客户端生产者消费者kafka可视化工具(可生产和消费消息)

点击下载《kafka客户端生产者消费者kafka可视化工具(可生产和消费消息)》

1. 前言

因在工作中经常有用到kafka做消息的收发,每次调试过程中,经常需要查看接收的消息内容以及人为发送消息,从网上搜寻了一下,找到的工具大部分都是Kafka Tool,此工具功能还比较全面,但是操作起来个人认为并不是那么方便,于是,就萌生了自己写一个简单又好用的kafka客户端的想法。

以下为官方提供的Kafka tool:

在这里插入图片描述

可以看出传统的Kafka客户端工具与消费者相关的功能很完善,但是缺乏生产者相关功能,只能被动的查看已接收消息,且不能消费消息,这对于开发人员来讲有点难以接受。

2. 我写的Kafka工具

2.1 生产者

Kafka生产者是负责向Kafka发送消息的应用程序组件。在Kafka中,生产者负责创建消息并将其发送到Kafka集群中的特定主题(topic)中。生产者将消息封装为ProducerRecord对象,该对象包含消息的主题、分区、键和值等信息。

在这里插入图片描述

生产者客户端开发包括以下步骤:

  1. 配置生产者客户端参数:生产者需要指定broker的地址清单(bootstrap.servers),用于与Kafka集群建立连接。
  2. 创建生产者实例:根据配置参数,创建生产者实例。
  3. 构建待发送的消息:将要发送的消息封装为ProducerRecord对象,并指定要发送到的主题。
  4. 发送消息:使用生产者实例将消息发送到Kafka集群。
  5. 关闭生产者实例:在完成消息发送后,关闭生产者实例以释放资源。

Kafka生产者支持异步发送消息,允许应用程序继续执行其他任务而无需等待消息发送完成。如果发送消息失败,生产者会尝试重新发送消息几次,或者可以选择将错误传递给应用程序进行进一步处理。

2.1.1 连接Kafka

在这里插入图片描述

  • 输入Bootstrap Servers、UserName、Password即可连接kafka服务端。
  • 如果服务端未指定Passord,则不用输入Password。
  • 连接时将在Message中显示连接结果。

2.1.2 生产消息

在这里插入图片描述

连接成功后,即可生产消息,其中Topic、Message为必填项,Key可为空,Type指定消息内容的格式,可为文本或JSON格式数据。

以下为示例消息:

在这里插入图片描述

2.2 消费者

Kafka消费者是负责从Kafka主题(topic)中读取消息的应用程序组件。消费者订阅感兴趣的主题,并从该主题中拉取消息进行处理。

在Kafka中,消费者通过消费者组(Consumer Group)进行管理,消费者组是一组共享相同消费者组名的消费者实例。消费者组负责将主题中的消息分配给组内的各个消费者实例进行消费。当主题中的消息量较大时,通过消费者组的扩展,可以实现消息的分布式消费,提高系统的吞吐量和可用性。

在这里插入图片描述

消费者客户端开发包括以下步骤:

  1. 配置消费者客户端参数:指定broker的地址清单(bootstrap.servers),用于与Kafka集群建立连接。此外,还需要指定消费者的组名、自动提交偏移量(auto.commit.enable)等参数。
  2. 创建消费者实例:根据配置参数,创建消费者实例。
  3. 订阅主题:使用subscribe()方法订阅感兴趣的主题。消费者可以订阅一个或多个主题,也可以使用正则表达式订阅特定模式的主题。
  4. 处理消息:在消息拉取时,消费者从broker中读取消息,并进行处理。消费者可以使用pull模式或push模式来接收消息,其中pull模式允许消费者根据自身消费能力以适当速率消费消息,而push模式则由broker主动将消息推送给消费者。
  5. 提交偏移量:在处理完每条消息后,消费者需要提交偏移量(offset),以便在下次消费时从正确的位置继续读取。提交偏移量可以保证消息被可靠地处理且只被处理一次。
  6. 关闭消费者实例:在完成消息处理后,关闭消费者实例以释放资源。

2.2.1 连接Kafka

在这里插入图片描述

  • 输入Bootstrap Servers、UserName、Password即可连接kafka服务端。
  • 如果服务端未指定Passord,则不用输入Password。
  • 如果未指定GroupId,将会默认设置为default-consumer。
  • 连接时将在Message中显示连接结果。

2.2.2 消费消息

连接成功后,即可消费消息,其中Topic必填项。

以下为示例消息:

在这里插入图片描述

3. 特别说明

Kafka生产者和消费者工具是专门为处理Apache Kafka消息队列而设计的强大工具。这两个工具共同协作,一个用于将信息发送到Kafka集群,另一个用于从集群中接收信息。

Kafka生产者工具的主要作用是将应用程序产生的数据发送到Kafka集群的主题中。这个过程是异步的,允许应用程序在发送消息后继续执行其他任务。生产者还具有重试和错误处理机制,以确保消息在传输过程中不会丢失。

Kafka消费者工具则负责从主题中读取并处理这些消息。消费者可以并行地从多个分区读取消息,提高了处理大量数据的效率。此外,消费者还可以自动处理偏移量,以便在出现问题时能够重新开始消费。

这两个工具的优点在于它们提供了一种可靠且高效的方式来处理和传输大规模数据。通过Kafka生产者和消费者工具,应用程序可以轻松地与Kafka集群进行交互,从而实现实时数据处理和流分析等功能。

点击下载《kafka客户端生产者消费者kafka可视化工具(可生产和消费消息)》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/453393.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python常用pandas函数nlargest / nsmallest及其手动实现

目录 pandas库 Series和DataFrame nlargest和nsmallest 用法示例 代替方法 手动实现 模拟代码 pandas库 是Python中一个非常强大的数据处理库,提供了高效的数据分析方法和数据结构。它特别适用于处理具有关系型数据或带标签数据的情况,同时在时间序列分析方面也有着出…

银河麒麟 aarch64 Mysql环境安装

一、操作系统版本信息 组件版本操作系统Kylin V10 (SP3) /(Lance)-aarch64-Build23/20230324Kernel4.19.90-52.22.v2207.ky10.aarch64MySQLmysql-8.3.0JDK1.8.0_312 二、MySQL下载 官网下载地址:https://dev.mysql.com/downloads/mysql/ 三、MySQL 安装 3.1 删…

C# CAD界面-自定义工具栏(三)

运行环境 vs2022 c# cad2016 调试成功 一、引用 二、开发代码进行详细的说明 初始化与获取AutoCAD核心对象: Database db HostApplicationServices.WorkingDatabase;:这行代码获取当前工作中的AutoCAD数据库对象。在AutoCAD中,所有图形数…

MQ面试题整理(持续更新)

1. MQ的优缺点 优点:解耦,异步,削峰 缺点: 系统可用性降低 系统引入的外部依赖越多,越容易挂掉。万一 MQ 挂了,MQ 一挂,整套系统崩 溃,你不就完了?系统复杂度提高 硬生…

2024年Java面试题大全 面试题附答案详解,BTA内部面试题

基础篇 1、 Java语言有哪些特点 1、简单易学、有丰富的类库 2、面向对象(Java最重要的特性,让程序耦合度更低,内聚性更高) 阿里内部资料 基本类型 大小(字节) 默认值 封装类 6、Java自动装箱与拆箱 装箱就是…

python 时间计算器

第一个函数time_calculator可以根据用户的需求增加或减少时间。 第二个函数calculate_time_difference可以计算两个时间点之间的差异。这两个函数都是自动化测试和时间管理中非常有用的工具。 from datetime import datetime, timedeltadef time_calculator(start_date, days…

MySQL- 运维-分库分表-Mycat

一、Mycat概述 1、安装 2、概念介绍 二、Mycat入门 启动服务 三、Mycat配置 1、schema.xml 2、rule.xml 3、server.xml 四、Mycat分片 1、垂直分库 2、水平分表 五、Mycat管理及监控 1、Mycat原理 2、Mycat管理工具 (1)、命令行 (2&#…

PyTorch 2.2 中文官方教程(十)

使用整体追踪分析的追踪差异 原文:pytorch.org/tutorials/beginner/hta_trace_diff_tutorial.html 译者:飞龙 协议:CC BY-NC-SA 4.0 作者: Anupam Bhatnagar 有时,用户需要识别由代码更改导致的 PyTorch 操作符和 CUDA 内核的变化…

elasticsearch重置密码操作

安装es的时候需要测试这个url:http://127.0.0.1:9200/ 出现弹窗让我输入账号和密码。我第一次登录,没有设置过账号和密码, 解决方法是:在es的bin目录下打开cmd窗口,敲命令:.\elasticsearch-reset-password…

基于若依的ruoyi-nbcio流程管理系统自定义业务回写状态的一种新方法(二)

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码: https://gitee.com/nbacheng/ruoyi-nbcio 演示地址:RuoYi-Nbcio后台管理系统 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码: https://gitee.com/nbacheng/n…

openssl3.2 - use openssl cmd create ca and p12

文章目录 openssl3.2 - use openssl cmd create ca and p12概述笔记实验的openssl环境建立CA生成私钥和证书请求生成CA证书用CA签发应用证书用CA对应用证书进行签名将已经签名好的PEM证书封装为P12证书验证P12证书是否可用END openssl3.2 - use openssl cmd create ca and p12 …

YouTrack 用户登录提示 JIRA 错误

就算输入正确的用户名和密码,我们也得到了下面的错误信息: youtrack Cannot retrieve JIRA user profile details. 解决办法 出现这个问题是因为 YouTrack 在当前的系统重有 JIRA 的导入关联。 需要把这个导入关联取消掉。 找到后台配置的导入关联&a…