diff --git a/nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaDsl.kt b/nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaDsl.kt index c5b52b2..4c92d74 100644 --- a/nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaDsl.kt +++ b/nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaDsl.kt @@ -36,10 +36,11 @@ class KafkaBuilder(private val imageName: String, private val componentName: Com private val producers = mutableListOf() fun producer(frequency: Duration, topic: String, + partitions: Int = 1, keySerializer: MessageSerializer = MessageSerializer.String, valueSerializer: MessageSerializer = MessageSerializer.String, init: ProducerBuilder.() -> Unit) { - producers.add(ProducerBuilder(frequency, topic, keySerializer.serializerClass, valueSerializer.serializerClass).apply(init).build()) + producers.add(ProducerBuilder(frequency, topic, partitions, keySerializer.serializerClass, valueSerializer.serializerClass).apply(init).build()) } fun build(): KafkaConfig = KafkaConfig(imageName, producers, componentName) @@ -49,6 +50,7 @@ class KafkaBuilder(private val imageName: String, private val componentName: Com class ProducerBuilder( private val frequency: Duration, private val topic: String, + private val partitions: Int, private val keySerializer: Class>, private val valueSerializer: Class> ) { @@ -153,7 +155,7 @@ class ProducerBuilder( } fun build(): ProducerConfig { - return ProducerConfig(frequency, topic, keySerializer, valueSerializer, messageGenerator) + return ProducerConfig(frequency, topic, partitions, keySerializer, valueSerializer, messageGenerator) } } @@ -165,7 +167,9 @@ data class KafkaConfig( ) data class ProducerConfig( - val frequency: Duration, val topic: String, + val frequency: Duration, + val topic: String, + val partitions: Int, val keySerializer: Class>, val valueSerializer: Class>, val messageGenerator: () -> Any diff --git a/nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaExecutor.kt 
b/nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaExecutor.kt index 199fc5b..d35d8d0 100644 --- a/nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaExecutor.kt +++ b/nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/kafka/KafkaExecutor.kt @@ -14,16 +14,21 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.Producer import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.errors.TopicExistsException import org.testcontainers.containers.KafkaContainer import org.testcontainers.utility.DockerImageName import reactor.core.publisher.Flux import java.util.* +import java.util.concurrent.TimeUnit val StackRunner.kafka: List get() { @@ -60,6 +65,9 @@ class KafkaExecutor(private val config: KafkaConfig) : InfrastructureComponent val producer = createKafkaProducer(bootstrapServers, producerConfig) producers.add(producer) @@ -131,6 +139,33 @@ class KafkaExecutor(private val config: KafkaConfig) : InfrastructureComponent) { + val adminProps = Properties().apply { + put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + } + + AdminClient.create(adminProps).use { adminClient -> + // Group topics by name to handle duplicates and use max partitions + val topicConfigs = producers.groupBy { it.topic } + .mapValues { (_, configs) -> configs.maxOf { it.partitions } } + .map { (topicName, partitions) -> + NewTopic(topicName, partitions, 1.toShort()) // 
replication factor of 1 for single broker
+                }
+
+            if (topicConfigs.isNotEmpty()) {
+                try {
+                    val result = adminClient.createTopics(topicConfigs)
+                    result.all().get(30, TimeUnit.SECONDS) // Wait for completion with timeout
+                    logger.info { "Created topics: ${topicConfigs.map { "${it.name()}(${it.numPartitions()} partitions)" }}" }
+                } catch (e: TopicExistsException) {
+                    logger.warn(e) { "Some topics already exist: ${e.message}" }
+                } catch (e: Exception) {
+                    logger.error(e) { "Failed to create topics due to unexpected error: ${e.message}" }
+                }
+            }
+        }
+    }
 }
 
 data class KafkaContainerConfig(val bootstrapServers: String)