Skip to content

Commit

Permalink
make topic configs optional (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
wlezzar authored Oct 9, 2020
1 parent 7a508e9 commit bce8aee
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 17 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
![Build](https://github.com/adevinta/zoe/workflows/Build%20test/badge.svg)

# Zoe: The Kafka CLI for humans

Zoe is a command line tool to interact with kafka in an easy and intuitive way. Wanna see this in action ? check out this demo...
Expand Down
2 changes: 1 addition & 1 deletion zoe-cli/src/commands/lambda.kt
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class DeployLambda : CliktCommand(name = "deploy", help = "Deploy zoe core as an
private val environment by inject<EnvConfig>()

private val jarUrl: URL
by option("--jar-url", help = "Url to the zoe jar file", hidden = true, envvar = "ZOE_JAR_URL")
by option("--jar-url", help = "Url to the zoe jar file", envvar = "ZOE_JAR_URL")
.convert { URL(it) }
.defaultLazy {
val baseUrl = "https://github.com/adevinta/zoe/releases/download"
Expand Down
41 changes: 25 additions & 16 deletions zoe-core/src/functions/admin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ package com.adevinta.oss.zoe.core.functions

import com.adevinta.oss.zoe.core.functions.DeploySchemaResponse.ActualRunResponse
import com.adevinta.oss.zoe.core.functions.DeploySchemaResponse.DryRunResponse
import com.adevinta.oss.zoe.core.utils.admin
import com.adevinta.oss.zoe.core.utils.consumer
import com.adevinta.oss.zoe.core.utils.json
import com.adevinta.oss.zoe.core.utils.uuid
import com.adevinta.oss.zoe.core.utils.*
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonValue
Expand All @@ -22,8 +19,10 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema
import org.apache.avro.compiler.idl.Idl
import org.apache.kafka.clients.admin.Config
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.TopicAuthorizationException
import org.apache.kafka.common.TopicPartition as KafkaTopicPartition

/**
Expand All @@ -34,26 +33,36 @@ val listTopics = zoeFunction<AdminConfig, ListTopicsResponse>(name = "topics") {
cli.listTopics().listings().get()
.map { it.name() }
.let { topics ->
val describeTopicsFuture = cli
.describeTopics(topics).all()

val describeConfigsFuture = cli
.describeConfigs(topics.map { ConfigResource(ConfigResource.Type.TOPIC, it) }).all()

val (kafkaTopicDescriptions, configResources) = describeTopicsFuture.get() to describeConfigsFuture.get()
val describeTopicsFuture =
cli
.describeTopics(topics)
.all()

val configResources: Map<ConfigResource, Config> =
cli
.describeConfigs(topics.map { ConfigResource(ConfigResource.Type.TOPIC, it) })
.all()
.runCatching { get() }
.onFailure {
when (it) {
is TopicAuthorizationException -> logger.warn("not authorized to describe the config!")
else -> logger.warn("unexpected error on config describe request", it)
}
}
.getOrElse { emptyMap() }

kafkaTopicDescriptions.entries.map { (_, topic) ->
describeTopicsFuture.get().entries.map { (_, topic) ->
val configMap = configResources[ConfigResource(ConfigResource.Type.TOPIC, topic.name())]
?.entries()
?.filter { it.value().isNotEmpty() }
?.map { it.name() to it.value() }
?.toMap()?.toSortedMap() ?: sortedMapOf()

TopicDescription(
topic.name(),
topic.isInternal,
topic.partitions().map { it.partition() },
configMap
topic = topic.name(),
internal = topic.isInternal,
partitions = topic.partitions().map { it.partition() },
config = configMap
)
}
}
Expand Down

0 comments on commit bce8aee

Please sign in to comment.