SpringBoot项目连接,有Kerberos认证的Kafka

在连接Kerberos认证kafka之前,需要了解Kerberos协议
二、什么是Kerberos协议

Kerberos是一种计算机网络认证协议 ,其设计目标是通过密钥系统为网络中通信的客户机(Client)/服务器(Server)应用程序提供严格的身份验证服务,确保通信双方身份的真实性和安全性。不同于其他网络服务,Kerberos协议中不是所有的客户端向想要访问的网络服务发起请求,他就能建立连接然后进行加密通信,而是在发起服务请求后必须先进行一系列的身份认证,包括客户端和服务端两方的双向认证,只有当通信双方都认证通过对方身份之后,才可以互相建立起连接,进行网络通信。即Kerberos协议的侧重在于认证通信双方的身份,客户端需要确认即将访问的网络服务就是自己所想要访问的服务而不是一个伪造的服务器,而服务端需要确认这个客户端是一个身份真实,安全可靠的客户端,而不是一个想要进行恶意网络攻击的用户。

三、Kerberos协议角色组成
Kerberos协议中存在三个角色,分别是:

客户端(Client):发送请求的一方
服务端(Server):接收请求的一方
密钥分发中心(Key distribution KDC)

一,首先需要准备三个文件
(user.keytab,krb5.conf,jass.conf)

其中user.keytab和krb5.conf是两个认证文件,需要厂商提供,就是你连接谁的kafka,让谁提供

jass.conf文件需要自己在本地创建

jass.conf文件内容如下,具体路径和域名需要换成自己的:

debug: truefusioninsight:kafka:bootstrap-servers: 10.80.10.3:21007,10.80.10.181:21007,10.80.10.52:21007security:protocol: SASL_PLAINTEXTkerberos:domain:name: hadoop.798687_97_4a2b_9510_00359f31c5ec.comsasl:kerberos:service:name: kafka


其中kerberos.domain.name:hadoop.798687_97_4a2b_9510_00359f31c5ec.com

hadoop.798687_97_4a2b_9510_00359f31c5ec.com需要根据现场提供给你的域名

二、文件准备好后可以将三个配置文件,放在自己项目中,也可以放在服务器的某个目录下,只要确保项目启动后能读取到即可
我的目录结构如下:


pom依赖:
我用的是华为云的Kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>kafka-sample-01</artifactId><version>2.3.1.RELEASE</version><packaging>jar</packaging><name>kafka-sample-01</name><description>Kafka Sample 1</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.0.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0-hw-ei-302002</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 华为 组件 kafka  start -->
<!--        <dependency>-->
<!--            <groupId>com.huawei</groupId>-->
<!--            <artifactId>kafka-clients</artifactId>-->
<!--            <version>2.4.0</version>-->
<!--            <scope>system</scope>-->
<!--            <systemPath>${project.basedir}/lib/kafka-clients-2.4.0-hw-ei-302002.jar</systemPath>-->
<!--        </dependency>--></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build><repositories><repository><id>huaweicloudsdk</id><url>https://mirrors.huaweicloud.com/repository/maven/huaweicloudsdk/</url><releases><enabled>true</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository><repository><id>central</id><name>Mavn Centreal</name><url>https://repo1.maven.org/maven2/</url></repository></repositories>
</project>

然后再SpringBoot项目启动类如下:

package com.example;import com.common.Foo1;import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.util.backoff.FixedBackOff;import java.io.File;
import java.util.HashMap;
import java.util.Map;/*** @author*/
@SpringBootApplication
public class Application {private final Logger logger = LoggerFactory.getLogger(Application.class);@Value("${fusioninsight.kafka.bootstrap-servers}")public String boostrapServers;@Value("${fusioninsight.kafka.security.protocol}")public String securityProtocol;@Value("${fusioninsight.kafka.kerberos.domain.name}")public String kerberosDomainName;@Value("${fusioninsight.kafka.sasl.kerberos.service.name}")public String kerberosServiceName;public static void main(String[] args) {
//        String filePath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main"
//        String filePath = "D:\\Java\\workspace\\20231123MOSPT4eB\\sample-01\\src\\main\\resources\\";String filePath = "/home/yxxt/";System.setProperty("java.security.auth.login.config", filePath + "jaas.conf");System.setProperty("java.security.krb5.conf", filePath + "krb5.conf");SpringApplication.run(Application.class, args);}@Beanpublic ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<String, String> template) {System.out.println(boostrapServers);ConcurrentKafkaListenerContainerFactory<Object, Object> factory= new ConcurrentKafkaListenerContainerFactory<>();configurer.configure(factory, kafkaConsumerFactory);factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template),new FixedBackOff(0L, 2))); // dead-letter after 3 triesreturn factory;}@Beanpublic RecordMessageConverter converter() {return new StringJsonMessageConverter();}// 指定消费监听,该topic有消息时立刻消费@KafkaListener(id = "fooGroup1", topics = "topic_ypgk")public void listen(ConsumerRecord<String, String> record) {System.out.println("监听到了消息-----");logger.info("Received:消息监听成功! " );System.out.println("监听到了-----");System.out.println(record);
//        if (foo.getFoo().startsWith("fail")) {
//            // 触发83行的 ErrorHandler,将异常数据写入 topic名称+.DLT的新topic中
//            throw new RuntimeException("failed");
//        }}// 创建topic,指定分区数、副本数
//    @Bean
//    public NewTopic topic() {
//        return new NewTopic("topic1", 1, (short) 1);
//    }@Beanpublic KafkaAdmin kafkaAdmin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);configs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("kerberos.domain.name", kerberosDomainName);return new KafkaAdmin(configs);}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new DefaultKafkaConsumerFactory<>(configs);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {Map<String, Object> configs = new HashMap<>();configs.put("security.protocol", securityProtocol);configs.put("kerberos.domain.name", kerberosDomainName);configs.put("bootstrap.servers", boostrapServers);configs.put("sasl.kerberos.service.name", kerberosServiceName);configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs);return new KafkaTemplate<>(producerFactory);}
}

生产者:通过发送请求进行向主题里发送消息

package com.example;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import com.common.Foo1;/*** @author haosuwei**/
@RestController
public class Controller {@Autowiredprivate KafkaTemplate<String, String> template;@PostMapping(path = "/send/foo/{what}")public void sendFoo(@PathVariable String what) {Foo1 foo1 = new Foo1(what);this.template.send("topic1", foo1.toString());}}

运行成功,就可以监听到主题消息了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/215641.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

嵌入式FPGA IP正在发现更广阔的用武之地

作者&#xff1a;郭道正, Achronix Semiconductor中国区总经理 在日前落幕的“中国集成电路设计业2023年会暨广州集成电路产业创新发展高峰论坛&#xff08;ICCAD 2023&#xff09;”上&#xff0c;Achronix的Speedcore™嵌入式FPGA硅知识产权&#xff08;eFPGA IP&#xff09…

【VSCode】自定义转换大小写快捷键

文章目录 VSCode 是没有可以直接转换字母大小写的快捷键的&#xff0c;但是可以通过设置去定义 点击左下角设置按钮&#xff0c;并选择键盘快捷方式 在快捷方式里面搜索写&#xff0c;就能找到&#xff1a; 选择要设置的快捷键&#xff0c;并点击左侧的号 在键盘上按住你想设置…

原型 原型对象 原型链

在面向开发对象开发过程中对每一个实例添加方法&#xff0c;会使每一个对象都存在该添加方法造成空间浪费 通过对原型添加公共的属性或方法&#xff0c;使所有实例对象都可访问 原型为了共享公共的成员 prototype 原型: JS为每个构造函数提供一个属性prototype(原型),它的值…

命名空间、字符串、布尔类型、nullptr、类型推导

面向过程语言&#xff1a;C ——> 重视求解过程 面向对象语言&#xff1a;C ——> 重视求解的方法 面向对象的三大特征&#xff1a;封装、继承和多态 C 和 C 在语法上的区别 1、命名空间&#xff08;用于解决命名冲突问题&#xff09; 2、函数重载和运算符重载&#xf…

多功能PHP图床源码:Lsky Pro开源版v2.1 – 最新兰空图床

Lsky Pro是一款功能丰富的在线图片上传和管理工具&#xff0c;即兰空图床。它不仅可以作为个人云相册&#xff0c;还可以用作写作贴图库。 该程序的初始版本于2017年10月由ThinkPHP 5开发&#xff0c;经过多个版本的迭代&#xff0c;于2022年3月发布了全新的2.0版本。 Lsky Pro…

uniapp视频倍速播放插件,uniapp视频试看插件——sunny-video使用文档

sunny-video视频倍速播放器 组件名&#xff1a;sunny-video 效果图 img1img2img3img4 平台差异说明 目前已应用到APP&#xff08;安卓、iOS&#xff09;、微信&#xff08;小程序、H5&#xff09;其它平台未测试 安装方式 本组件符合easycom规范&#xff0c;HBuilderX 2.5…

82基于matlab GUI的图像处理

基于matlab GUI的图像处理&#xff0c;功能包括图像一般处理&#xff08;灰度图像、二值图&#xff09;&#xff1b;图像几何变换&#xff08;旋转可输入旋转角度、平移、镜像&#xff09;、图像边缘检测&#xff08;拉普拉斯算子、sobel算子、wallis算子、roberts算子&#xf…

2022年03月 Scratch(三级)真题解析#中国电子学会#全国青少年软件编程等级考试

Scratch等级考试(1~4级)全部真题・点这里 一、单选题(共25题,每题2分,共50分) 第1题 以下四个选项中,运行哪个积木块,可能得到523这个数值? A: B: C: D: 答案:B 四个选项都遵循统一的公式:随机数ⅹ10+3=523,因此可以得出随

Redis 的过期策略都有哪些?

思考:假如redis的key过期之后&#xff0c;会立即删除吗&#xff1f; Redis对数据设置数据的有效时间&#xff0c;数据过期以后&#xff0c;就需要将数据从内存中删除掉。可以按照不同的规则进行删除&#xff0c;这种删除规则就被称之为数据的删除策略&#xff08;数据过期策略…

多模态——使用stable-video-diffusion将图片生成视频

多模态——使用stable-video-diffusion将图片生成视频 0. 内容简介1. 运行环境2. 模型下载3. 代码梳理3.1 修改yaml文件中的svd路径3.2 修改DeepFloyDataFiltering的vit路径3.3 修改open_clip的clip路径3.4 代码总体结构 4. 资源消耗5. 效果预览 0. 内容简介 近期&#xff0c;…

ChatGPT重磅升级!集简云支持GPT4 Turbo Vision, GPT4 Turbo, Dall.E 3,Whisper等最新模型

在11月7日凌晨&#xff0c;OpenAI全球开发者大会宣布了 GPT-4的一次大升级&#xff0c;推出了 GPT-4 Turbo号称为迄今为止最强的大模型。 此次GPT-4的更新和升级在多个方面显示出强大的优势和潜力。为了让集简云用户能快速体验新模型的能力&#xff0c;我们第一时间整理了大会发…

Tars框架 Tars-Go 学习

Tars 框架安装 网上安装教程比较多&#xff0c;官方可以参数这个 TARS官方文档 (tarsyun.com) 本文主要介绍部署应用。 安装完成后Tars 界面 增加应用amc 部署申请 amc.GoTestServer.GoTestObj 名称不知道的可以参考自己创建的app config 点击刷新可以看到自己部署的应用 服…