kafka用java收发消息

用java客户端代码来对kafka收发消息
具体代码如下

package com.cool.interesting.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;public class KafkaTest {private static final String BOOTSTRAP_SERVERS = "192.168.47.145:9092";private static final String TOPIC_NAME = "test";public static void main(String[] args) {// 生产者示例produceMessage();// 消费者示例consumeMessage();//从指定偏移量消费消息consumeOffsetMessage();}//生产者代码private static void produceMessage() {Properties props = new Properties();//acks是保证消息的发送机制,有以下几个值//acks = 0:表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。//acks = 1: 表示leader副本成功写入就会响应Producer,而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。默认值即是1。//acks = all或-1: 表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。//调优建议:建议根据实际情况设置,如果要严格保证消息不丢失,请设置为all或-1;如果允许存在丢失,建议设置为1;一般不建议设为0,除非无所谓消息丢不丢失。props.put(ProducerConfig.ACKS_CONFIG,1);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//key和value序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//其他配置参数详见org.apache.kafka.clients.producer.ProducerConfig类try (Producer<String, String> producer = new KafkaProducer<>(props)) {for (int i = 0; i < 10; i++) {String message = "Message " + i;//异步发送Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC_NAME,  message));System.out.println("Sent message: " + message);}}}//正常消费者代码private static void consumeMessage() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test99");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅一个topicconsumer.subscribe(Collections.singletonList(TOPIC_NAME));while (true) {//设置kafak从broker拉取消息的超时时间// (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : records) {System.out.println("Received_message: " + record.value());}}}//指定偏移量开始消费private static void consumeOffsetMessage() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//订阅一个topicconsumer.subscribe(Collections.singletonList(TOPIC_NAME));//如果要指定偏移量,必须先poll一次,不然代码报错ConsumerRecords<String, String> poll = consumer.poll(0);System.out.println("poll:"+poll.isEmpty());//创建一个分区(参数为topic_name,和分区序号)TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);// 指定要消费的偏移量long offset = 3;//从指定偏移量开始消息消息consumer.seek(topicPartition, offset);while (true) {//设置kafka从broker拉取消息的超时时间// (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : records) {System.out.println("Received_message: " + record.value());}}}}

安装kafka的可视化工具:offset explorer
offset explorer 是一个用于查看和管理 Kafka 消费者组的工具,它允许你检查消费者组的偏移量(offset),并且可以查看每个消费者组在每个分区上的偏移量情况。这对于监控和调试 Kafka 消费者组非常有用。
下载地址为:https://www.kafkatool.com/download.html
如下图所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/704911.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue 之 后台管理系统的权限路由的管理

目录 前言实现理解三者的概念以及之间的关联账号&#xff08;用户&#xff09;角色菜单 用户权限授权相关概念实现代码实现登录跳转路由&#xff0c;路由守卫中进行权限验证按钮权限封装指令&#xff1a;调用&#xff08;其中一个页面参考&#xff09; 思路&#xff0c;操作流程…

景源畅信数字:做抖音切片的方法分享?

一提起抖音切片&#xff0c;很多人可能会想到那些让人眼前一亮的短视频。它们通常短小精悍&#xff0c;内容丰富多彩&#xff0c;能够迅速吸引观众的注意力。但是&#xff0c;如何制作出这样的切片视频呢?这就是我们今天要探讨的问题。 一、选材与剪辑 制作抖音切片&#xff0…

年度更新!统信UOS服务器版V20(1070)超越期待

不负广大客户期待&#xff01; 统信UOS服务器版V20&#xff08;1070&#xff09;年度首更 功能更强大、性能更卓越、生态更丰富 助您畅享安全、便捷、高效的产品和服务 新平台&#xff0c;新生态 统信UOS服务器版始终坚持进行生态适配&#xff0c;目前已支持超过百万种兼容…

某能源集团电力公司搭建数据报表中心,实现采集填报分析一体化

​在当今这个信息爆炸的时代&#xff0c;数据已成为企业最宝贵的财富&#xff0c;越来越多的企业开始重视数据的积累和归集。在企业日常生产和工作过程中&#xff0c;会产生绵延不断的数据&#xff0c;但这些数据往往没有统一的记录、归纳和整理&#xff0c;或者录入了系统却分…

电子邮箱是什么?付费电子邮箱和免费电子邮箱有什么区别?

注册电子邮箱前&#xff0c;有付费电子邮箱和免费电子邮箱两类选择。付费的电子邮箱和免费的电子邮箱有什么区别呢&#xff1f;区别主要在于存储空间、功能丰富度和售后服务等方面&#xff0c;本文将为您详细介绍。 一、电子邮箱是什么&#xff1f; 电子邮箱就是线上的邮局&a…

【传知代码】VRT: 关于视频修复的模型(论文复现)

前言&#xff1a;随着数字媒体技术的普及&#xff0c;制作和传播视频内容变得日益普遍。但是&#xff0c;视频中由于多种因素&#xff0c;例如传输、存储和录制设备等&#xff0c;经常出现质量上的问题&#xff0c;如图像模糊、噪声干扰和低清晰度等。这类问题对用户的体验和观…

【硬件模块】ESP-01SWiFi模块基于AT指令详解(WiFi,TCP/IP,MQTT)

ESP-01S ESP-01S是由安信可科技开发的一款Wi-Fi模块。其核心处理器是ESP8266&#xff0c;该处理器在较小尺寸的封装中集成了业界领先的Tensilica L106超低功耗32位微型MCU&#xff0c;带有16位精简模式&#xff0c;主频支持80MHz和160MHz&#xff0c;并集成了Wi-Fi MAC/BB/RF/P…

构建企业的多分支网络,你可以有这些选择

为企业构建稳定、灵活的网络&#xff0c;是企业IT人员非常重要的基础工作之一。对于多分支企业而言&#xff0c;总部与各分支之间需要进行数据互联和监管&#xff0c;所以大多面临组网需求。多分支企业组网是指企业总部与分公司、工厂、门店等多点之间的网络组建&#xff0c;不…

Gradio 案例——将 dicom 文件转为 nii文件

文章目录 Gradio 案例——将 dicom 文件转为 nii文件界面截图依赖安装项目目录结构代码 Gradio 案例——将 dicom 文件转为 nii文件 利用 SimpleITK 库&#xff0c;将 dicom 文件转为 nii文件更完整、丰富的示例项目见 GitHub - AlionSSS/dcm2niix-webui: The web UI for dcm2…

谷歌全力反击 OpenAI:Google I/O 2024 揭晓 AI 新篇章,一场激动人心的技术盛宴

&#x1f680; 谷歌全力反击 OpenAI&#xff1a;Google I/O 2024 揭晓 AI 新篇章&#xff0c;一场激动人心的技术盛宴&#xff01; 在这个人工智能的全新时代&#xff0c;只有谷歌能让你眼前一亮&#xff01;来自全球瞩目的 Google I/O 2024 开发者大会&#xff0c;谷歌用一场…

C++学习一(主要对cin的理解)

#include<iostream> int main() {int sum 0, value 0;//读取数据直到遇到文件尾&#xff0c;计算所有读入的值的和while (std::cin >> value){ //等价于sumsumvaluesum value;}std::cout << "Sum is :" << sum << std::endl;sum …

基于Springboot的学生心理压力咨询评判(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的学生心理压力咨询评判&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系…