202108201623 - kafka集群部署

news/2025/3/22 2:23:35/文章来源:https://www.cnblogs.com/route/p/18783751

1. 依赖部署

# zookeeper# kafka
server.1=c5:2881:3881
server.2=c6:2881:3881
server.3=c7:2881:3881kafka-server-stop.shbin/kafka-server-start.sh --daemon config/server.properties

2. 部署kafka-eagle

3. kafka权限控制

3.1 SASL/PLAIN

1、服务端配置

  1. 配置 config/server.properties
# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer# SASL_PLAINTEXT 
# 在三台机器上换成每台机器对应的hostname/ip
listeners=SASL_PLAINTEXT://c8:8085
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN# 设置admin超级用户
super.users=User:admin#设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问
#默认为false,ACL机制为白名单机制,只有白名单中的用户可以访问
allow.everyone.if.no.acl.found=false
  1. 添加配置 config/kafka_server_jaas.conf
    前三行是配置管理员账户(该账户与上面server.properties中配置的super.users一样)user_test_reader="test_reader" 表示添加一个用户名为test_reader 对应的密码为test_reader。即 user_用户名="该用户的密码"。
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin"user_test_reader="test_reader"user_test_writer="test_writer"};
  1. 修改kafka-server-start.sh文件
#修改最后一行,改成下面的内容
source /etc/profileexec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf  kafka.Kafka "$@"
  1. 重启kafka集群
    至此,kafka集群已开启SASL安全认证 并添加了用户admin、test_reader、test_reader 。

2、客户端配置及对用户授权

1. shell客户端配置

任一非broker节点

  • 生产者
    添加 config/test_writer_jaas.conf 这里的用户名密码必须和服务器端配置的一样
   KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="test_writer"password="test_writer";};

修改kafka-console-profucer.sh

#修改最后一行,改成下面的内容
source /etc/profileexec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/test_writer_jaas.conf  kafka.tools.ConsoleProducer "$@"

修改producer.properties

# 末尾添加以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  • 消费者
    添加 config/test_reader_jaas.conf 这里的用户名密码必须和服务器端配置的一样
   KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="test_reader"password="test_reader";};

修改kafka-console-consumer.sh

#修改最后一行,改成下面的内容
source /etc/profileexec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/test_reader_jaas.conf  kafka.tools.ConsoleProducer "$@"

修改consumer.properties
注意: 这个配置文件中 group.id=test-consumer-group,待会在给控制台客户端授权消费者权限时还需要指定这个消费者组。

# 末尾添加以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
2. 进行ACL授权

对 test_writer 用户授权,可以向 topic_1 主题生产数据

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=c8:8083/myKafka --add --allow-principal User:test_writer --operation Write --topic topic_1

对 test_reader 用户授权,可以从 topic_1 主题消费数据
注意:同时要给消费者组配置权限,消费者组名称和consumer.properties中的group.id一致

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=c8:8083/myKafka --add --allow-principal User:test_reader --operation Read --topic topic_1 --group test-consumer-group

测试

bin/kafka-console-producer.sh --topic topic_1 --broker-list c8:8085 --producer.config config/producer.propertiesbin/kafka-console-consumer.sh --topic topic_1 --bootstrap-server c8:8085 --consumer.config config/consumer.properties
3. java代码本地连接

先配置本地hosts文件

1.1.1.2 c8

生产者

  • 第一种方式
// 创建生产者  
Properties configs = new Properties();  
// broker hostname:port  注意:这里的c8不能使用IP,要和broker的server.properties中的 listeners 配置完全一致。
configs.put("bootstrap.servers", "c8:8085");
//  用户名密码
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=test_writer password=test_writer;");  
//  sasl
configs.put("security.protocol", "SASL_PLAINTEXT");  
//   sasl
configs.put("sasl.mechanism", "PLAIN");  
configs.put("key.serializer", IntegerSerializer.class);  
configs.put("value.serializer", StringSerializer.class);  
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);  ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(  "topic_1", // 主题  "hello12312" // 消息字符串  
);  
System.out.println("消息已创建");  // 生产者发送消息到broker,消息的同步确认  
Future<RecordMetadata> future = producer.send(record);  
RecordMetadata metadata = future.get();  
System.out.println("消息的主题是:" + metadata.topic());  
System.out.println("消息的偏移量是:" + metadata.offset());  
System.out.println("消息的分区是:" + metadata.partition());
  • 第二种方式
    E:/tmp/test_writer_jaas.conf
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="test_writer"password="test_writer";};

java代码

// 添加环境变量
System.setProperty("java.security.auth.login.config", "E:/tmp/test_writer_jaas.conf");// 创建生产者  
Properties configs = new Properties();  
// broker hostname:port  注意:这里的c8不能使用IP,要和broker的server.properties中的 listeners 配置完全一致。
configs.put("bootstrap.servers", "c8:8085");
//  sasl
configs.put("security.protocol", "SASL_PLAINTEXT");  
//   sasl
configs.put("sasl.mechanism", "PLAIN");  
configs.put("key.serializer", IntegerSerializer.class);  
configs.put("value.serializer", StringSerializer.class);  
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);  ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(  "topic_1", // 主题  "hello12312" // 消息字符串  
);  
System.out.println("消息已创建");  // 生产者发送消息到broker,消息的同步确认  
Future<RecordMetadata> future = producer.send(record);  
RecordMetadata metadata = future.get();  
System.out.println("消息的主题是:" + metadata.topic());  
System.out.println("消息的偏移量是:" + metadata.offset());  
System.out.println("消息的分区是:" + metadata.partition());

消费者

// 环境变量
System.setProperty("java.security.auth.login.config", "E:/tmp/test_reader_jaas.conf");  
HashMap<String, Object> configs = new HashMap<>();  
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "c8:8085");  
configs.put("security.protocol", "SASL_PLAINTEXT");  
configs.put("sasl.mechanism", "PLAIN");  
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);  
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);  
// 消费组
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");  
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);  consumer.subscribe(Arrays.asList("topic_1"));  ConsumerRecords<Integer, String> records = consumer.poll(3000);  records.forEach((record) -> System.out.println(record.topic() + " " + record.offset()  + record.value()));

怀疑是 外网请求无法转发到内网?也不合理啊

原因是 broker-list 代码中要和 kafka集群配置的 完全一样。

那是否本地可以用1.1.1.2 c5 代理,伪装一下?


kafka-topics.sh --list --zookeeper 192.168.169.1:2181/myKafka2kafka-topics.sh --zookeeper  c8:8083/myKafka --create --topic topic_1 --partitions 1 --replication-factor 1

3.2 SCRAM

# 创建SCRAM证书
bin/kafka-configs.sh --zookeeper 192.168.169.1:2181 --alter --add-config 'SCRAM-SHA-256=[password=123],
SCRAM-SHA-512=[password=123]' --entity-type users --entity-name admin

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/902149.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于风险的完整性和检查建模(RBIIM)MATLAB仿真

1.程序功能描述 基于风险的完整性和检查建模(Risk-Based Integrity and Inspection Modeling ,RBIIM)MATLAB仿真,对比prior density,posterior perfect inspection,posterior imp inpection,cummulative posterior imperfect inspection四个指标。 2.测试软件版本以及…

202108151156 - kafka消费积压案例

0. 背景 上游厂家生产信令数据,我方消费kafka数据,过滤后插入HBase。 上游生产的信令数据分了4个主题,每个主题有若干分区,这4个主题的数据消费后都插入同一张HBase表。 问题:kafka消息积压达到百亿。 以下以topic1为例,有6个分区。 1. 查看消费滞后情况 kafka-consumer-…

mysql 多表怎么连接的

前言 简单描述一下多表怎么连接的。 正文 首先,我们得抛开我们一些自以为是的想法。 我想过这个问题,就是为什么我们背乘法口诀的时候,我们总是背: 22 = 4, 99=81 这样背下去,似乎这是口诀。然而这是缓存,不是计算,既然不是计算那么就不是逻辑学。 我们理所当然的想9*9…

202108120808 - 类加载器及双亲委派机制

Bootstrap ClassLoader 这是加载器中的大 Boss,任何类的加载行为,都要经它过问。它的作用是加载核心类库,也就是 rt.jar、resources.jar、charsets.jar 等。当然这些 jar 包的路径是可以指定的,-Xbootclasspath 参数可以完成指定操作。 这个加载器是 C++ 编写的,随着 JVM …

keil仿真时导出数据操作

keil仿真时导出数据操作 save D:\savedata.txt 0x20001013,0x20001035

spring-boot-starter-validation

官方提供的注解 spring-boot-starter-validation 是 Spring Boot 提供的一个 starter,是一个用于验证 Java Bean 的标准,它提供了一套注解和相应的运行时 API 来定义和执行校验规则。 具体来说,当你在项目中引入 spring-boot-starter-validation 后,你可以使用一系列预定义…

省选算法复习

省选算法复习 1. 线段树优化建图 当我们需要向区间内所有点连边或者从区间中所有点连到某个点的时候,便可以使用线段树来优化,如果需要从区间每一个点连到另一个区间每一个点的话,加一个虚点就好了。 这不是一个很困难的技巧,关键在于要建模。 P5471 [NOI2019] 弹跳 - 洛谷…

fastadmin订单父子表管理端

fastadmin后台父子表使用方法 发布于 2021-01-22 12:48:10fastadmin后台的所有表格都是支持父子表配置的,只需要简单修改一下对应的JS即可,下面直接进入主题。示例是我的全国省市行政区划表,是从国家统计局网站采集下来的,共五级行政数据,非常适合用来做父子表,按照级别一…

Rudolf and k Bridges

Rudolf and k Bridges 题目 大致题意上图为俯视图 有一个\(nXm\)的网格,下标从\(1-n\) 以及从 \(1-m\),\((i, j)\) 的值就是这个这垂直一格水的深度 现在要安装支架,有几个信息:\((i, 1)\) 和 \((i, m)\) 处必须要安装相邻支架的距离不能超过 \(d\), 相邻距离为 \(abs(j - …

背离Divergence Trading ,贪小便宜

趋势交易(trend trading)和背离交易(divergence trading),代表了两种不同的交易策略。做背离交易相当于赌市场短期失效,承认你比市场聪明,虽然能赚小钱,但往往是亏大钱的根源。 贪小便宜爱背离,贪小便宜(gain small advantages)不爱止损(cut losses),所以背离和不止损…

在鸿蒙NEXT开发中实现一个语音识别组件

鸿蒙系统发布以后都不知道叫它5.0版本还是NEXT版本了,哈哈,反正是最新版本就对了。对于语音转换文字,鸿蒙系统提供了离线语音识别模型speechRecognizer,语种目前支持中文,识别效果非常不错。今天要分享的是使用speechRecognizer实现一个语音识别组件。要实现语音识别,首先…

激光代加工产品一览-代加工-外协加工-委外加工-激光代加工-河南郑州亚克力切割雕刻代加工-芯晨微纳(河南)

关键词:河南省郑州市、激光代加工、激光打标、激光切割、激光雕刻、激光打孔、激光毛化、激光分切 简介:芯晨微纳(河南)光电科技有限公司,专注于激光微纳代加工、设备/耗材代理销售、设备租赁、技术推广服务,可处理材料类型及应用范围十分广泛,欢迎来电咨询(韩经理1823…