kafka(三)springboot集成kafka(1)介绍

一、相关组件介绍

1、pom:
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>
2、kafkaProducer

produce的发送主要流程概述如下:

  1. 拦截器对发送的消息拦截处理;

  2. 获取元数据信息;

  3. 序列化处理;

  4. 分区处理;

  5. 批次添加处理;

  6. 发送消息。

 

3、 KafkaConsumer

二、生产者发送消息类型

1、同步发送消息

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}
2、异步发送消息
2.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
2.2、带回调函数的异步发送

 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}
3、发送顺序消息 

三、消费者接收消息

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {// 1. 创建消费者配置对象Properties properties = new Properties();// 2. 给消费者配置对象添加参数(不同于生产者,消费者有 4个必要的配置参数)//  broker的ip地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 配置  反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//配置消费者组(组名必须)properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 3. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 注册消费主题ArrayList<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);// 4.调用方法消费数据// 如果kafka集群没有新数据会造成空转// 填写参数为时间,如果没有拉取数据,线程睡眠一会while (true) {// 设置1s中消费的一批数据// Duration.ofSeconds(1)不会导致空转,拉取不到的时候睡眠1sConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 打印消费数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());}}//5.关闭资源
//        consumer.close();不使用的原因是,已关闭进程,就不会再消费数据了,进程停止就以为着JVM为断电了,不再工作}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/513927.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【爬虫】单首音乐的爬取(附源码)

以某狗音乐为例 import requests import re import time import hashlibdef GetResponse(url):# 模拟浏览器headers {User-Agent:Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0}# 发送请求…

Android开发环境搭建

第一步&#xff1a;Android Studio官网下载 官网&#xff1a;下载 Android Studio 和应用工具 - Android 开发者 | Android Developers (google.cn) 第二步&#xff1a;Android Studio安装 打开下载好的安装包。 改一下安装路径 点击ok即可。 点击next 继续next的 继续next…

android音视频编解码,你有过迷茫吗

3-5年的Android工程师最容易遇到的4个瓶颈是什么&#xff1f; 1.原理认知浅 工作内容多是简单UI界面开发和第三方SDK整合&#xff0c;对原理层和底层开发了解不深 2.技术视野窄 长期在小型软件公司&#xff0c;外包公司工作&#xff0c;技术视野被限制的太厉害 3.薪资提升…

flutter弹窗输入,Android学习的三个终极问题及学习路线规划

题库非常全面包括&#xff1a; Android基础知识: 基本涵盖Android所有知识体系&#xff0c;四大组件&#xff0c;Fragment,WebView,事件分发&#xff0c;View绘制…Java基础知识&高阶知识点: 基础部分不谈了&#xff0c;高阶部分&#xff1a;泛型&#xff0c;反射&#xff…

git使用教程14-Pycharm版本控制与分支管理

一、版本控制 1、版本控制介绍 &#xff08;1&#xff09;Version Control System 版本控制系统&#xff0c;简称VCS。 &#xff08;2&#xff09;版本控制系统分类&#xff1a; 集中式版本控制工具&#xff1a;SVN 分布式版本控制工具&#xff1a;Git 2、Pycharm 支持的版本…

EPSON RA8000CE (RTC模块)压电侠

RA8000CE是一个集成了32.768 kHz数字温度补偿晶体振荡器(DTCXO)的RTC模块。它包括各种功能&#xff0c;如具有闰年校正的秒到年时钟/日历&#xff0c;时间警报&#xff0c;唤醒计时器&#xff0c;时间更新中断&#xff0c;时钟输出和时间戳功能&#xff0c;可以在外部或内部事件…

【Android】位置修改相关

获取位置服务总开关状态 //获取LOCATION_MODE值&#xff0c;但adb状态下无法获取 //0为关闭&#xff0c;1 gps、2 network、3 高精度等 int state Settings.Secure.getInt(mContext.getContentResolver(),Settings.Secure.LOCATION_MODE,Settings.Secure.LOCATION_MODE_HIGH_…

微信小程序开发系列(十一)·小程序页面的跳转设置以及参数传递

目录 1. 跳转到商品列表 1.1 url: 当前小程序内的跳转链接 1.2 navigate&#xff1a;保留当前页面&#xff0c;跳转到应用内的某个页面。但是不能跳到 tabbar 页面 1.3 redirect&#xff1a; 关闭当前页面&#xff0c;跳转到应用内的某个页面。但不能跳转到 tabbar 页面…

linux实现远程文件夹共享-samba

目录 问题描述Samba如何挂载常用参数临时挂载实例一种长期挂载方法&#xff08;已失败&#xff0c;仅供参考&#xff09;查看挂载取消挂载umount失败 问题描述 我的代码需要访问存在于两个系统&#xff08;win和linux&#xff09;的文件夹&#xff0c;我不是文件夹的创建者&am…

Linux开发板移植rz、sz指令实现串口传输文件

一、开发环境 实现开发板和电脑通过串口来收发互传文件。 开发板&#xff1a;NUC980开发板 环境&#xff1a;Ubuntu 22.04.3 LTS 64-bit lrzsz的源码包:例如 lrzsz-0.12.20.tar.gz&#xff0c;下载地址https://ohse.de/uwe/software/lrzsz.html 二、移植步骤 在开发板上移植…

『python爬虫』ip代理池使用 协采云 账密模式(保姆级图文)

目录 实现效果实现思路代码示例总结 欢迎关注 『python爬虫』 专栏&#xff0c;持续更新中 欢迎关注 『python爬虫』 专栏&#xff0c;持续更新中 实现效果 在官网原版demo基础上小改了一下,修正了接口错误(把2023改成2024就可以了),原版demo只能测试单个ip,我这里批量测试所有…

【Linux实践室】Linux常用命令:文件操作|文件夹操作

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;Linux实践室、网络奇遇记 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 一. ⛳️任务描述二. ⛳️相关知识2.1 &#x1f514;Linux文件操作2.1.1 &#x1f47b;创建文件2…