SpringBoot集成Kafka之SASL_SSL

系列文章目录


文章目录

  • 系列文章目录
  • 前言


前言

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。
在这里插入图片描述


Kafka 是一个由 LinkedIn 开发的分布式消息系统,它于2011年年初开源,现在由著名的 Apache 基金会维护与开发。 Kafka 使用 Scala 实现,被用作 LinkedIn 的活动流和运营数据处理的管道,现在也被诸多互联网企业广泛地用作数据流管道和消息系统, Kafka 是基于消息发布﹣订阅模式实现的消息系统。
在这里插入图片描述
Win安装:Kafka安装Windows版 - Java小强技术博客 (javacui.com)

SpringBoot集成:SpringBoot集成Kafka - Java小强技术博客 (javacui.com)

由于公司使用时,必须要求Prodcuer&Consumer must connect to Kafka via SASL_SSL,因为进行了相关集成的编码,进行记录。

首先之前Spring的配置文件我们就不需要了,为了测试方便先编码到代码中,手续可以手动指定到配置文件中。

这里需要IO包,来操作JKS文件,看其是否存在,因此POM需要引入

<dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version>
</dependency>

然后消息监听类,需要改造下,首先指定一个ID,然后指定为手动启动

package com.example.springboot.listener;import com.alibaba.fastjson.JSON;
import com.example.springboot.model.Blog;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaMessListener {private static final String messTopic = "test";@KafkaListener(id="KafkaMessListener", topics = messTopic, groupId = "javagroup", autoStartup = "false")public void messListener(Blog blog) {System.out.println("消费端 收到消息:" + JSON.toJSONString(blog));}
}

编写消息提供者Producer配置类

package com.example.springboot.config;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.boot.system.ApplicationHome;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;/*** Kafka服务端配置*/
@Slf4j
@Configuration
public class KafkaProducerConfig {@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}/*** the producer factory config*/@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9002");props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 3);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 106384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.springframework.kafka.support.serializer.JsonSerializer");props.put("security.protocol", "SASL_SSL");props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");//打包成jar后,无法找到文件的解决方法(复制jar包里的文件,到文件目录里)ApplicationHome applicationHome = new ApplicationHome(KafkaProducerConfig.class);//项目打包成jar包所在的根路径String rootPath = applicationHome.getSource().getParentFile().toString();String configFilePath = rootPath + "\\client_truststore.jks";log.info("证书文件地址:" + configFilePath);File configFile = new File(configFilePath);if (!configFile.exists()) {try {//获取类路径下的指定文件流  (项目目录下的:  /resource/client_truststore.jks)InputStream in = this.getClass().getClassLoader().getResourceAsStream("client_truststore.jks");FileUtils.copyInputStreamToFile(Objects.requireNonNull(in, "client_truststore.jks文件找不到"), configFile);} catch (IOException e) {throw new IllegalArgumentException("client_truststore.jks文件找不到->" + e.getMessage());}}props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, configFilePath);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Abc123");//注意passwod结尾的分号一定不要漏props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username='YouName' password='YouPass';");return new DefaultKafkaProducerFactory<String, String>(props);}}

这了之前在Spring中配置的属性,我们都编码在代码里面了,特别注意的是对于JKS文件的目录指定,因为SpringBoot程序最后都是打成Jar包的,因此需要上述方式来指定JKS文件位置。

然后我们编码消息监听者的配置类,因为我们已经指定了监听程序手动启动,因此配置类中需要编码手工开启某监听的启动。

package com.example.springboot.config;import com.example.springboot.listener.KafkaMessListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.system.ApplicationHome;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;/*** Kafka消费端配置*/
@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {@Beanpublic ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {return args -> {MessageListenerContainer kafkaMessListener = registry.getListenerContainer("KafkaMessListener");kafkaMessListener.start();};}/*** 配置监听,将消费工厂信息配置进去*/@Beanpublic ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<Integer, String>();factory.setConsumerFactory(consumerFactory());return factory;}/*** 消费 消费工厂*/@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());}/*** 消费配置*/public Map<String, Object> consumerConfigs() {Map<String, Object> props = new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9002");props.put(ConsumerConfig.GROUP_ID_CONFIG, "YouGroup_ID");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");// earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费// latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据// none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.springframework.kafka.support.serializer.JsonDeserializer");props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); // 信任所有包可以序列化props.put("security.protocol", "SASL_SSL");props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");//打包成jar后,无法找到文件的解决方法(复制jar包里的文件,到文件目录里)ApplicationHome applicationHome = new ApplicationHome(KafkaConsumerConfig.class);//项目打包成jar包所在的根路径String rootPath = applicationHome.getSource().getParentFile().toString();String configFilePath = rootPath + "\\client_truststore.jks";log.info("证书文件地址:" + configFilePath);File configFile = new File(configFilePath);if (!configFile.exists()) {try {//获取类路径下的指定文件流  (项目目录下的:  /resource/client_truststore.jks)InputStream in = this.getClass().getClassLoader().getResourceAsStream("client_truststore.jks");FileUtils.copyInputStreamToFile(Objects.requireNonNull(in, "client_truststore.jks文件找不到"), configFile);} catch (IOException e) {throw new IllegalArgumentException("client_truststore.jks文件找不到->" + e.getMessage());}}props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, configFilePath);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Abc123");//注意passwod结尾的分号一定不要漏props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username='YouName' password='YouPass';");return props;}
}

如上可以手工启动消息开始处理消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/619614.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件无线电安全之HackRF One初探

HackRF介绍 HackRF是一款开源软件无线电&#xff08;SDR&#xff09;平台&#xff0c;由Great Scott Gadgets公司推出。它具有广泛的频率覆盖范围&#xff0c;从1 MHz到6 GHz&#xff0c;支持大部分常见的无线通信频段。采用软件定义无线电技术&#xff0c;HackRF提供了自定义…

最优算法100例之43-包含min函数的栈

专栏主页:计算机专业基础知识总结(适用于期末复习考研刷题求职面试)系列文章https://blog.csdn.net/seeker1994/category_12585732.html 题目描述 题目描述: 定义栈的数据结构,请在该类型中实现一个能够得到栈的最小元素的min函数,在该栈中,调用min,push及pop的时间复杂…

DC-2靶机知识点

知识点总结 1.IP访问与域名访问 2.端口扫描 3.目录扫描 4.cewl密码生成器 5.指纹探测 6.爆破ssh 7.msf的使用 8.rbash逃逸 9.git提权 靶机&#xff0c;攻击机就不多说了&#xff0c;给个靶机地址 https://download.vulnhub.com/dc/DC-2.zip 环境配置 因为访问该靶机…

移动应用安全合规动态:网信办、金管局发文强调数据安全;3月个人信息违规抽查结果出炉!(第五期)

一、监管部门动向&#xff1a;国家互联网信息办公室公布《促进和规范数据跨境流动规定》; 工信部发布《关于网络安全保险典型服务方案目录的公示》 二、安全新闻&#xff1a;恶意软件警报&#xff01;黑客利用软件即服务攻击印度安卓用户&#xff1b;Cerberus银行恶意软件的虚…

杭州掀起快递物流创新浪潮,2024长三角快递物流展7月共绘智慧物流新蓝图

杭州&#xff0c;作为中国的电商之都&#xff0c;近年来在快递物流行业背景与应用方面取得了显著的发展。随着电子商务的迅猛增长&#xff0c;杭州的快递物流行业迅速崛起&#xff0c;成为支撑电商产业发展的重要力量。 2024长三角&#xff08;杭州&#xff09;快递物流供应链…

vue3.4 新特性 defineModel() 宏

v-model 简介 官网是这样解释 v-model 的 v-model 的功能是&#xff0c;实现数据的双向绑定【本质上是 :value 和 input 语法糖】 如果是表单元素&#xff0c;下面两种写法是一样&#xff0c;这时v-model就是语法糖&#xff0c;帮你简化了操作 <input v-model"messag…

浅尝一下ECS(Entity Component System)(学习笔记)

参考文章&#xff1a;浅谈Unity ECS&#xff08;一&#xff09;Uniy ECS基础概念介绍&#xff1a;面向未来的ECS - 知乎 (zhihu.com) 视频链接&#xff1a;【青幻译制】GDC讲座系列之三 守望先锋的游戏架构和网络代码_哔哩哔哩_bilibili 云风的 BLOG: 浅谈《守望先锋》中的 E…

机器学习—特征预处理和降维(四)

什么是特征预处理&#xff1f; 通过一些转换函数将特征数据转换成更加适合算法模型的特征数据过程 1包含内容 数值型数据的无量纲化&#xff1a; 归一化标准化 2特征预处理API sklearn. preprocessing为什么要进行归一化 or 标准化&#xff1f; 特征的单位或者大小相差较大…

neo4j使用详解(十八、java driver使用及性能优化<高级用法>——最全参考)

Neo4j系列导航&#xff1a; neo4j安装及简单实践 cypher语法基础 cypher插入语法 cypher插入语法 cypher查询语法 cypher通用语法 cypher函数语法 neo4j索引及调优 neo4j java Driver等更多 1.依赖引入 <dependency><groupId>org.neo4j.driver</groupId><…

Springboot+Vue项目-基于Java+MySQL的校园周边美食探索及分享平台系统(附源码+演示视频+LW)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;Java毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计 &…

微信过期文件怎么恢复?四个高招助你轻松解决(2024新版)

“微信的文件未下载的情况下过期了&#xff0c;平时微信也没有登录在电脑上&#xff0c;之前也没有进行过数据备份&#xff0c;如何找回这个文件啊&#xff01;&#xff01;感谢回答&#xff01;” “急求&#xff0c;当时忘记点击下载&#xff0c;现在急用微信文件下载不了&a…

Excel 记录单 快速录入数据

一. 调出记录单 ⏹记录单功能默认是隐藏的&#xff0c;通过如下如图所示的方式&#xff0c;将记录单功能显示出来。 二. 录入数据 ⏹先在表格中录入一行数据&#xff0c;给记录单一个参考 ⏹将光标至于表格右上角&#xff0c;然后点击记录单按钮&#xff0c;调出记录单 然后点…