Kafka ships with a pluggable Authorizer and an out-of-the-box authorizer implementation that uses ZooKeeper to store all the acls. The Authorizer is configured by setting authorizer.class.name in server.properties. To enable the out-of-the-box implementation, use:
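A minimal server.properties entry for this, using the authorizer class name that also appears as the default in the CLI options table below:

```properties
# Enable the built-in ZooKeeper-backed ACL authorizer
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
```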
Kafka acls are defined in the general format of “Principal P is [Allowed/Denied] Operation O From Host H on any Resource R matching ResourcePattern RP”. You can read more about the acl structure in KIP-11 and resource patterns in KIP-290. In order to add, remove or list acls you can use the Kafka authorizer CLI. By default, if no ResourcePatterns match a specific Resource R, then R has no associated acls, and therefore no one other than super users is allowed to access R. If you want to change that behavior, you can include the following in server.properties.
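To allow everyone access to resources that have no matching acls, the server.properties setting is:

```properties
# Permit access when no ACL matches the resource (default is to deny)
allow.everyone.if.no.acl.found=true
```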
One can also add super users in server.properties like the following (note that the delimiter is a semicolon since SSL user names may contain commas). The default PrincipalType string "User" is case sensitive.
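For example (Bob and Alice are illustrative principal names; note the semicolon delimiter):

```properties
super.users=User:Bob;User:Alice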
By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting ssl.principal.mapping.rules to a customized rule in server.properties. This config allows a list of rules for mapping X.500 distinguished names to short names. The rules are evaluated in order, and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
The format of ssl.principal.mapping.rules is a list where each rule starts with "RULE:" and contains an expression in one of the formats below. The default rule returns the string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command is run over the name. This also supports lowercase/uppercase options, to force the translated result to be all lowercase or uppercase, by adding a "/L" or "/U" to the end of the rule.
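As a sketch of the syntax, each rule takes one of the following forms, where pattern is a regular expression matched against the distinguished name and replacement is the substitution to apply (the optional L or U suffix forces lowercase or uppercase):

```
RULE:pattern/replacement/
RULE:pattern/replacement/[LU]
DEFAULT
```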
Example ssl.principal.mapping.rules values are:
RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
DEFAULT
The above rules translate the distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" into "serviceuser" and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" into "adminuser@admin".
For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
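As a sketch, the server.properties key is principal.builder.class; the class name here is a placeholder for your own implementation:

```properties
# CustomizedPrincipalBuilderClass is a hypothetical example name
principal.builder.class=CustomizedPrincipalBuilderClass
```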
By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting
sasl.kerberos.principal.to.local.rules to a customized rule in server.properties. The format of sasl.kerberos.principal.to.local.rules is a list where each rule works in the same way as auth_to_local in the Kerberos configuration file (krb5.conf). This also supports additional lowercase/uppercase rules, to force the translated result to be all lowercase or uppercase, by adding a "/L" or "/U" to the end of the rule. Each rule starts with RULE: and contains an expression in one of the formats below. See the Kerberos documentation for more details.
RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L
RULE:[n:string](regexp)s/pattern/replacement//U
RULE:[n:string](regexp)s/pattern/replacement/g/U
An example of adding a rule to properly translate user@MYDOMAIN.COM to user while also keeping the default rule in place is:
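One way to express this in server.properties (a sketch; MYDOMAIN.COM stands in for your Kerberos realm):

```properties
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
```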
The Kafka Authorization management CLI can be found under the bin directory along with all the other CLIs. The CLI script is called kafka-acls.sh. The following table lists all the options that the script supports:
|OPTION||DESCRIPTION||DEFAULT||OPTION TYPE|
|--add||Indicates to the script that user is trying to add an acl.||Action|
|--remove||Indicates to the script that user is trying to remove an acl.||Action|
|--list||Indicates to the script that user is trying to list acls.||Action|
|--authorizer||Fully qualified class name of the authorizer.||kafka.security.authorizer.AclAuthorizer||Configuration|
|--authorizer-properties||key=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181||Configuration|
|--bootstrap-server||A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer must be specified.||Configuration|
|--command-config||A property file containing configs to be passed to Admin Client. This option can only be used with the --bootstrap-server option.||Configuration|
|--cluster||Indicates to the script that the user is trying to interact with acls on the singular cluster resource.||ResourcePattern|
|--topic [topic-name]||Indicates to the script that the user is trying to interact with acls on topic resource pattern(s).||ResourcePattern|
|--group [group-name]||Indicates to the script that the user is trying to interact with acls on consumer-group resource pattern(s).||ResourcePattern|
|--transactional-id [transactional-id]||The transactionalId to which ACLs should be added or removed. A value of * indicates the ACLs should apply to all transactionalIds.||ResourcePattern|
|--delegation-token [delegation-token]||Delegation token to which ACLs should be added or removed. A value of * indicates ACLs should apply to all tokens.||ResourcePattern|
|--resource-pattern-type [pattern-type]||Indicates to the script the type of resource pattern (for --add) or resource pattern filter (for --list and --remove) the user wishes to use.|
When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'.
When listing or removing acls, a specific pattern type filter can be used to list or remove acls from a specific type of resource pattern, or the filter values 'any' or 'match' can be used: 'any' will match any pattern type, but will match the resource name exactly, while 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s).
WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.
|--allow-principal||Principal in PrincipalType:name format that will be added to an ACL with Allow permission. The default PrincipalType string "User" is case sensitive.|
You can specify multiple --allow-principal options in a single command.
|--deny-principal||Principal in PrincipalType:name format that will be added to an ACL with Deny permission. The default PrincipalType string "User" is case sensitive.|
You can specify multiple --deny-principal options in a single command.
|--principal||Principal in PrincipalType:name format that will be used along with the --list option. The default PrincipalType string "User" is case sensitive. This will list the ACLs for the specified principal.|
You can specify multiple --principal options in a single command.
|--allow-host||IP address from which principals listed in --allow-principal will have access.||If --allow-principal is specified, defaults to *, which translates to "all hosts".||Host|
|--deny-host||IP address from which principals listed in --deny-principal will be denied access.||If --deny-principal is specified, defaults to *, which translates to "all hosts".||Host|
|--operation||Operation that will be allowed or denied.|
Valid values are: Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, All.
|--producer||Convenience option to add/remove acls for the producer role. This will generate acls that allow WRITE, DESCRIBE and CREATE on topic.||Convenience|
|--consumer||Convenience option to add/remove acls for the consumer role. This will generate acls that allow READ and DESCRIBE on topic and READ on consumer-group.||Convenience|
|--idempotent||Enable idempotence for the producer. This should be used in combination with the --producer option.|
Note that idempotence is enabled automatically if the producer is authorized to a particular transactional-id.
|--force||Convenience option to assume yes to all queries and not prompt.||Convenience|
|--zk-tls-config-file||Identifies the file where ZooKeeper client TLS connectivity properties for the authorizer are defined. Any properties other than the following (with or without an "authorizer." prefix) are ignored: zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable, zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type||Configuration|
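A sketch of such a ZooKeeper TLS config file, using only property names from the list above; the truststore path and password are placeholders:

```properties
# TLS to ZooKeeper requires the Netty client socket
zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.truststore.location=/path/to/zk-truststore.jks
zookeeper.ssl.truststore.password=changeit
```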
- Adding Acls
Suppose you want to add an acl "Principals User:Bob and User:Alice are allowed to perform Operation Read and Write on Topic Test-topic from IP 198.51.100.0 and IP 198.51.100.1". You can do that by executing the CLI with the following options:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic

By default, all principals that don't have an explicit acl that allows access for an operation to a resource are denied. In rare cases where an allow acl is defined that allows access to all but some principal, we will have to use the --deny-principal and --deny-host options. For example, if we want to allow all users to Read from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can do so using the following command:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic

Note that --deny-host only supports IP addresses (hostnames are not supported). The above examples add acls to a topic by specifying --topic [topic-name] as the resource pattern option. Similarly, one can add acls to a cluster by specifying --cluster and to a consumer group by specifying --group [group-name]. You can add acls on any resource of a certain type, e.g. suppose you wanted to add an acl "Principal User:Peter is allowed to produce to any Topic from IP 184.108.40.206". You can do that by using the wildcard resource '*', e.g. by executing the CLI with the following options:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Peter --allow-host 184.108.40.206 --producer --topic *

You can add acls on prefixed resource patterns, e.g. suppose you want to add an acl "Principal User:Jane is allowed to produce to any Topic whose name starts with 'Test-' from any host". You can do that by executing the CLI with the following options:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed

Note, --resource-pattern-type defaults to 'literal', which only affects resources with the exact same name or, in the case of the wildcard resource name '*', a resource with any name.
- Removing Acls
Removing acls is pretty much the same. The only difference is that instead of the --add option, users will have to specify the --remove option. To remove the acls added by the first example above, we can execute the CLI with the following options:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic

If you want to remove the acl added to the prefixed resource pattern above, we can execute the CLI with the following options:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed
- List Acls
We can list acls for any resource by specifying the --list option with the resource. To list all acls on the literal resource pattern Test-topic, we can execute the CLI with the following options:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic

However, this will only return the acls that have been added to this exact resource pattern. Other acls can exist that affect access to the topic, e.g. any acls on the topic wildcard '*', or any acls on prefixed resource patterns. Acls on the wildcard resource pattern can be queried explicitly:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic *

However, it is not necessarily possible to explicitly query for acls on prefixed resource patterns that match Test-topic, as the names of such patterns may not be known. We can list all acls affecting Test-topic by using '--resource-pattern-type match', e.g.

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic --resource-pattern-type match

This will list acls on all matching literal, wildcard and prefixed resource patterns.
- Adding or removing a principal as producer or consumer
The most common use case for acl management is adding/removing a principal as a producer or consumer, so convenience options exist to handle these cases. In order to add User:Bob as a producer of Test-topic, we can execute the following command:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic

Similarly, to add User:Alice as a consumer of Test-topic with consumer group Group-1, we just have to pass the --consumer option:

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Alice --consumer --topic Test-topic --group Group-1

Note that for the consumer option we must also specify the consumer group. In order to remove a principal from a producer or consumer role, we just need to pass the --remove option.
- Admin API based acl management
Users having Alter permission on the Cluster resource can use the Admin API for ACL management. The kafka-acls.sh script supports the AdminClient API to manage ACLs without interacting with ZooKeeper or the authorizer directly. All the above examples can be executed by using the --bootstrap-server option. For example:
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic
Protocol calls usually perform some operations on certain resources in Kafka. Knowing the operations and resources is required to set up effective protection. In this section we'll list these operations and resources, then list the combinations of these with the protocols to see the valid scenarios.
There are a few operation primitives that can be used to build up privileges. These can be matched up with certain resources to allow specific protocol calls for a given user. These are: Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite and All.
The operations above can be applied on certain resources which are described below.
- Topic: this simply represents a topic. All protocol calls that act on topics (such as reading or writing them) require the corresponding privilege to be added. If there is an authorization error with a topic resource, then a TOPIC_AUTHORIZATION_FAILED (error code: 29) will be returned.
- Group: this represents the consumer groups in the brokers. All protocol calls that work with consumer groups, like joining a group, must have privileges for the group in subject. If the privilege is not given, then a GROUP_AUTHORIZATION_FAILED (error code: 30) will be returned in the protocol response.
- Cluster: this resource represents the cluster. Operations that are affecting the whole cluster, like controlled shutdown are protected by privileges on the Cluster resource. If there is an authorization problem on a cluster resource, then a CLUSTER_AUTHORIZATION_FAILED (error code: 31) will be returned.
- TransactionalId: this resource represents actions related to transactions, such as committing. If any error occurs, then a TRANSACTIONAL_ID_AUTHORIZATION_FAILED (error code: 53) will be returned by brokers.
- DelegationToken: this represents the delegation tokens in the cluster. Actions such as describing delegation tokens can be protected by a privilege on the DelegationToken resource. Since these objects behave a little differently in Kafka, it is recommended to read KIP-48 and the related upstream documentation at Authentication using Delegation Tokens.
In the below table we’ll list the valid operations on resources that are executed by the Kafka API protocols.
|PROTOCOL (API KEY)||OPERATION||RESOURCE||NOTE|
|PRODUCE (0)||Write||TransactionalId||A transactional producer, which has its transactional.id set, requires this privilege.|
|PRODUCE (0)||IdempotentWrite||Cluster||An idempotent produce action requires this privilege.|
|PRODUCE (0)||Write||Topic||This applies to a normal produce action.|
|FETCH (1)||ClusterAction||Cluster||A follower must have ClusterAction on the Cluster resource in order to fetch partition data.|
|FETCH (1)||Read||Topic||Regular Kafka consumers need READ permission on each partition they are fetching.|
|METADATA (3)||Create||Cluster||If topic auto-creation is enabled, then the broker-side API will check for the existence of a Cluster level privilege. If it’s found then it’ll allow creating the topic, otherwise it’ll iterate through the Topic level privileges (see the next one).|
|METADATA (3)||Create||Topic||This authorizes auto topic creation if enabled but the given user doesn’t have a cluster level permission (above).|
|OFFSET_COMMIT (8)||Read||Group||An offset can only be committed if it’s authorized to the given group and the topic too (see below). Group access is checked first, then Topic access.|
|OFFSET_COMMIT (8)||Read||Topic||Since offset commit is part of the consuming process, it needs privileges for the read action.|
|OFFSET_FETCH (9)||Describe||Group||Similarly to OFFSET_COMMIT, the application must have privileges on group and topic level too to be able to fetch. However in this case it requires describe access instead of read. Group access is checked first, then Topic access.|
|FIND_COORDINATOR (10)||Describe||Group||The FIND_COORDINATOR request can be of "Group" type, in which case it is looking for consumer group coordinators. This privilege would represent the Group mode.|
|FIND_COORDINATOR (10)||Describe||TransactionalId||This applies only on transactional producers and checked when a producer tries to find the transaction coordinator.|
|LIST_GROUPS (16)||Describe||Cluster||When the broker checks to authorize a list_groups request it first checks for this cluster level authorization. If none found then it proceeds to check the groups individually. This operation doesn’t return CLUSTER_AUTHORIZATION_FAILED.|
|LIST_GROUPS (16)||Describe||Group||If none of the groups are authorized, then just an empty response will be sent back instead of an error. This operation doesn’t return CLUSTER_AUTHORIZATION_FAILED. This is applicable from the 2.1 release.|
|SASL_HANDSHAKE (17)||The SASL handshake is part of the authentication process and therefore it’s not possible to apply any kind of authorization here.|
|API_VERSIONS (18)||The API_VERSIONS request is part of the Kafka protocol handshake and happens on connection and before any authentication. Therefore it’s not possible to control this with authorization.|
|CREATE_TOPICS (19)||Create||Cluster||If there is no cluster level authorization then it won’t return CLUSTER_AUTHORIZATION_FAILED but fall back to use topic level, which is just below. That’ll throw error if there is a problem.|
|CREATE_TOPICS (19)||Create||Topic||This is applicable from the 2.0 release.|
|OFFSET_FOR_LEADER_EPOCH (23)||ClusterAction||Cluster||If there is no cluster level privilege for this operation, then it’ll check for topic level one.|
|OFFSET_FOR_LEADER_EPOCH (23)||Describe||Topic||This is applicable from the 2.1 release.|
|ADD_PARTITIONS_TO_TXN (24)||Write||TransactionalId||This API is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks the Topic in subject (below).|
|ADD_OFFSETS_TO_TXN (25)||Write||TransactionalId||Similarly to ADD_PARTITIONS_TO_TXN, this is only applicable to transactional requests. It first checks for the Write action on the TransactionalId resource, then it checks whether it can Read on the given group (below).|
|DESCRIBE_CONFIGS (32)||DescribeConfigs||Cluster||If broker configs are requested, then the broker will check cluster level privileges.|
|DESCRIBE_CONFIGS (32)||DescribeConfigs||Topic||If topic configs are requested, then the broker will check topic level privileges.|
|ALTER_CONFIGS (33)||AlterConfigs||Cluster||If broker configs are altered, then the broker will check cluster level privileges.|
|ALTER_CONFIGS (33)||AlterConfigs||Topic||If topic configs are altered, then the broker will check topic level privileges.|
|DESCRIBE_LOG_DIRS (35)||Describe||Cluster||An empty response will be returned on authorization failure.|
|SASL_AUTHENTICATE (36)||SASL_AUTHENTICATE is part of the authentication process and therefore it’s not possible to apply any kind of authorization here.|
|CREATE_DELEGATION_TOKEN (38)||Creating delegation tokens has special rules; for this please see the Authentication using Delegation Tokens section.|
|RENEW_DELEGATION_TOKEN (39)||Renewing delegation tokens has special rules; for this please see the Authentication using Delegation Tokens section.|
|EXPIRE_DELEGATION_TOKEN (40)||Expiring delegation tokens has special rules; for this please see the Authentication using Delegation Tokens section.|
|DESCRIBE_DELEGATION_TOKEN (41)||Describe||DelegationToken||Describing delegation tokens has special rules; for this please see the Authentication using Delegation Tokens section.|
|INCREMENTAL_ALTER_CONFIGS (44)||AlterConfigs||Cluster||If broker configs are altered, then the broker will check cluster level privileges.|
|INCREMENTAL_ALTER_CONFIGS (44)||AlterConfigs||Topic||If topic configs are altered, then the broker will check topic level privileges.|