1. 依赖部署
# zookeeper# kafka
server.1=c5:2881:3881
server.2=c6:2881:3881
server.3=c7:2881:3881kafka-server-stop.shbin/kafka-server-start.sh --daemon config/server.properties
2. 部署kafka-eagle
3. kafka权限控制
3.1 SASL/PLAIN
1、服务端配置
- 配置 config/server.properties
# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer# SASL_PLAINTEXT
# 在三台机器上换成每台机器对应的hostname/ip
listeners=SASL_PLAINTEXT://c8:8085
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN# 设置admin超级用户
super.users=User:admin#设置为true,ACL机制为黑名单机制,只有黑名单中的用户无法访问
#默认为false,ACL机制为白名单机制,只有白名单中的用户可以访问
allow.everyone.if.no.acl.found=false
- 添加配置 config/kafka_server_jaas.conf
前三行是配置管理员账户(该账户与上面server.properties中配置的super.users一样)user_test_reader="test_reader" 表示添加一个用户名为test_reader 对应的密码为test_reader。即 user_用户名="该用户的密码"。
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin"user_test_reader="test_reader"user_test_writer="test_writer"};
- 修改kafka-server-start.sh文件
#修改最后一行,改成下面的内容
source /etc/profileexec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf kafka.Kafka "$@"
- 重启kafka集群
至此,kafka集群已开启SASL安全认证 并添加了用户admin、test_reader、test_reader 。
2、客户端配置及对用户授权
1. shell客户端配置
任一非broker节点
- 生产者
添加 config/test_writer_jaas.conf 这里的用户名密码必须和服务器端配置的一样
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="test_writer"password="test_writer";};
修改kafka-console-profucer.sh
#修改最后一行,改成下面的内容
source /etc/profileexec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/test_writer_jaas.conf kafka.tools.ConsoleProducer "$@"
修改producer.properties
# 末尾添加以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- 消费者
添加 config/test_reader_jaas.conf 这里的用户名密码必须和服务器端配置的一样
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="test_reader"password="test_reader";};
修改kafka-console-consumer.sh
#修改最后一行,改成下面的内容
source /etc/profileexec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/test_reader_jaas.conf kafka.tools.ConsoleProducer "$@"
修改consumer.properties
注意: 这个配置文件中 group.id=test-consumer-group,待会在给控制台客户端授权消费者权限时还需要指定这个消费者组。
# 末尾添加以下内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
2. 进行ACL授权
对 test_writer 用户授权,可以向 topic_1 主题生产数据
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=c8:8083/myKafka --add --allow-principal User:test_writer --operation Write --topic topic_1
对 test_reader 用户授权,可以从 topic_1 主题消费数据
注意:同时要给消费者组配置权限,消费者组名称和consumer.properties中的group.id一致
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=c8:8083/myKafka --add --allow-principal User:test_reader --operation Read --topic topic_1 --group test-consumer-group
测试
bin/kafka-console-producer.sh --topic topic_1 --broker-list c8:8085 --producer.config config/producer.propertiesbin/kafka-console-consumer.sh --topic topic_1 --bootstrap-server c8:8085 --consumer.config config/consumer.properties
3. java代码本地连接
先配置本地hosts文件
1.1.1.2 c8
生产者
- 第一种方式
// 创建生产者
Properties configs = new Properties();
// broker hostname:port 注意:这里的c8不能使用IP,要和broker的server.properties中的 listeners 配置完全一致。
configs.put("bootstrap.servers", "c8:8085");
// 用户名密码
configs.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=test_writer password=test_writer;");
// sasl
configs.put("security.protocol", "SASL_PLAINTEXT");
// sasl
configs.put("sasl.mechanism", "PLAIN");
configs.put("key.serializer", IntegerSerializer.class);
configs.put("value.serializer", StringSerializer.class);
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs); ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>( "topic_1", // 主题 "hello12312" // 消息字符串
);
System.out.println("消息已创建"); // 生产者发送消息到broker,消息的同步确认
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println("消息的主题是:" + metadata.topic());
System.out.println("消息的偏移量是:" + metadata.offset());
System.out.println("消息的分区是:" + metadata.partition());
- 第二种方式
E:/tmp/test_writer_jaas.conf
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="test_writer"password="test_writer";};
java代码
// 添加环境变量
System.setProperty("java.security.auth.login.config", "E:/tmp/test_writer_jaas.conf");// 创建生产者
Properties configs = new Properties();
// broker hostname:port 注意:这里的c8不能使用IP,要和broker的server.properties中的 listeners 配置完全一致。
configs.put("bootstrap.servers", "c8:8085");
// sasl
configs.put("security.protocol", "SASL_PLAINTEXT");
// sasl
configs.put("sasl.mechanism", "PLAIN");
configs.put("key.serializer", IntegerSerializer.class);
configs.put("value.serializer", StringSerializer.class);
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs); ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>( "topic_1", // 主题 "hello12312" // 消息字符串
);
System.out.println("消息已创建"); // 生产者发送消息到broker,消息的同步确认
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();
System.out.println("消息的主题是:" + metadata.topic());
System.out.println("消息的偏移量是:" + metadata.offset());
System.out.println("消息的分区是:" + metadata.partition());
消费者
// 环境变量
System.setProperty("java.security.auth.login.config", "E:/tmp/test_reader_jaas.conf");
HashMap<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "c8:8085");
configs.put("security.protocol", "SASL_PLAINTEXT");
configs.put("sasl.mechanism", "PLAIN");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 消费组
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs); consumer.subscribe(Arrays.asList("topic_1")); ConsumerRecords<Integer, String> records = consumer.poll(3000); records.forEach((record) -> System.out.println(record.topic() + " " + record.offset() + record.value()));
怀疑是 外网请求无法转发到内网?也不合理啊
原因是 broker-list 代码中要和 kafka集群配置的 完全一样。
那是否本地可以用1.1.1.2 c5 代理,伪装一下?
kafka-topics.sh --list --zookeeper 192.168.169.1:2181/myKafka2kafka-topics.sh --zookeeper c8:8083/myKafka --create --topic topic_1 --partitions 1 --replication-factor 1
3.2 SCRAM
# 创建SCRAM证书
bin/kafka-configs.sh --zookeeper 192.168.169.1:2181 --alter --add-config 'SCRAM-SHA-256=[password=123],
SCRAM-SHA-512=[password=123]' --entity-type users --entity-name admin