diff --git a/azpubsub/src/main/scala/com/microsoft/azpubsub/kafka/admin/AzPubSubAclCommand.scala b/azpubsub/src/main/scala/com/microsoft/azpubsub/kafka/admin/AzPubSubAclCommand.scala index ed9c01a1e3fc5..336ad0bf723e2 100644 --- a/azpubsub/src/main/scala/com/microsoft/azpubsub/kafka/admin/AzPubSubAclCommand.scala +++ b/azpubsub/src/main/scala/com/microsoft/azpubsub/kafka/admin/AzPubSubAclCommand.scala @@ -1,6 +1,6 @@ package com.microsoft.azpubsub.kafka.admin import kafka.admin.AclCommand -import kafka.admin.AclCommand.{AclCommandOptions, AdminClientService} +import kafka.admin.AclCommand.{AclCommandOptions, AdminClientService, confirmAction, getResourceFilter, getResourceFilterToAcls} import kafka.utils.{CommandLineUtils, Exit, Json, Logging} import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType} import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType} @@ -44,6 +44,32 @@ object AzPubSubAclCommand extends Logging { class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminClientService(opts) { + private val Newline = scala.util.Properties.lineSeparator + + override def removeAcls(): Unit = { + withAdminClient(opts) { adminClient => + val filters = getResourceFilter(opts, dieIfNoResourceFound = false) + val resourceToAcls = getAcls(adminClient, filters) + + val filterToAcl = getResourceFilterToAcls(opts) + + for ((filter, acls) <- filterToAcl) { + val filteredPrincipalAclOperationsMap = getPrincipalAclOperationsMap(resourceToAcls, filter) + if (acls.isEmpty) { + if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)")) + removeAcls(adminClient, acls, filter) + } else { + val updatedAcls = dropAclsWithSharedOperation(opts, acls, filteredPrincipalAclOperationsMap) + if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${updatedAcls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)")) { + removeAcls(adminClient, updatedAcls, filter) + } + } + } + + listAcls() + } + } + override def printAcls(filters: Set[ResourcePatternFilter], listPrincipals: Set[KafkaPrincipal], resourceToAcls: Map[ResourcePattern, Set[AccessControlEntry]]): Unit = { if (!opts.options.has(opts.outputAsProducerConsumerOpt)) { super.printAcls(filters, listPrincipals, resourceToAcls) @@ -57,7 +83,7 @@ class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminC val allPrincipalsFilteredResourceToAcls = resourceToAcls.mapValues(acls => acls.filterNot(acl => listPrincipals.forall( principal => !principal.toString.equals(acl.principal)))).filter(entry => entry._2.nonEmpty) - var producerConsumerAclMap = aclToProducerConsumerMapping(allPrincipalsFilteredResourceToAcls) + val producerConsumerAclMap = aclToProducerConsumerMapping(allPrincipalsFilteredResourceToAcls) outputAsJson(producerConsumerAclMap) } } @@ -80,9 +106,9 @@ class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminC } def GetProducerAclOperations(): Set[AclOperation] = { - var dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--producer", "--topic", "Test-topic") - var dummyOpt = new AclCommandOptions(dummyArgs) - var resourceMap = AclCommand.getProducerResourceFilterToAcls(dummyOpt) + val dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--producer", "--topic", "Test-topic") + val dummyOpt = new AclCommandOptions(dummyArgs) + val resourceMap = AclCommand.getProducerResourceFilterToAcls(dummyOpt) for ((key, value) <- resourceMap) { if (key.resourceType() == ResourceType.TOPIC) { var aclOperationList = Set[AclOperation]() @@ -90,13 +116,13 @@ class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminC return aclOperationList } } - return Set[AclOperation]() + Set[AclOperation]() } def GetConsumerAclOperations(): Set[AclOperation] = { - var dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--consumer", "--topic", "Test-topic", "--group", "Test-group") - var dummyOpt = new AclCommandOptions(dummyArgs) - var resourceMap = AclCommand.getConsumerResourceFilterToAcls(dummyOpt) + val dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--consumer", "--topic", "Test-topic", "--group", "Test-group") + val dummyOpt = new AclCommandOptions(dummyArgs) + val resourceMap = AclCommand.getConsumerResourceFilterToAcls(dummyOpt) for ((key, value) <- resourceMap) { if (key.resourceType() == ResourceType.TOPIC) { var aclOperationList = Set[AclOperation]() @@ -104,13 +130,13 @@ class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminC return aclOperationList } } - return Set[AclOperation]() + Set[AclOperation]() } def GetGroupAclOperations(): Set[AclOperation] = { - var dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--consumer", "--topic", "Test-topic", "--group", "Test-group") - var dummyOpt = new AclCommandOptions(dummyArgs) - var resourceMap = AclCommand.getConsumerResourceFilterToAcls(dummyOpt) + val dummyArgs = Array[String]("--bootstrap-server", "localhost:9092", "--add", "--allow-principal", "User:Bob", "--consumer", "--topic", "Test-topic", "--group", "Test-group") + val dummyOpt = new AclCommandOptions(dummyArgs) + val resourceMap = AclCommand.getConsumerResourceFilterToAcls(dummyOpt) for ((key, value) <- resourceMap) { if (key.resourceType() == ResourceType.GROUP) { var aclOperationList = Set[AclOperation]() @@ -118,14 +144,14 @@ class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminC return aclOperationList } } - return Set[AclOperation]() + Set[AclOperation]() } def aclToProducerConsumerMapping(resourceToAcls:Map[ResourcePattern,Set[AccessControlEntry]]):mutable.Map[ResourcePattern,mutable.Set[AzPubSubAccessControlEntry]] = { var producerConsumerGroupAclMap = mutable.Map[ResourcePattern, mutable.Set[AzPubSubAccessControlEntry]]() - var producerAclOperations = GetProducerAclOperations() - var consumerAclOperations = GetConsumerAclOperations() - var groupAclOperations = GetGroupAclOperations() + val producerAclOperations = GetProducerAclOperations() + val consumerAclOperations = GetConsumerAclOperations() + val groupAclOperations = GetGroupAclOperations() resourceToAcls.foreach(resource => { producerConsumerGroupAclMap += (resource._1 -> mutable.Set[AzPubSubAccessControlEntry]()) @@ -140,35 +166,73 @@ class AzPubSubAdminClientService(opts: AzPubSubAclCommandOptions) extends AdminC }) principalAclMap.foreach { case (principal, acls) => { var strayAcls = acls - var filteredAclOperations = mutable.Set[AclOperation]() - var filteredAcls = acls.filter(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW)) + val filteredAclOperations = mutable.Set[AclOperation]() + val filteredAcls = acls.filter(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW)) filteredAcls.foreach(x => filteredAclOperations.add(x.operation())) if (resource._1.resourceType() == ResourceType.TOPIC) { if (producerAclOperations.subsetOf(filteredAclOperations)) { strayAcls = strayAcls.filterNot(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW && producerAclOperations.contains(x.operation()))) - var modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "PRODUCER") + val modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "PRODUCER") producerConsumerGroupAclMap(resource._1).add(modifiedAcl) } if (consumerAclOperations.subsetOf(filteredAclOperations)) { strayAcls = strayAcls.filterNot(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW && consumerAclOperations.contains(x.operation()))) - var modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "CONSUMER") + val modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "CONSUMER") producerConsumerGroupAclMap(resource._1).add(modifiedAcl) } } else if (resource._1.resourceType() == ResourceType.GROUP) { if (groupAclOperations.subsetOf(filteredAclOperations)) { strayAcls = strayAcls.filterNot(x => (x.host() == "*" && x.permissionType() == AclPermissionType.ALLOW && groupAclOperations.contains(x.operation()))) - var modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "GROUP") + val modifiedAcl = new AzPubSubAccessControlEntry(principal, "*", AclOperation.ANY, AclPermissionType.ALLOW, "GROUP") producerConsumerGroupAclMap(resource._1).add(modifiedAcl) } } strayAcls.foreach(acl => { - var modifiedAcl = new AzPubSubAccessControlEntry(principal, acl.host(), acl.operation(), acl.permissionType(), "NONE") + val modifiedAcl = new AzPubSubAccessControlEntry(principal, acl.host(), acl.operation(), acl.permissionType(), "NONE") producerConsumerGroupAclMap(resource._1).add(modifiedAcl) }) }} }) - return producerConsumerGroupAclMap + producerConsumerGroupAclMap + } + + def dropAclsWithSharedOperation(opt: AzPubSubAclCommandOptions, acls: Set[AccessControlEntry], principalAclOperationsMap: Map[String, Set[AclOperation]]): Set[AccessControlEntry] ={ + val producerAclOperations = GetProducerAclOperations() + val consumerAclOperations = GetConsumerAclOperations() + + var updatedAcls = acls + acls.foreach(acl => { + if (acl.operation() == AclOperation.DESCRIBE) { + if ((opt.options.has(opt.producerOpt) && consumerAclOperations.subsetOf(principalAclOperationsMap(acl.principal()))) + || (opt.options.has(opt.consumerOpt) && producerAclOperations.subsetOf(principalAclOperationsMap(acl.principal())))) { + updatedAcls -= acl + } + } + }) + + updatedAcls + } + + def getPrincipalAclOperationsMap(resourceToAcls: Map[ResourcePattern, Set[AccessControlEntry]], filter: ResourcePatternFilter): Map[String, Set[AclOperation]] ={ + val principalAclOperationsMap = mutable.Map[String, Set[AclOperation]]() + + resourceToAcls.foreach(resource => { + if (resource._1.name() == filter.name() && resource._1.resourceType() == filter.resourceType() && resource._1.patternType() == filter.patternType()) { + resource._2.foreach(acl => { + if (acl.host() == "*" && acl.permissionType() == AclPermissionType.ALLOW) { + if (principalAclOperationsMap.contains(acl.principal())) { + principalAclOperationsMap(acl.principal()) += acl.operation() + } + else { + principalAclOperationsMap += (acl.principal() -> Set(acl.operation())) + } + } + }) + } + }) + + principalAclOperationsMap.toMap } } diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index bc6a5b026a755..d9855ef495253 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -100,7 +100,7 @@ object AclCommand extends Logging { class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging { - private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = { + protected def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = { val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else @@ -171,7 +171,7 @@ object AclCommand extends Logging { } } - private def removeAcls(adminClient: Admin, acls: Set[AccessControlEntry], filter: ResourcePatternFilter): Unit = { + protected def removeAcls(adminClient: Admin, acls: Set[AccessControlEntry], filter: ResourcePatternFilter): Unit = { if (acls.isEmpty) adminClient.deleteAcls(List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava).all().get() else { @@ -180,7 +180,7 @@ object AclCommand extends Logging { } } - private def getAcls(adminClient: Admin, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = { + protected def getAcls(adminClient: Admin, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = { val aclBindings = if (filters.isEmpty) adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList else { @@ -428,7 +428,7 @@ object AclCommand extends Logging { resourceToAcl } - private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = { + def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = { var resourceToAcls = Map.empty[ResourcePatternFilter, Set[AccessControlEntry]] //if none of the --producer or --consumer options are specified , just construct ACLs from CLI options. @@ -530,14 +530,14 @@ object AclCommand extends Logging { Set.empty[String] } - private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = { + def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = { if (opts.options.has(principalOptionSpec)) opts.options.valuesOf(principalOptionSpec).asScala.map(s => JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet else Set.empty[KafkaPrincipal] } - private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = { + def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = { val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType) var resourceFilters = Set.empty[ResourcePatternFilter] @@ -563,7 +563,7 @@ object AclCommand extends Logging { resourceFilters } - private def confirmAction(opts: AclCommandOptions, msg: String): Boolean = { + def confirmAction(opts: AclCommandOptions, msg: String): Boolean = { if (opts.options.has(opts.forceOpt)) return true println(msg)