Kafka+zookeeper安全认证技术介绍
Kafka 目前支持多种认证方式,生产环境常见应用的SASL有以下几种:
- SSL/TLS认证:本认证是基于SSL/TLS的加密方式,使用SSL/TLS证书对客户端和服务器进行身份验证,确保在传输层上安全保证数据安全可靠。
- SASL/PLAIN认证:含义是简单身份验证和授权层应用程序接口,PLAIN认证是其中一种最简单的用户名、密码认证方式,生产环境使用维护简单易用。可用于Kafka和其他应用程序之间的认证。
- SASL/SCRAM认证:SCRAM-SHA-256、SCRAM-SHA-512方式认证,本认证需要客户端、服务器共同协同完成认证过程,使用和维护上较为复杂。优势是可动态增加用户,而不必重启kafka组件服务端。
- SASL/GSSAPI 认证:Kerberos认证,本认证适用于大型公司企业生产环境,通常结合Kerberos协议使用。使用Kerberos认证可集成目录服务,比如AD。通过本认证机制可实现优秀的安全性和良好的用户体验。
- Kafka 自带ACL访问控制方式:Kafka提供了ACL(Access Control Lists)来控制用户对特定Topic或Topic Partition的访问权限,实现业务数据的安全性。
企业在选择认证方式时,需要结合业务特点,考虑到企业数据安全性,又要考虑部署后的性能和部署复杂度等因素。在生产实际部署时,可以根据企业安全政策、网络因素、对性能的考量等因素来正确选择合适的认证方式,确保选型的认证机制符合本企业需求。
环境配置
本方案需要测试环境主机裸金属环境为3台主机,
IPx.x.x.15-17
Zookeeper版本:apache-zookeeper-3.6.4-bin
Kafka版本:kafka_2.13-2.7.1
由于登录Kafka、zookeeper认证会严重依赖服务器主机名,所以需先配置/etc/hosts。
设置每个主机的主机名,并确保每台主机名不重复,执行
hostnamectl set-hostname kafka1 && bash
cat >>/etc/hosts <<EOF
x.x.x.15 kafka1
x.x.x.16 kafka2
x.x.x.17 kafka3
EOF
配置SASL/PLAIN+ACL认证
kafka认证配置
目前需要编辑两个文件,一个是server.properties,另一个是kafka_server_jaas.conf。server.properties是kafka的主要配置文件,里面有很多参数可以调整。kafka_server_jaas.conf是sasl认证的配置文件,里面定义了认证的方式和用户信息。这两个文件都在kafka的config目录下。或者把kafka_server_jaas.conf设置在单独路径,但必须在kafka脚本里添加该路径变量。
Kafka的broker端配置:
若要开启SASL机制,需要在broker服务端进行三个方面的设置:
第一:配置“server.properties”的文件内容
第二:编写JAAS配置文件kafka_server_jaas.conf
第三:修改broker启动脚本等各种脚本,并指定JAAS的配置文件kafka_server_jaas.conf存放路径
编写broker服务端server.properties的SASL+ACL配置
分别在3台kafka节点修改,下面参数为SASL相关参数,其他参数正常配置。
vim server.properties
listeners=SASL_PLAINTEXT://x.x.x.15:9092 security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:sdx
参数解释:
Listeners:SASL端口,sasl_plaintext监听器,表示使用sasl和明文传输
security.inter.broker.protocol: 表示Broker间通信使用SASL协议认证
sasl.enabled.mechanisms:认证参数。kafka启用的sasl认证方式
sasl.mechanism.inter.broker.protocol:表示Broker间通信启用PLAIN机制
authorizer.class.name:启用ACL机制,设置kafka的ACL的授权类
super.users:指定超级用户,超级用户不需要进行ACL鉴权,拥有最高权限
全配置文件示例如下:
broker.id=11
zookeeper.request.timeout=30000
listeners=SASL_PLAINTEXT://x.x.x.15:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:sdx
delete.topic.enable=true
log.index.size.max.bytes=1024000
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data1/kafka
log.index.size.max.bytes=1024000
log.index.interval.bytes=4000000
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=x.x.x.15:2181,x.x.x.16:2181,x.x.x.17:2181/kafka
zookeeper.connection.timeout.ms=180000
zookeeper.session.timeout.ms=60000
zookeeper.sync.time.ms =2000
group.initial.rebalance.delay.ms=0
编写broker端JAAS配置文件kafka_server_jaas.conf
创建包含所有客户端连接kafka认证用户信息的JAAS文件,认证用户可根据业务需要任意增加用户数量。
创建存放各种jaas配置文件目录
mkdir /usr/local/kafka_2.13-2.7.1/else_config
文件内容如下:
vim /usr/local/kafka_2.13-2.7.1/else_config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="sdx" #kafka超级管理员,为broker间通信使用
password="sdx123" #kafka超级管理员密码
user_pj="sdx123" #客户端连kafka认证使用的用户sdx,密码sdx123
user_usera="usera1" #客户端连kafka认证使用的用户usera,密码usera1。
user_userb="userb2"; #用户userb,密码userb2。末尾有分号,别漏掉!
}; #末尾有分号,别漏掉
Client { # Client是broker作为客户端连接zk服务端的认证org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kz" # broker连接zookeeper认证的用户password="kz123"; #broker连接zk认证的用户密码。末尾有分号别漏掉
}; #末尾有分号,别漏掉
注:
1)、user_name语句,是用来配置客户端连接Broker时使用的用户名和密码,也就是新建用户的用户名都以user_开头,等号后面是密码,密码可以是明文且没有复杂度要求。完整语句是user_USERNAME="PASSWORD"。
2)、规划用户usera给生产者使用,用户userb给消费者使用。
3.1.3 修改broker启动脚本,并指定JAAS的配置文件存放路径
下面黄色标识为新增变量,位置如下
vim /usr/local/kafka_2.13-2.7.1/bin/kafka-run-class.sh
(( CYGWIN )) && CLASSPATH=$(cygpath --path --mixed "${CLASSPATH}")
KAFKA_OPTS='-Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.1/else_config/kafka_server_jaas.conf'
编辑认证携带文件
因为开启了安全认证,所以执行命令需要携带含有认证用户信息的认证文件。认证文件仍可放在/usr/local/kafka_2.13-2.7.1/else_config路径下。
编写生产者用户usera的认证文件:
vim usera_producer.properties
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="usera" \password="usera1";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
usera_producer.properties的使用方法是通过–producer.config参数携带。
编写消费者用户userb的认证文件:
如果后续指定此配置文件无法消费,需要先查出消费者组名称,然后在文件第一行添加group.id参数,并指定消费者组。
vim userb_consumer.properties
#group.id=console-consumer-94652
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="userb" \password="userb2";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
userb_consumer.properties的使用方法是通过--consumer.config参数携带。
编写生产者用户usera的认证文件:
vim usera-writer-jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="usera"password="usera1";
};
编写消费者用户userb的认证文件:
vim userb-read-jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="userb"password="userb2";
};
上述usera-writer-jaas.conf和userb-read-jaas.conf文件的使用方法均需通过–command-config参数携带。
分别修改生产者脚本和消费者脚本
vim /usr/local/kafka_2.13-2.7.1/bin/kafka-console-producer.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.1/else_config/usera-writer-jaas.conf"
vim /usr/local/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.13-2.7.1/else_config/userb-read-jaas.conf"
重启kafka
如果重启kafka会无法启动,会报与zookeeper认证失败。我们继续完善zookeeper认证策略设置。最后重启kakfa。
zookeeper认证配置
配置认证文件
编辑zookeeper服务端认证文件,Zookeeper服务端启用sasl认证,添加下面3行认证参数,其他参数正常设置。
vim /usr/local/apache-zookeeper-3.6.4-bin/conf/zoo.cfg
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider #authProvider.1表示开启认证功能,zookeeper配置SASL支持
requireClientAuthScheme=sasl #开启SASL的认证方式
zookeeper.sasl.client=true #开启SASL身份验证
Zookeeper配置文件全文如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data1/zookeeper/snapshot
dataLogDir=/data1/zookeeper/datalogdir
4lw.commands.whitelist=*
clientPort=2181
server.11=x.x.x.15:2888:3888
server.22=x.x.x.16:2888:3888
server.33=x.x.x.17:2888:3888
zookeeper.session.timeout.ms=60000
zookeeper.connection.timeout.ms=60000
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
jaas用于定义认证身份信息,我们使用LoginModule实现,举例如org.apache.zookeeper.server.auth.DigestLoginModule,它是基于明文用户、密码的形式进行身份认证。
vim /usr/local/apache-zookeeper-3.6.4-bin/conf/zk_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="sdx" #pj用户是zk集群之间认证使用的
password="sdx123" #pj用户密码
user_sdx="sdx123"
user_kz="kz123" #用户kz是客户端broker(或zkCli.sh)与zk之间使用的,密码是kz123要与kafka_server_jaas.conf的Clinet里的用户密码相同。
user_usera="usera1" #生产用户,供生产者程序认证使用
user_userb="userb2"; #消费用户,供消费者程序认证使用
};
导入kafka认证的相关jar到zookeeper
Zookeeper的认证机制是使用插件,所以需要导入Kafka相关jar包,kafka-clients的相关jar包,在kafka服务下的lib目录中可以找到,因kafka版本不同,相关jar包版本会有所变化。
创建存放kafk的jar包目录
/usr/local/apache-zookeeper-3.6.4-bin/lib
从kafka/lib目录下复制以下5个jar包到每个zookeeper主机的lib目录下,包名分别如下:
kafka-clients-2.3.0.jar
lz4-java-1.6.0.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
snappy-java-1.1.7.3.jar
修改zookeeper的zkEnv.sh脚本文件
添加jaas配置文件添加到zookeeper的环境变量,添加如下新增变量SERVER_JVMFLAGS。黄色标识为新增变量。
vim /usr/local/apache-zookeeper-3.6.4-bin/bin/zkEnv.sh
ZOOBINDIR="${ZOOBINDIR:-/usr/bin}"
ZOOKEEPER_PREFIX="${ZOOBINDIR}/.."
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/apache-zookeeper-3.6.4-bin/conf/zk_jaas.conf"
#check to see if the conf dir is given as an optional argument
重启组件
先使用脚本关闭3台kafka进程,再关闭3台zookeeper组件进程,确认进程停止后,最后启动zookeeper进程,再启动3台kakfa进程。
进入zookeeper客户端查看ACL节点
发现会多出与ACL认证授权相关的节点
# /usr/local/apache-zookeeper-3.6.4-bin/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[kafka, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, kafka-acl, kafka-acl-changes, kafka-acl-extended, kafka-acl-extended-changes, latest_producer_id_block, log_dir_event_notification]
注:kafka-acl节点存储ACL信息。Kafka-acl-changes节点存储ACL变更信息
kafka命令使用方法举例
创建topic和删除topic
需要使用(必须使用)–zookeeper方式创建:
如果使用–bootstrap-server x.x.x.15:9092方式创建,则kafka日志会报Failed authentication
- 不指定用户创建
./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --create --topic abc --replication-factor 3 --partitions 3
- 指定用户创建
./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --create --topic abc --replication-factor 3 --partitions 3 --command-config /usr/local/kafka_2.13-2.7.1/else_config/usera_producer.properties
- 删除topic操作
./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --delete --topic dsc
查询topic
需要使用–zookeeper方式查询
./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --list
./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --describe
改topic分区数量
需要使用 --zookeeper方式
./bin/kafka-topics.sh --zookeeper x.x.x.15:2181/kafka --topic abc --alter --partitions 4
输出如下
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
ACL授权操作
对生产类用户授权
生产用户(如usera)对某topic授权某操作(如生产、写数据)权限:(acl授权过程也可通过业务端java代码实现,也可通过运维手动授权实现)
# ./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka --add --allow-principal User:usera --operation Write --topic abc
对消费类用户授权
消费用户(如userb)对某topic、某主机授权某操作(如消费、读数据)权限:(acl授权过程也可通过业务端java代码实现,也可通过运维手动授权实现)
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka --add --allow-principal User:userb --operation Read --topic abc
查看授权情况
./bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=x.x.x.15:2181/kkafka
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=topic-new, patternType=LITERAL)`: (principal=User:usera, host=*, operation=WRITE, permissionType=ALLOW)(principal=User:userb, host=*, operation=READ, permissionType=ALLOW)
对消费者组授权
对消费者组进行acl授权:对消费者组console-consumer-48035认证授权消费topic abc
如果对所有topic和grpup授权,可以用星号* , 如–topic=* --group=*
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka --allow-principal User:userb --consumer --topic=abc --group=console-consumer-48035 --add
查看acl授权信息
也可–topic指定topic,–group指定消费组
./bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=x.x.x.15:2181/kafka
生产和消费
生产
必须使用–bootstrap-server方式生产,必须携带认证文件
./bin/kafka-console-producer.sh --bootstrap-server x.x.x.15:9092 --topic abc --producer.config /usr/local/kafka_2.13-2.7.1/else_config/usera_producer.properties
消费
必须使用–bootstrap-server方式消费,必须携带认证文件
./bin/kafka-console-consumer.sh --bootstrap-server x.x.x.15:9092 --topic abc --from-beginning --consumer.config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties
如果消费报错如 Not authorized to access group: console-consumer-48035,需要对console-consumer-48035消费者组认证。
方式如下:只添加group.id参数即可
vim /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties
group.id=console-consumer-48035
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \username="userb" \password="userb";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
消费者组相关操作
查询消费组–list
查询消费者组(粗查询),需要使用–bootstrap-server
./bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.15:9092 --list --command-config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties
查询消费组–describe
查询消费者组(细查询–group指定消费组 或–all-groups查所有组),需要使用–bootstrap-server查询。
./bin/kafka-consumer-groups.sh --bootstrap-server x.x.x.15:9092 --describe --all-groups --command-config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties
查看消费组消费偏移量
需要使用–bootstrap-server查询。
./bin/kafka-consumer-groups.sh --describe --bootstrap-server x.x.x.15:9092 --group console-consumer-48035 --command-config /usr/local/kafka_2.13-2.7.1/else_config/userb_consumer.properties
查看log日志文件内容
查看Log文件基本数据信息
# /usr/local/kafka_2.13-2.7.1/bin/kafka-dump-log.sh --files /data1/kafka/pctest-1/00000000000000000001.log --print-data-log
Dumping /data1/kafka/pctest-1/00000000000000000001.log
Starting offset: 1
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1715322009004 size: 72 magic: 2 compresscodec: NONE crc: 3420315536 isvalid: true
| offset: 1 CreateTime: 1715322009004 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: test
查看Log文件具体数据信息
# /usr/local/kafka_2.13-2.7.1/bin/kafka-dump-log.sh --files /data1/kafka/aaa-0/00000000000000000000.log --print-data-log
Dumping /data1/kafka/aaa-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 0 CreateTime: 1718268803238 size: 200 magic: 2 compresscodec: NONE crc: 710512664 isvalid: true
| offset: 0 CreateTime: 1718268803238 keysize: -1 valuesize: 130 sequence: -1 headerKeys: [] payload: ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZ
注:payload参数后为实际数据
查看index文件具体内容
./kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.index
offset为索引值,position为具体位置,可以看到大概每隔600条消息,就建立一个索引。配置项为log.index.size.max.bytes,来控制创建索引的大小;
offset: 972865 position: 25163202
offset: 973495 position: 25179579
offset: 974125 position: 25195956
offset: 974755 position: 25212333
offset: 975385 position: 25228710
offset: 976015 position: 25245087
offset: 976645 position: 25261464
offset: 977275 position: 25277841
查看timeindex文件具体内容
./kafka-dump-log.sh --files /tmp/kafka-logs/testTopic-0/00000000000000000000.timeindex
输出如下:
timestamp: 1691292274425 offset: 475709
timestamp: 1691292274426 offset: 476947
timestamp: 1691292274427 offset: 478255
timestamp: 1691292274428 offset: 479543
timestamp: 1691292274429 offset: 480848
timestamp: 1691292274430 offset: 481767
timestamp: 1691292274431 offset: 483209
timestamp: 1691292274432 offset: 484869
timestamp: 1691292274433 offset: 486408
哪里不懂可以在评论中评论~
文档持续更新中~