本文已参加「新人创造礼」活动,一起敞开创造之路。

概述

自0.9.0.0.版别引入Security之后,Kafka一直在完善security的功用,以进步kafka集群的安全性。当时Kafka security首要包含3大功用:认证(authentication)、信道加密(encryption)和授权(authorization)。

认证

Kafka SASL的认证规模包含如下:

  • Client与Broker之间
  • Broker与Broker之间
  • Broker与Zookeeper之间

在Kafka体系中,SASL三种完成机制:

  • Kerberos(SASL/GSSAPI – starting at version 0.9.0.0)
  • PLAIN(SASL/PLAIN – starting at version 0.10.0.0)
  • SCRAM(SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 – starting at version 0.10.2.0)

SASL(Simple Authentication and Security Layer)仅仅一个安全层框架,详细完成是Kerberos、PLAIN或SCRAM。

关于一些小公司而言,他们的用户体系并不杂乱,集群用户不多,并且因为运行在内网环境,SSL加密也不是很必要。SASL/PALIN而言, 运维成本相对较小,合适小公司中的Kafka集群,弊端是不能动态增减用户。 因而一个SASL/PLAINTEXT+ACL的集群环境足以应付一般的运用场景。下面我运用最新的Kafka 2.4.1版别来演示下怎么构建支撑SASL(PLAINTEXT) + ACL来构建secured Kafka集群

kafka不同的认证机制:

Kafka SASL安全认证机制与ACL用户权限操控

信道加密

信道加密就是为client到broker、broker到broker以及东西脚本与broker之间的数据传输装备SSL。SSL(Secure Sockets Layer)在TCP握手环节运用非对称加密,在数据传输环节运用对称加密。因为非对称加密比对称加密耗时,但安全性比较高,因而在此即进步了安全性,也进步了速度。
在Kafka体系中,SSL协议作为信道加密机制默许是禁止的,假如需求运用,能够手动发动SSL机制。在一般的上产环境中不建议发动SSL,因为运用SSL就不能运用kafka的ZORE-COPY特性,会大大降低消费时的速度。

授权

授权是经过ACL(Access Control Lists)接口指令来完成用户权限操控。

Kafka安全认证 SASL/PLAINTEXT

环境

操作体系:CentOS release 6.3 (Final)

Kafka Version:kafka_2.11-2.4.1

Zookeeper Version:3.4.13

一、Zookeeper集群装备SASL

1、zoo.cfg文件装备

为zookeeper增加SASL支撑,在装备文件zoo.cfg增加,内容如下:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

2.新建zk_server_jaas.conf文件,为Zookeeper增加账号认证信息

这个文件你放在哪里随意,只需后边zkEnv.sh装备正确的途径就好了。我是放在$ZK_HOME/zk_sasl_dependencies 途径下。zk_server_jaas.conf 文件的内容如下:

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="zk_cluster"
    password="zk_cluster_passwd"
    user_kafka="zk_kafka_client_passwd";
};

username和paasword是zk集群之间认证的用户名暗码。 user_kafka=”zk_kafka_client_passwd”界说了一个用户”kafka”,暗码是”zk_kafka_client_passwd” user_{username}=”password”:方法是界说zk集群的客户端的用户名和暗码

3.将Kafka认证相关jar包导入到Zookeeper

Zookeeper的认证机制是运用插件,“org.apache.kafka.common.security.plain.PlainLoginModule”,所以需求导入Kafka相关jar包,kafka-clients相关jar包,在kafka服务下的lib目录中能够找到,依据kafka不同版别,相关jar包版别会有所改变。

所需求jar包如下,在zookeeper下创建目录zk_sasl_dependencies将jar包放入(目录名与位置能够随意,后续引证指定即可):

kafka-clients-2.4.1.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.28.jar
slf4j-log4j12-1.7.28.ja
snappy-java-1.1.7.3.jar

修正zkEnv.sh

首要意图:

  1. 将这几个jar包使zookeeper读取到
  2. 加载zk_server_jaas.conf装备文件

在 zkEnv.sh 文件最后增加如下内容:

#为zookeeper增加SASL支撑
for i in $ZK_HOME/zk_sasl_dependencies/*.jar; 
do 
    CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZK_HOME/zk_sasl_dependencies/zk_server_jaas.conf "

发动Zookeeper服务

在每个ZK节点都要做相同的装备,装备完成发动ZK集群。

我建立的是伪分布式zk集群,在这里附上我的zoo.cfg 装备:

zoo1.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/hadoop/zk/data/1
dataLogDir=/home/hadoop/zk/logs/1
clientPort=8181
#伪集群布置
server.1=10.194.202.17:2881:3881
server.2=10.194.202.17:2882:3882
server.3=10.194.202.17:2883:3883
#为zookeeper增加SASL支撑
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zoo2.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/hadoop/zk/data/2
dataLogDir=/home/hadoop/zk/logs/2
clientPort=8182
#伪集群布置
server.1=10.194.202.17:2881:3881
server.2=10.194.202.17:2882:3882
server.3=10.194.202.17:2883:3883
#为zookeeper增加SASL支撑
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zoo3.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/hadoop/zk/data/3
dataLogDir=/home/hadoop/zk/logs/3
clientPort=8183
#伪集群布置
server.1=10.194.202.17:2881:3881
server.2=10.194.202.17:2882:3882
server.3=10.194.202.17:2883:3883
#为zookeeper增加SASL支撑
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

一起附上zk集群办理脚本:

zk_cluster.sh
#!/usr/bin/env bash
# --------------------------------------------------------------
#  Author        :lijianjun
#  Create Time   :2020-04-20 12:56
#  Description   :zookeeper cluster 办理脚本
# --------------------------------------------------------------
. ~/.bashrc
ZK_HOME="/home/hadoop/zk"
alias zkServer.sh=$ZK_HOME/bin/zkServer.sh
alias zkCli.sh=$ZK_HOME/bin/zkCli.sh
ZOOKEEPERS="10.194.202.17:8181,10.194.202.17:8182,10.194.202.17:8183"
ZK_NODES=3
function start_zk(){
    for((i=1;i<=$ZK_NODES;i++))
    do 
        zkServer.sh start $ZK_HOME/conf/zoo$i.cfg
    done
}
function stop_zk(){
    for((i=1;i<=$ZK_NODES;i++))
    do 
        zkServer.sh stop $ZK_HOME/conf/zoo$i.cfg
    done
}
function status_zk(){
    for((i=1;i<=$ZK_NODES;i++))
    do 
        zkServer.sh status $ZK_HOME/conf/zoo$i.cfg
    done
}
function restart_zk(){
    for((i=1;i<=$ZK_NODES;i++))
    do 
        zkServer.sh restart $ZK_HOME/conf/zoo$i.cfg
    done
}
function init_clean(){
    stop_zk
    rm -rf $ZK_HOME/data/*
    rm -rf $ZK_HOME/logs/*
    for((i=1;i<=$ZK_NODES;i++))
    do  
        mkdir -p $ZK_HOME/data/$i
        echo $i > $ZK_HOME/data/$i/myid
    done
}
function init_zk(){
    init_clean
    start_zk
}
function conn_cluster(){
    zkCli.sh -server $ZOOKEEPERS
}
case "$1" in
    start)
        start_zk
        ;;
    stop)
        stop_zk
        ;;
    status)
        status_zk
        ;;
    restart)
        restart_zk
        ;;
    conn)
        conn_cluster
        ;;
    clean)
        init_clean
        ;;
    init)
        init_zk
        ;;
    *)
        echo "Usage: $0 {start|stop|restart|status|init|clean|conn}"
        RETVAL=1
esac
exit $RETVAL

二、Kafka集群装备SASL

1.创建kafka_server_jaas.conf文件,该文件名能够自己修正,为kafka增加认证信息

内容如下(这里的Client与Zookeeper相对应,KafkaServer与后边调用时读取的KafkaClient相对应,是消费出产的账号暗码,不要弄混了):

KafkaServer {
 org.apache.kafka.common.security.plain.PlainLoginModule required
 username="kafkacluster"
 password="kafkaclusterpasswd"
 user_kafkacluster="kafkaclusterpasswd"
 user_admin="adminpasswd"
 user_consumer="consumerpasswd"
 user_producer="producerpasswd";
};
Client{
 org.apache.kafka.common.security.plain.PlainLoginModule required  
 username="kafka"  
 password="zk_kafka_client_passwd";  
};

  KafkaServer模块,运用user_{name}来界说多个用户,供客户端程序(出产者、顾客程序或kafka broker之间)认证运用,能够界说多个,后续装备或许还能够依据不同的用户界说不同的ACL权限。上例我界说了三个用户,一个是admin,一个是kafkacluster,一个是producer,一个是consumer,等号后边是对应用户的暗码(如user_producer界说了用户名为producer,暗码为producerpasswd的用户)。再选择一个用户,用于Kafka内部的各个broker之间通讯,这里我选择kafkacluster用户,对应的暗码是”kafkaclusterpasswd”。

  Client模块,首要是kafka broker链接到zookeeper,最为zookeeper的一个客户端。从上文的Zookeeper JAAS文件中选择一个用户kafka,暗码为”zk_kafka_client_passwd”。

2.在Kafka server.properties增加、修正如下信息

# Set listeners
listeners=SASL_PLAINTEXT://10.194.202.17:8092
## Set protocol
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
###security.inter.broker.protocol和inter.broker.listener.name 只能设置一个,不然报错
security.inter.broker.protocol=SASL_PLAINTEXT
# set acl
## default false | true to accept  all the users to use it
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
##增加超级用户,超级用户有必要在kafka_server_jaas.conf文件 KafkaServer模块授权,如:user_admin="adminpasswd"
super.users=User:admin;User:kafkacluster

3.Kafka发动脚本中参加装备,把第一步创建的文件 kafka_server_jaas.conf 加载到发动环境中

修正kafka的kafka-server-start.sh文件

修正 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
为 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf"

或许 在发动kafka之前履行一下脚原本加载到发动环境中(本人是选用的这种做法)

export KAFKA_HEAP_OPTS=$KAFKA_HEAP_OPTS" -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf"

4.发动Kafka服务

kafka server.properties 完好装备

# Set listeners
listeners=SASL_PLAINTEXT://10.194.202.17:8092
## Set protocol
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
###security.inter.broker.protocol和inter.broker.listener.name 只能设置一个,不然报错
security.inter.broker.protocol=SASL_PLAINTEXT
# set acl
# default false | true to accept  all the users to use it
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#增加超级用户,超级用户有必要在kafka_server_jaas.conf文件 KafkaServer模块授权,如:user_admin="adminpasswd"
super.users=User:admin;User:kafkacluster
log.retention.hours=168
zookeeper.connect=10.194.202.17:8181,10.194.202.17:8182,10.194.202.17:8183/kafka
compression.type=producer
delete.topic.enable=true
log.dirs=/home/hadoop/kafka/data
log.dir=/home/hadoop/kafka/data
num.partitions=1
min.insync.replicas=1
default.replication.factor=3
auto.create.topics.enable=false
log.retention.check.interval.ms=300000

发动

kafka-server-start.sh  $KAFKA_HOME/config/server.properties

三、客户端调用认证

因为运用consumer和producer用户之前先需求ACL授权,ACL授权将在下面解说,在这里先运用超级用户:admin,超级用户不受ACL权限操控。

指令行脚本客户端拜访

有以下三种方法:

装备 kafka_client_jaas.conf 文件

KafkaClient{
 org.apache.kafka.common.security.plain.PlainLoginModule required  
 username="admin"  
 password="adminpasswd";  
};

在发动kafka 出产或许消费之前履行一下脚原本加载到发动环境中

export KAFKA_HEAP_OPTS=$KAFKA_HEAP_OPTS" -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_client_jaas.conf"

consumer、producer 发动脚本

#consumer
kafka-console-consumer.sh --bootstrap-server 10.194.202.17:8092 
--consumer-property security.protocol=SASL_PLAINTEXT 
--consumer-property sasl.mechanism=PLAIN 
--consumer-property client.id=consumer_01 
--topic test.1 
--group c1
#producer
kafka-console-producer.sh --broker-list 10.194.202.17:8092 
--consumer-property security.protocol=SASL_PLAINTEXT 
--consumer-property sasl.mechanism=PLAIN 
--consumer-property client.id=producer_01 
--topic test.1

装备sasl.conf文件

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="adminpasswd";

consumer、producer 发动脚本

#consumer
kafka-console-consumer.sh --bootstrap-server 10.194.202.17:8092 
--consumer.config $KAFKA_HOME/config/sasl.conf 
--consumer-property client.id=consumer_01 
--topic test.1 
--group c1
#producer
kafka-console-producer.sh --broker-list 10.194.202.17:8092 
--consumer.config $KAFKA_HOME/config/sasl.conf 
--consumer-property client.id=producer_01 
--topic test.1

拼接sasl认证(本人选用这种方法)

consumer、producer 发动脚本

#consumer
kafka-console-consumer.sh --bootstrap-server 10.194.202.17:8092 
--consumer-property "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='adminpasswd';" 
--consumer-property security.protocol=SASL_PLAINTEXT 
--consumer-property sasl.mechanism=PLAIN 
--consumer-property client.id=consumer_01 
--topic test.1 
--group c1
#producer
kafka-console-producer.sh --broker-list 10.194.202.17:8092 
--consumer-property "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='adminpasswd';" 
--consumer-property security.protocol=SASL_PLAINTEXT 
--consumer-property sasl.mechanism=PLAIN 
--consumer-property client.id=producer_01 
--topic test.1

java客户端拜访

有以下两种方法:

装备 kafka_client_jaas.conf 文件

KafkaClient{
 org.apache.kafka.common.security.plain.PlainLoginModule required  
 username="admin"  
 password="adminpasswd";  
};

在producer 或许 consumer增加以下三项装备:

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
System.setProperty("java.security.auth.login.config", "conf/kafka_client_jaas.conf");

无装备文件形式(本人选用这种装备)

在producer 或许 consumer增加以下三项装备:

props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="adminpasswd";");

demo

package com.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Created by lijianjun 2020/4/15
 */
public class Kafka_Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.194.202.17:8092");
        props.put("client.id", "producer_01");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("compression.type", "gzip");//none, gzip, snappy, 或许 lz4. 默许none
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Java客户端调用认证
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producerpasswd";");
        //System.setProperty("java.security.auth.login.config", "C:/Kafkaanquan/kafka_client_jaas.conf"); 读取装备文件方法,与读取装备文件二选一
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("test.1", Integer.toString(i), Integer.toString(i))).get();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

Kafka 用户权限操控 ACL

  在以上都装备好的基础上,在开端之前,咱们简略学习下Kafka ACL的格局。依据官网的介绍,Kafka中一条ACL的格局如下:“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”。它们的含义描述如下:

  • principal:表明一个Kafka user
  • operation:表明一个详细的操作类型,有效值: Read(读), Write(写), Create(创建), Delete(删去), Alter(修正), Describe(描述), ClusterAction(集群操作), All(一切)
  • Host:表明连向Kafka集群的client的IP地址,假如是‘*’则表明一切IP。留意:当时Kafka不支撑主机名,只能指定IP地址
  • Resource:表明一种Kafka资源类型。当时共有5种类型:TOPIC、CLUSTER、GROUP、transactional-id、delegation-token

检查 acls

检查一切权限

kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka --list

一起也能够指定一种资源类型

kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka --list --topic test.1

增加Acls

==留意:==

  • 增加acl时,有必要指定:–allow-principal,–deny-principal中的一个。
  • 假如不指定IP地址(即–allow-host、–deny-host),默许是一切IP地址

答应producer用户从ip地址(10.194.202.17)对topic(test.1)履行Read和Write操作

kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka 
--add 
--allow-principal User:producer 
--allow-host 10.194.202.17 
--operation Read 
--operation Write 
--topic test.1

指定资源匹配形式

默许是匹配整个文本(默许:LITERAL) <ANY|MATCH|LITERAL|PREFIXED>

kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka 
--add 
--allow-principal User:producer 
--producer --topic test.* 
--resource-pattern-type PREFIXED

增加producer权限

  • –producer 是一种出产端的方便式写法,相当于 –operation WRITE –operation WRITE –operation CREATE,包含的权限有:WRITE、WRITE、CREATE
答应producer从一切IP地址对topic(test.1)履行出产数据操作
kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka 
--add 
--allow-principal User:producer 
--producer 
--topic test.1
答应producer从ip地址(172.30.23.154)对topic(test.1)履行出产数据操作
kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka 
--add 
--allow-principal User:producer 
--allow-host 172.30.23.154 
--producer 
--topic test.1
拒绝consumer用户对topic(test.1) 履行写权限
kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka 
--add 
--deny-principal User:consumer 
--operation Write 
--topic test.1

增加consumer权限

  • –consumer 是一种消费端的方便式写法,相当于 –operation READ –operation WRITE ,包含的权限有:WRITE、WRITE
答应consumer用户从一切IP地址对topic(test.1)履行消费数据操作
kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka 
--add 
--allow-principal User:consumer 
--consumer 
--topic test.1 
--group c1 
经过 –operation 指定消费权限最为灵敏

==留意:== –topic与–group,为两个不同的资源,假如资源匹配形式不同,则需求分开指定

  1. consumer用户匹配消费以test.为前缀的topic
kafka-acls.sh --authorizer-properties 
zookeeper.connect=10.194.202.17:8181/kafka 
--add 
--allow-principal User:consumer 
--operation Read --operation Describe 
--topic test. 
--resource-pattern-type PREFIXED
  1. consumer用户只能经过groupid(c1)消费数据
kafka-acls.sh --authorizer-properties 
zookeeper.connect=10.194.202.17:8181/kafka 
--add --allow-principal User:consumer 
--operation Read 
--operation Describe 
--group c1 
--resource-pattern-type LITERAL

移除 Acls

删去不同的资源及资源类型

删去group资源

删去–group(消费组)资源,资源类型为LITERAL,c1的一切权限

kafka-acls.sh --authorizer-properties 
zookeeper.connect=10.194.202.17:8181/kafka 
--remove --group c1 
--resource-pattern-type LITERAL
Are you sure you want to delete all ACLs for resource filter 
`ResourcePattern(resourceType=GROUP, name=c1, patternType=LITERAL)`? (y/n)

删去topic资源

删去–topic(主题)资源,资源类型为LITERAL,topic(test.1)下面的一切权限

kafka-acls.sh --authorizer-properties 
zookeeper.connect=10.194.202.17:8181/kafka 
--remove --topic test.1 
--resource-pattern-type LITERAL
Are you sure you want to delete all ACLs for resource filter 
`ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)

不同的资源类型

删去–topic(主题)资源,资源类型为PREFIXED,以(test.)为前缀的topic下面的一切权限

kafka-acls.sh --authorizer-properties 
zookeeper.connect=10.194.202.17:8181/kafka 
--remove --topic test. 
--resource-pattern-type PREFIXED
Are you sure you want to delete all ACLs for resource filter 
`ResourcePattern(resourceType=TOPIC, name=test., patternType=PREFIXED)`? (y/n)

删去等级操控

经过是否制定用户(–allow-principal)来操控移除ACL的权限粒度

细粒度的删去

==留意:==

  • 假如不指定IP地址(即–allow-host、–deny-host),默许是一切IP地址
  • 假如不指定操作类型(即–operation),默许是ALL
不指定IP地址和操作类型
kafka-acls.sh --authorizer-properties 
zookeeper.connect=10.194.202.17:8181/kafka 
--remove 
--topic test.1 
--allow-principal User:user01 
--resource-pattern-type LITERAL
Are you sure you want to remove ACLs: 
 	(principal=User:user01, host=*, operation=ALL, permissionType=ALLOW) 
 from resource filter `ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)
指定IP地址和操作类型
kafka-acls.sh --authorizer-properties 
zookeeper.connect=10.194.202.17:8181/kafka 
--remove 
--topic test.1 
--allow-principal User:user01 
--allow-host 10.194.202.17 
--operation Create 
--resource-pattern-type LITERAL
Are you sure you want to remove ACLs: 
 	(principal=User:user01, host=10.194.202.17, operation=CREATE, permissionType=ALLOW) 
 from resource filter `ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)

粗粒度的删去

==留意:==

  • 假如不指定用户(即–allow-principal、–deny-principal),则指定 IP地址和操作类型无效。 删去只能经过资源类型、名称和资源匹配类型三个条件来删去ACLS权限, 会删去此资源类型、名称、和资源匹配类型下的一切的权限,所以请谨慎操作。
不指定用户,指定IP地址和操作类型
kafka-acls.sh --authorizer-properties 
zookeeper.connect=10.194.202.17:8181/kafka 
--remove 
--topic test.1 
--allow-host 10.194.202.17 
--operation Create 
--resource-pattern-type LITERAL
Are you sure you want to delete all ACLs for resource filter 
`ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)
不指定用户,不指定IP地址和操作类型
kafka-acls.sh --authorizer-properties 
zookeeper.connect=10.194.202.17:8181/kafka 
--remove 
--topic test.1 
--resource-pattern-type LITERAL
Are you sure you want to delete all ACLs for resource filter 
`ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)

装备SASL_PLAINTEXT认证和ACLS授权常见问题

kafka发动报错

问题1

[Controller id=11, targetBrokerId=11] Failed authentication with nj03-ns-passport-201611-c2c-m12-09.nj03.baidu.com/10.194.202.17 (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)

原因: SASL_PLAINTEXT认证时用户暗码验证不经过,即 kafka_server_jaas.conf 文件中KafkaServer 模块 Broker与Broker之间sasl用户暗码验证不经过,如下:

username="kafkacluster1"
password="kafkaclusterpasswd"
user_kafkacluster="kafkaclusterpasswd"

问题2

ERROR [KafkaApi-11] Error when handling request: clientId=11, correlationId=0, api=UPDATE_METADATA, version=6, body={controller_id=11,controller_epoch=1,broker_epoch=4294970758,topic_states=[],live_brokers=[{id=11,endpoints=[{port=8092,host=10.194.202.17,listener=SASL_PLAINTEXT,security_protocol=2,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}} (kafka.server.KafkaApis) org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=10.194.202.17:8092-10.194.202.17:59100-0, session=Session(User:kafkacluster,/10.194.202.17), listenerName=ListenerName(SASL_PLAINTEXT), securityProtocol=SASL_PLAINTEXT, buffer=null) is not authorized

原因: kafka_server_jaas.conf 文件中KafkaServer 模块 Broker与Broker之间sasl用户暗码验证正确,但acls 授权没经过。 需求增加超级用户,超级用户不需求acls授权,在server.properties文件中增加kafkacluster用户,如下:

super.users=User:admin;User:kafkacluster

kafka 客户端(包含consumer和producer)衔接报错

问题1

[2020-04-17 16:17:20,986] WARN [Consumer clientId=consumer_01, groupId=c1] Bootstrap broker 10.194.202.17:8092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2020-04-17 16:17:20,989] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password

原因: SASL_PLAINTEXT认证时用户暗码验证不经过,即 kafka_server_jaas.conf 文件中KafkaServer 模块 client与Broker之间sasl用户暗码验证不经过。 检查如下用户名暗码:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='consumer' password='consumerpasswd';

问题2

[2020-04-17 16:17:01,166] WARN [Consumer clientId=consumer_01, groupId=c1] Error while fetching metadata with correlation id 2 : {test.1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2020-04-17 16:17:01,172] ERROR [Consumer clientId=consumer_01, groupId=c1] Topic authorization failed for topics [test.1] (org.apache.kafka.clients.Metadata)
[2020-04-17 16:17:01,174] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test.1]

原因: SASL_PLAINTEXT认证时用户暗码验证正确,但acls 授权没经过,即此用户没有拜访此topic(test.1)的权限,请联系办理员对用户授权

参考: Apache Kafka Kafka 中文文档 – ApacheCN