diff --git a/pom.xml b/pom.xml index 65bf1ef..0bda0cf 100644 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,7 @@ - 1.5.31 + 2.3.10 1.2.2 UTF-8 org.xyro.kumulus.MainKt @@ -50,11 +50,6 @@ ${kotlin.version} provided - - org.jetbrains.kotlin - kotlin-stdlib-common - ${kotlin.version} - org.jetbrains.kotlin kotlin-reflect @@ -68,15 +63,15 @@ compile - io.github.microutils - kotlin-logging - 1.4.6 + io.github.oshai + kotlin-logging-jvm + 7.0.3 org.junit.jupiter junit-jupiter-api - 5.7.0 + 6.0.2 test @@ -88,7 +83,7 @@ org.hdrhistogram HdrHistogram - 2.1.9 + 2.2.2 test @@ -132,7 +127,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.7.0 + 3.15.0 @@ -163,7 +158,7 @@ org.apache.maven.plugins maven-jar-plugin - 3.1.0 + 3.5.0 @@ -176,7 +171,7 @@ org.apache.maven.plugins maven-source-plugin - 2.2.1 + 3.4.0 attach-sources @@ -189,7 +184,7 @@ org.jetbrains.dokka dokka-maven-plugin - 2.0.0 + 2.1.0 pre-site @@ -202,7 +197,7 @@ org.apache.maven.plugins maven-gpg-plugin - 1.5 + 3.2.8 sign-artifacts @@ -219,7 +214,7 @@ com.github.gantsign.maven ktlint-maven-plugin - 1.7.0 + 3.5.0 diff --git a/src/main/kotlin/org/xyro/kumulus/ExecutionPool.kt b/src/main/kotlin/org/xyro/kumulus/ExecutionPool.kt index 6b08b19..7df4087 100644 --- a/src/main/kotlin/org/xyro/kumulus/ExecutionPool.kt +++ b/src/main/kotlin/org/xyro/kumulus/ExecutionPool.kt @@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger class ExecutionPool( size: Int, - private val threadFun: (KumulusMessage) -> Unit + private val threadFun: (KumulusMessage) -> Unit, ) { // uncapped, memory for in-flight tuples should be taken into account and factored into max-spout-pending private val mainQueue = LinkedBlockingQueue() diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusAcker.kt b/src/main/kotlin/org/xyro/kumulus/KumulusAcker.kt index 8a04638..140c961 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusAcker.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusAcker.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.shade.org.eclipse.jetty.util.ConcurrentHashSet import org.apache.storm.tuple.Tuple import org.xyro.kumulus.component.KumulusComponent @@ -16,7 +16,7 @@ class KumulusAcker( private val maxSpoutPending: Long, private val allowExtraAcking: Boolean, private val messageTimeoutMillis: Long, - private val spoutAvailabilityCheckTimeout: Long + private val spoutAvailabilityCheckTimeout: Long, ) { companion object { private val logger = KotlinLogging.logger {} @@ -34,7 +34,10 @@ class KumulusAcker( } } - fun startTree(component: KumulusSpout, messageId: Any?) { + fun startTree( + component: KumulusSpout, + messageId: Any?, + ) { logger.debug { "startTree() -> component: $component, messageId: $messageId" } if (messageId == null) { notifySpout(component, messageId, listOf()) @@ -71,20 +74,25 @@ class KumulusAcker( messageState.spout, messageId, removedState.pendingTasks.map { it.key }, - removedState.failedTasks.toList() + removedState.failedTasks.toList(), ) decrementPending() } } }, - messageTimeoutMillis, TimeUnit.MILLISECONDS + messageTimeoutMillis, + TimeUnit.MILLISECONDS, ) } } } } - fun expandTrees(component: KumulusComponent, dest: Int, tuple: KumulusTuple) { + fun expandTrees( + component: KumulusComponent, + dest: Int, + tuple: KumulusTuple, + ) { logger.debug { "expandTrees() -> component: $component, dest: $dest, tuple: $tuple" } (tuple.kTuple as TupleImpl).spoutMessageId?.let { messageId -> if (allowExtraAcking && state[messageId] == null) { @@ -97,7 +105,10 @@ class KumulusAcker( } } - fun fail(component: KumulusComponent, input: Tuple?) { + fun fail( + component: KumulusComponent, + input: Tuple?, + ) { logger.debug { "fail() -> component: $component, input: $input" } (input as TupleImpl).spoutMessageId?.let { messageId -> val messageState = state[messageId] @@ -112,15 +123,19 @@ class KumulusAcker( } } - fun ack(component: KumulusComponent, input: Tuple?) { + fun ack( + component: KumulusComponent, + input: Tuple?, + ) { logger.debug { "ack() -> component: $component, input: $input" } (input as TupleImpl).spoutMessageId?.let { messageId -> val messageState = state[messageId] if (allowExtraAcking && state[messageId] == null) { return } - if (messageState == null) + if (messageState == null) { error("State missing for messageId $messageId while acking tuple in $component. Tuple: $input") + } checkComplete(messageState, component, input) } } @@ -144,11 +159,13 @@ class KumulusAcker( } } - fun getPendingCount(): Long { - return this.currentPending.get() - } + fun getPendingCount(): Long = this.currentPending.get() - private fun checkComplete(messageState: MessageState, component: KumulusComponent, input: Tuple?) { + private fun checkComplete( + messageState: MessageState, + component: KumulusComponent, + input: Tuple?, + ) { (input as TupleImpl).spoutMessageId?.let { spoutMessageId -> val removedTask = messageState.pendingTasks.remove(component.taskId) if (removedTask == null) { @@ -174,24 +191,33 @@ class KumulusAcker( } } - private fun debugMessage(component: KumulusComponent, spoutMessageId: Any, messageState: MessageState) { + private fun debugMessage( + component: KumulusComponent, + spoutMessageId: Any, + messageState: MessageState, + ) { logger.debug { "Pending task from $component for message $spoutMessageId was completed. " + - "Current pending tuples are:" + messageState.pendingTasks.let { - if (it.isEmpty()) { - " Empty\n" - } else { - val sb = StringBuilder("\n") - it.forEach { (k, v) -> - sb.append("$k: $v\n") + "Current pending tuples are:" + + messageState.pendingTasks.let { + if (it.isEmpty()) { + " Empty\n" + } else { + val sb = StringBuilder("\n") + it.forEach { (k, v) -> + sb.append("$k: $v\n") + } + sb.toString() } - sb.toString() } - } } } - private fun notifySpout(spout: KumulusSpout, spoutMessageId: Any?, failedTasks: List) { + private fun notifySpout( + spout: KumulusSpout, + spoutMessageId: Any?, + failedTasks: List, + ) { this.notifySpout(spout, spoutMessageId, listOf(), failedTasks) } @@ -199,7 +225,7 @@ class KumulusAcker( spout: KumulusSpout, spoutMessageId: Any?, timeoutTasks: List, - failedTasks: List + failedTasks: List, ) { emitter.completeMessageProcessing(spout, spoutMessageId, timeoutTasks, failedTasks) } @@ -209,7 +235,9 @@ class KumulusAcker( synchronized(waitObject) { val currentPending = currentPending.decrementAndGet() if (currentPending >= maxSpoutPending) { - logger.error { "Max spout pending must have exceeded limit of $maxSpoutPending, current after decrement is $currentPending" } + logger.error { + "Max spout pending must have exceeded limit of $maxSpoutPending, current after decrement is $currentPending" + } assert(false) { "Max spout pending must have exceeded limit of $maxSpoutPending, current after decrement is $currentPending" } @@ -224,7 +252,7 @@ class KumulusAcker( } inner class MessageState( - val spout: KumulusSpout + val spout: KumulusSpout, ) { val pendingTasks = ConcurrentHashMap() val failedTasks = ConcurrentHashSet() diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusEmitter.kt b/src/main/kotlin/org/xyro/kumulus/KumulusEmitter.kt index 347fe91..9ecacfb 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusEmitter.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusEmitter.kt @@ -5,12 +5,18 @@ import org.xyro.kumulus.component.KumulusSpout interface KumulusEmitter { fun getDestinations(tasks: List): List - fun execute(destComponent: KumulusComponent, kumulusTuple: KumulusTuple) + + fun execute( + destComponent: KumulusComponent, + kumulusTuple: KumulusTuple, + ) + fun completeMessageProcessing( spout: KumulusSpout, spoutMessageId: Any?, timeoutTasks: List, - failedTasks: List + failedTasks: List, ) + fun throwException(t: Throwable) } diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusStormTransformer.kt b/src/main/kotlin/org/xyro/kumulus/KumulusStormTransformer.kt index 6223b3e..2b68d40 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusStormTransformer.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusStormTransformer.kt @@ -38,27 +38,35 @@ class KumulusStormTransformer { */ @Suppress("UNCHECKED_CAST") @JvmStatic - fun initializeTopology(topology: StormTopology, rawConfig: MutableMap, stormId: String): KumulusTopology { + fun initializeTopology( + topology: StormTopology, + rawConfig: MutableMap, + stormId: String, + ): KumulusTopology { val serializedBoltsMap = readBolts(topology) val serializedSpoutsMap = readSpouts(topology) - val boltsMap = serializedBoltsMap.entries.associate { (id, bolt) -> - val boltObject = bolt._bolt_object - ?: throw IllegalArgumentException("Bolt '$id' has no serialized bolt object") - id to ( - Utils.javaDeserialize(boltObject._serialized_java, Serializable::class.java) as? IRichBolt - ?: throw IllegalArgumentException("Bolt '$id' is not an IRichBolt") + val boltsMap = + serializedBoltsMap.entries.associate { (id, bolt) -> + val boltObject = + bolt._bolt_object + ?: throw IllegalArgumentException("Bolt '$id' has no serialized bolt object") + id to ( + Utils.javaDeserialize(boltObject._serialized_java, Serializable::class.java) as? IRichBolt + ?: throw IllegalArgumentException("Bolt '$id' is not an IRichBolt") ) - } + } - val spoutsMap = serializedSpoutsMap.entries.associate { (id, spout) -> - val spoutObject = spout._spout_object - ?: throw IllegalArgumentException("Spout '$id' has no serialized spout object") - id to ( - Utils.javaDeserialize(spoutObject._serialized_java, Serializable::class.java) as? IRichSpout - ?: throw IllegalArgumentException("Spout '$id' is not an IRichSpout") + val spoutsMap = + serializedSpoutsMap.entries.associate { (id, spout) -> + val spoutObject = + spout._spout_object + ?: throw IllegalArgumentException("Spout '$id' has no serialized spout object") + id to ( + Utils.javaDeserialize(spoutObject._serialized_java, Serializable::class.java) as? IRichSpout + ?: throw IllegalArgumentException("Spout '$id' is not an IRichSpout") ) - } + } val componentCommonMap = serializedSpoutsMap.mapValues { (_, spec) -> spec._common!! } + @@ -77,7 +85,7 @@ class KumulusStormTransformer { fun initializeTopology( topology: KumulusTopologyDefinition, rawConfig: MutableMap, - stormId: String + stormId: String, ): KumulusTopology { val spoutsMap = topology.spouts.mapValues { (_, declaration) -> declaration.spout } val boltsMap = topology.bolts.mapValues { (_, declaration) -> declaration.bolt } @@ -96,7 +104,7 @@ class KumulusStormTransformer { boltsMap: Map, componentCommons: Map, rawConfig: MutableMap, - stormId: String + stormId: String, ): KumulusTopology { val taskToComponent = mutableMapOf() @@ -132,21 +140,23 @@ class KumulusStormTransformer { for (i in 1..parallelism) { taskToComponent[id] = name - val tasks: MutableList = if (!componentToSortedTasks.containsKey(name)) { - componentToSortedTasks[name] = mutableListOf() - componentToSortedTasks[name] as MutableList - } else { - componentToSortedTasks[name] as MutableList - } + val tasks: MutableList = + if (!componentToSortedTasks.containsKey(name)) { + componentToSortedTasks[name] = mutableListOf() + componentToSortedTasks[name] as MutableList + } else { + componentToSortedTasks[name] as MutableList + } tasks.add(id) - val streamToFields: MutableMap = if (componentToStreamToFields.containsKey(name)) { - componentToStreamToFields[name] as MutableMap - } else { - componentToStreamToFields[name] = mutableMapOf() - componentToStreamToFields[name] as MutableMap - } + val streamToFields: MutableMap = + if (componentToStreamToFields.containsKey(name)) { + componentToStreamToFields[name] as MutableMap + } else { + componentToStreamToFields[name] = mutableMapOf() + componentToStreamToFields[name] as MutableMap + } for (stream in componentCommon._streams.keys) { val streamInfo = componentCommon._streams[stream] @@ -158,9 +168,10 @@ class KumulusStormTransformer { } } - val config = rawConfig.mapValues { it -> - return@mapValues (it.value as? Int)?.toLong() ?: it.value - } + val config = + rawConfig.mapValues { it -> + return@mapValues (it.value as? Int)?.toLong() ?: it.value + } componentToSortedTasks.forEach { componentId: String, taskIds: List -> val componentCommon = componentCommons[componentId] @@ -168,72 +179,80 @@ class KumulusStormTransformer { taskIds.forEach { taskId -> val componentInstance = if (taskId == Constants.SYSTEM_TASK_ID.toInt()) { - BasicBoltExecutor(object : BaseBasicBolt() { - override fun execute(input: Tuple?, collector: BasicOutputCollector?) {} - override fun declareOutputFields(declarer: OutputFieldsDeclarer?) { - // Declared hard-coded - } - }) + BasicBoltExecutor( + object : BaseBasicBolt() { + override fun execute( + input: Tuple?, + collector: BasicOutputCollector?, + ) {} + + override fun declareOutputFields(declarer: OutputFieldsDeclarer?) { + // Declared hard-coded + } + }, + ) } else if (spoutsMap.containsKey(componentId)) { val component = spoutsMap[componentId]!! Utils.javaDeserialize( Utils.javaSerialize(component as Serializable), - Serializable::class.java + Serializable::class.java, ) } else if (boltsMap.containsKey(componentId)) { val component = boltsMap[componentId]!! Utils.javaDeserialize( Utils.javaSerialize(component as Serializable), - Serializable::class.java + Serializable::class.java, ) } else { throw Exception( - "Component name '$componentId' was not found in underlying topology object" + "Component name '$componentId' was not found in underlying topology object", ) } - val context = TopologyContext( - topology, - config, - taskToComponent, - componentToSortedTasks, - componentToStreamToFields, - stormId, - codeDir, - pidDir, - taskId, - workerPort, - workerTasks, - defaultResources, - userResources, - executorData, - registeredMetrics, - Atom(Object()) - ) - - kComponents += when (componentInstance) { - is IRichBolt -> { - val kumulusBolt = KumulusBolt(config, context, componentInstance, componentCommon) - kumulusBolt.apply { - val boltConfig = - componentInstance.componentConfiguration ?: mapOf() - boltConfig[Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS]?.let { secs -> - if (secs !is Number) { - throw IllegalArgumentException( - "Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS must be a number. Got: $secs" - ) + val context = + TopologyContext( + topology, + config, + taskToComponent, + componentToSortedTasks, + componentToStreamToFields, + stormId, + codeDir, + pidDir, + taskId, + workerPort, + workerTasks, + defaultResources, + userResources, + executorData, + registeredMetrics, + Atom(Object()), + ) + + kComponents += + when (componentInstance) { + is IRichBolt -> { + val kumulusBolt = KumulusBolt(config, context, componentInstance, componentCommon) + kumulusBolt.apply { + val boltConfig = + componentInstance.componentConfiguration ?: mapOf() + boltConfig[Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS]?.let { secs -> + if (secs !is Number) { + throw IllegalArgumentException( + "Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS must be a number. Got: $secs", + ) + } + this.tickSecs = secs } - this.tickSecs = secs } } + is IRichSpout -> KumulusSpout(config, context, componentInstance) + else -> + throw Throwable( + "Component of type ${componentInstance::class.qualifiedName} " + + "is not acceptable by Kumulus", + ) } - is IRichSpout -> KumulusSpout(config, context, componentInstance) - else -> - throw Throwable( - "Component of type ${componentInstance::class.qualifiedName} " + - "is not acceptable by Kumulus" - ) - } } } @@ -259,19 +278,23 @@ class KumulusStormTransformer { } private fun buildStormTopology(topology: KumulusTopologyDefinition): StormTopology { - val spouts = topology.spouts.mapValues { (_, declaration) -> - SpoutSpec().apply { - _spout_object = serializeComponentObject(declaration.spout as Serializable) - _common = declaration.common.deepCopy() - } - }.toMutableMap() - - val bolts = topology.bolts.mapValues { (_, declaration) -> - Bolt().apply { - _bolt_object = serializeComponentObject(declaration.bolt as Serializable) - _common = declaration.common.deepCopy() - } - }.toMutableMap() + val spouts = + topology.spouts + .mapValues { (_, declaration) -> + SpoutSpec().apply { + _spout_object = serializeComponentObject(declaration.spout as Serializable) + _common = declaration.common.deepCopy() + } + }.toMutableMap() + + val bolts = + topology.bolts + .mapValues { (_, declaration) -> + Bolt().apply { + _bolt_object = serializeComponentObject(declaration.bolt as Serializable) + _common = declaration.common.deepCopy() + } + }.toMutableMap() return StormTopology().apply { set_spouts(spouts) @@ -280,28 +303,28 @@ class KumulusStormTransformer { } } - private fun serializeComponentObject(component: Serializable): ComponentObject { - return ComponentObject().apply { + private fun serializeComponentObject(component: Serializable): ComponentObject = + ComponentObject().apply { set_serialized_java(Utils.javaSerialize(component)) } - } private fun validateTopology(components: MutableList) { components.forEach { src -> (src as? KumulusBolt)?.apply { inputs.forEach { gid, grouping -> - val input = components.find { it.componentId == gid._componentId } - ?: throw KumulusTopologyValidationException( - "Component '$componentId' is connected to non-existent component " + - "'${gid._componentId}'" - ) + val input = + components.find { it.componentId == gid._componentId } + ?: throw KumulusTopologyValidationException( + "Component '$componentId' is connected to non-existent component " + + "'${gid._componentId}'", + ) when (input) { is KumulusBolt -> { if (!input.streams.containsKey(gid._streamId)) { throw KumulusTopologyValidationException( "Component '$componentId' is connected to non-existent stream " + - "'${gid._streamId}' of component '${gid._componentId}'" + "'${gid._streamId}' of component '${gid._componentId}'", ) } if (grouping.is_set_fields) { @@ -309,7 +332,7 @@ class KumulusStormTransformer { if (!grouping._fields.all { declaredFields.contains(it) }) { throw KumulusTopologyValidationException( "Component '$componentId' is connected to stream '${gid._streamId}' of component " + - "'${gid._componentId}' grouped by non existing fields ${grouping._fields}" + "'${gid._componentId}' grouped by non existing fields ${grouping._fields}", ) } } @@ -323,5 +346,7 @@ class KumulusStormTransformer { } } - class KumulusTopologyValidationException(msg: String) : Exception(msg) + class KumulusTopologyValidationException( + msg: String, + ) : Exception(msg) } diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt index f14871e..8c09ed3 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.Config import org.apache.storm.Constants import org.apache.storm.tuple.Tuple @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicReference @Suppress("MemberVisibilityCanBePrivate") class KumulusTopology( private val components: List, - config: Map + config: Map, ) : KumulusEmitter { private val maxSpoutPending: Long = config[Config.TOPOLOGY_MAX_SPOUT_PENDING] as Long? ?: 0L private val poolSize = (config[CONF_THREAD_POOL_CORE_SIZE] as? Long ?: 1L).toInt() @@ -38,9 +38,10 @@ class KumulusTopology( private val shutDownHook = CountDownLatch(1) private val crashException = AtomicReference() private val shutdownTimeoutSecs = config[CONF_SHUTDOWN_TIMEOUT_SECS] as? Long ?: 10L - private val taskIdToComponent: Map = this.components - .map { Pair(it.taskId, it) } - .toMap() + private val taskIdToComponent: Map = + this.components + .map { Pair(it.taskId, it) } + .toMap() private val atomicThreadsInUse = AtomicInteger(0) private val atomicMaxThreadsInUse = AtomicInteger(0) @@ -51,9 +52,10 @@ class KumulusTopology( private val scheduledExecutorPoolSize: Int = (config[CONF_SCHEDULED_EXECUTOR_THREAD_POOL_SIZE] as? Long ?: 5L).toInt() - private val rejectedExecutionHandler = RejectedExecutionHandler { _, _ -> - logger.error { "Execution was rejected, current pool size: $scheduledExecutorPoolSize" } - } + private val rejectedExecutionHandler = + RejectedExecutionHandler { _, _ -> + logger.error { "Execution was rejected, current pool size: $scheduledExecutorPoolSize" } + } private val scheduledExecutor = ScheduledThreadPoolExecutor(scheduledExecutorPoolSize, rejectedExecutionHandler) internal val acker: KumulusAcker @@ -80,13 +82,14 @@ class KumulusTopology( init { logger.info { "Initializing a Kumulus topology" } logger.info { "Kumulus topology configuration: $config" } - this.acker = KumulusAcker( - this, - maxSpoutPending, - config[CONF_EXTRA_ACKING] as? Boolean ?: false, - (config[Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS] as? Long)?.times(1000) ?: 0L, - config[CONF_SPOUT_AVAILABILITY_PASS_TIMEOUT] as? Long ?: 50L - ) + this.acker = + KumulusAcker( + this, + maxSpoutPending, + config[CONF_EXTRA_ACKING] as? Boolean ?: false, + (config[Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS] as? Long)?.times(1000) ?: 0L, + config[CONF_SPOUT_AVAILABILITY_PASS_TIMEOUT] as? Long ?: 50L, + ) } companion object { @@ -113,7 +116,10 @@ class KumulusTopology( * @param unit timeout duration unit */ @Throws(TimeoutException::class) - fun prepare(time: Long, unit: TimeUnit) { + fun prepare( + time: Long, + unit: TimeUnit, + ) { val start = System.currentTimeMillis() this.prepare() @@ -143,16 +149,18 @@ class KumulusTopology( when (component) { is KumulusSpout -> SpoutPrepareMessage( - component, KumulusSpoutCollector(component, this, acker, onReportErrorHook) + component, + KumulusSpoutCollector(component, this, acker, onReportErrorHook), ) is KumulusBolt -> { BoltPrepareMessage( - component, KumulusBoltCollector(component, this, acker, onReportErrorHook) + component, + KumulusBoltCollector(component, this, acker, onReportErrorHook), ) } else -> throw UnsupportedOperationException() - } + }, ) if (component is KumulusBolt) { @@ -161,19 +169,22 @@ class KumulusTopology( { if (started.get()) { try { - val tuple = KumulusTuple( - systemComponent, - Constants.SYSTEM_TICK_STREAM_ID, - listOf(), - null - ) + val tuple = + KumulusTuple( + systemComponent, + Constants.SYSTEM_TICK_STREAM_ID, + listOf(), + null, + ) boltExecutionPool.enqueue(ExecuteMessage(component, tuple)) } catch (e: Exception) { logger.error(e) { "Error in sending tick tuple" } } } }, - tickSecs, tickSecs, TimeUnit.SECONDS + tickSecs, + tickSecs, + TimeUnit.SECONDS, ) } } @@ -186,10 +197,12 @@ class KumulusTopology( */ fun start(block: Boolean = false) { this.resetMetrics() - val spouts = components.asSequence() - .filter { it is KumulusSpout } - .map { it as KumulusSpout } - .toList() + val spouts = + components + .asSequence() + .filter { it is KumulusSpout } + .map { it as KumulusSpout } + .toList() spouts.forEach { spout -> spout.start(this) @@ -219,12 +232,13 @@ class KumulusTopology( } // KumulusEmitter impl - override fun getDestinations(tasks: List): List { - return tasks.map { taskIdToComponent[it]!! } - } + override fun getDestinations(tasks: List): List = tasks.map { taskIdToComponent[it]!! } // KumulusEmitter impl - override fun execute(destComponent: KumulusComponent, kumulusTuple: KumulusTuple) { + override fun execute( + destComponent: KumulusComponent, + kumulusTuple: KumulusTuple, + ) { boltExecutionPool.enqueue(ExecuteMessage(destComponent, kumulusTuple)) } @@ -233,7 +247,7 @@ class KumulusTopology( spout: KumulusSpout, spoutMessageId: Any?, timeoutTasks: List, - failedTasks: List + failedTasks: List, ) { spout.queue.add( AckMessage( @@ -241,8 +255,8 @@ class KumulusTopology( spoutMessageId, timeoutTasks.isEmpty() && failedTasks.isEmpty(), timeoutTasks.map { this.taskIdToComponent[it]!!.componentId }, - failedTasks.map { this.taskIdToComponent[it]!!.componentId } - ) + failedTasks.map { this.taskIdToComponent[it]!!.componentId }, + ), ) } @@ -271,7 +285,7 @@ class KumulusTopology( is KumulusSpout -> c.prepare(message.collector as KumulusSpoutCollector) is KumulusBolt -> c.prepare(message.collector as KumulusBoltCollector) else -> throw UnsupportedOperationException( - "Class ${c.javaClass.canonicalName} is not a valid Kumulus component" + "Class ${c.javaClass.canonicalName} is not a valid Kumulus component", ) } } finally { @@ -317,7 +331,7 @@ class KumulusTopology( message.component.componentId, message.component.taskId, delay, - message.tuple.kTuple + message.tuple.kTuple, ) } catch (e: Exception) { logger.error("An exception was thrown from busy-hook callback, ignoring", e) @@ -346,7 +360,8 @@ class KumulusTopology( { boltExecutionPool.enqueue(message) }, - queuePushbackWait, TimeUnit.MILLISECONDS + queuePushbackWait, + TimeUnit.MILLISECONDS, ) } } @@ -377,7 +392,10 @@ class KumulusTopology( } } - private fun callBusyHook(bolt: KumulusBolt, message: ExecuteMessage) { + private fun callBusyHook( + bolt: KumulusBolt, + message: ExecuteMessage, + ) { onBusyBoltHook?.let { onBusyBoltHook -> val waitNanos = bolt.waitStart.getAndSet(0) if (waitNanos > 0) { @@ -388,7 +406,7 @@ class KumulusTopology( bolt.componentId, bolt.taskId, System.nanoTime() - waitNanos, - message.tuple.kTuple + message.tuple.kTuple, ) } catch (e: Exception) { logger.error("An exception was thrown from busy-hook callback, ignoring", e) @@ -401,8 +419,10 @@ class KumulusTopology( } } - class KumulusTopologyCrashedException(exception: Throwable?) : RuntimeException( - "Kumulus topology had crashed due to an uncaught exception", - exception - ) + class KumulusTopologyCrashedException( + exception: Throwable?, + ) : RuntimeException( + "Kumulus topology had crashed due to an uncaught exception", + exception, + ) } diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusTuple.kt b/src/main/kotlin/org/xyro/kumulus/KumulusTuple.kt index d52546d..6ad61af 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusTuple.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusTuple.kt @@ -9,26 +9,26 @@ class KumulusTuple( component: KumulusComponent, streamId: String, tuple: List, - messageId: Any? + messageId: Any?, ) { private val spoutMessageId = messageId - val kTuple: Tuple = TupleImpl( - component.context, - tuple, - component.taskId, - streamId, - KumulusMessageId(), - spoutMessageId - ) + val kTuple: Tuple = + TupleImpl( + component.context, + tuple, + component.taskId, + streamId, + KumulusMessageId(), + spoutMessageId, + ) - override fun toString(): String { - return "KumulusTuple: " + + override fun toString(): String = + "KumulusTuple: " + "MsgID ${(kTuple as TupleImpl).spoutMessageId}, " + "Source: ${kTuple.sourceComponent}, " + "Source Stream: ${kTuple.sourceStreamId}, " + "Tuple: ${kTuple.values}" - } class KumulusMessageId : MessageId(HashMap()) } diff --git a/src/main/kotlin/org/xyro/kumulus/collector/KumulusBoltCollector.kt b/src/main/kotlin/org/xyro/kumulus/collector/KumulusBoltCollector.kt index 9e02a2a..50c01d3 100644 --- a/src/main/kotlin/org/xyro/kumulus/collector/KumulusBoltCollector.kt +++ b/src/main/kotlin/org/xyro/kumulus/collector/KumulusBoltCollector.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus.collector -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.task.IOutputCollector import org.apache.storm.tuple.Tuple import org.xyro.kumulus.KumulusAcker @@ -12,19 +12,24 @@ class KumulusBoltCollector( component: KumulusComponent, private val emitter: KumulusEmitter, acker: KumulusAcker, - errorHandler: ((String, Int, Throwable) -> Unit)? + errorHandler: ((String, Int, Throwable) -> Unit)?, ) : KumulusCollector( - component, - emitter, - acker, - errorHandler -), + component, + emitter, + acker, + errorHandler, + ), IOutputCollector { companion object { val logger = KotlinLogging.logger { } } - override fun emitDirect(taskId: Int, streamId: String?, anchors: MutableCollection?, tuple: MutableList?) { + override fun emitDirect( + taskId: Int, + streamId: String?, + anchors: MutableCollection?, + tuple: MutableList?, + ) { TODO("not implemented") } diff --git a/src/main/kotlin/org/xyro/kumulus/collector/KumulusCollector.kt b/src/main/kotlin/org/xyro/kumulus/collector/KumulusCollector.kt index cc77e2b..fe9cf07 100644 --- a/src/main/kotlin/org/xyro/kumulus/collector/KumulusCollector.kt +++ b/src/main/kotlin/org/xyro/kumulus/collector/KumulusCollector.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus.collector -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.grouping.CustomStreamGrouping import org.apache.storm.tuple.Tuple import org.apache.storm.utils.Utils @@ -15,7 +15,7 @@ abstract class KumulusCollector( protected val component: KumulusComponent, private val emitter: KumulusEmitter, protected val acker: KumulusAcker, - private val errorHandler: ((String, Int, Throwable) -> Unit)? = null + private val errorHandler: ((String, Int, Throwable) -> Unit)? = null, ) { companion object { private val logger = KotlinLogging.logger {} @@ -23,21 +23,22 @@ abstract class KumulusCollector( // Impl. org.apache.storm.task.IOutputCollector fun reportError(error: Throwable?) { - val reportError = error - ?: Exception("reportError was called with null error. An error in component might be shadowed") + val reportError = + error + ?: Exception("reportError was called with null error. An error in component might be shadowed") errorHandler?.let { it(this.component.componentId, this.component.taskId, reportError) } ?: logger.error( "An error was reported from bolt " + "${component.componentId}/${component.taskId}", - reportError + reportError, ) } private fun componentEmit( streamId: String?, tuple: MutableList, - messageId: Any? + messageId: Any?, ): MutableList { val ret = mutableListOf() @@ -50,11 +51,13 @@ abstract class KumulusCollector( val emitToInstance = emitter.getDestinations(tasks) // First, expand all trees - executes += emitToInstance.map { destComponent -> - val kumulusTuple = KumulusTuple(component, streamId ?: Utils.DEFAULT_STREAM_ID, tuple, messageId) - acker.expandTrees(component, destComponent.taskId, kumulusTuple) - destComponent to kumulusTuple - }.toList() + executes += + emitToInstance + .map { destComponent -> + val kumulusTuple = KumulusTuple(component, streamId ?: Utils.DEFAULT_STREAM_ID, tuple, messageId) + acker.expandTrees(component, destComponent.taskId, kumulusTuple) + destComponent to kumulusTuple + }.toList() logger.trace { "Finished emitting from bolt $component" } @@ -71,21 +74,29 @@ abstract class KumulusCollector( return ret } - fun emit(streamId: String?, anchors: MutableCollection?, tuple: MutableList): MutableList { - val messageId = anchors - ?.map { (it as TupleImpl).spoutMessageId } - ?.toSet() - ?.filter { it != null } - ?.apply { - if (this.size > 1) { - logger.debug { "Found more than a single message ID in emitted anchors: $anchors" } - } - } - ?.firstOrNull() + fun emit( + streamId: String?, + anchors: MutableCollection?, + tuple: MutableList, + ): MutableList { + val messageId = + anchors + ?.map { (it as TupleImpl).spoutMessageId } + ?.toSet() + ?.filter { it != null } + ?.apply { + if (this.size > 1) { + logger.debug { "Found more than a single message ID in emitted anchors: $anchors" } + } + }?.firstOrNull() return componentEmit(streamId, tuple, messageId) } - fun emit(streamId: String?, tuple: MutableList, messageId: Any?): MutableList { + fun emit( + streamId: String?, + tuple: MutableList, + messageId: Any?, + ): MutableList { if (component !is KumulusSpout) { throw RuntimeException("Bolts wrong emit method called for ${component.componentId}/${component.taskId}") } diff --git a/src/main/kotlin/org/xyro/kumulus/collector/KumulusSpoutCollector.kt b/src/main/kotlin/org/xyro/kumulus/collector/KumulusSpoutCollector.kt index 2e149f0..2d61630 100644 --- a/src/main/kotlin/org/xyro/kumulus/collector/KumulusSpoutCollector.kt +++ b/src/main/kotlin/org/xyro/kumulus/collector/KumulusSpoutCollector.kt @@ -10,19 +10,22 @@ class KumulusSpoutCollector( component: KumulusComponent, emitter: KumulusEmitter, acker: KumulusAcker, - errorHandler: ((String, Int, Throwable) -> Unit)? + errorHandler: ((String, Int, Throwable) -> Unit)?, ) : KumulusCollector( - component, - emitter, - acker, - errorHandler -), + component, + emitter, + acker, + errorHandler, + ), ISpoutOutputCollector { - override fun emitDirect(taskId: Int, streamId: String?, tuple: MutableList?, messageId: Any?) { + override fun emitDirect( + taskId: Int, + streamId: String?, + tuple: MutableList?, + messageId: Any?, + ) { TODO("not implemented") // To change body of created functions use File | Settings | File Templates. } - override fun getPendingCount(): Long { - return acker.getPendingCount() - } + override fun getPendingCount(): Long = acker.getPendingCount() } diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt index 733f9fb..e0cb27f 100644 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusBolt.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus.component -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.generated.ComponentCommon import org.apache.storm.generated.GlobalStreamId import org.apache.storm.generated.Grouping @@ -14,7 +14,7 @@ class KumulusBolt( config: Map, context: TopologyContext, componentInstance: IRichBolt, - common: ComponentCommon? + common: ComponentCommon?, ) : KumulusComponent(config, context) { companion object { private val logger = KotlinLogging.logger {} @@ -28,7 +28,9 @@ class KumulusBolt( private val bolt: IRichBolt = componentInstance fun prepare(collector: KumulusBoltCollector) { - logger.info { "Created bolt '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Object hashcode: ${this.hashCode()}" } + logger.info { + "Created bolt '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Object hashcode: ${this.hashCode()}" + } bolt.prepare(config, context, OutputCollector(collector)) super.prepare() } diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt index b367ed4..fa28704 100644 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusComponent.kt @@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicLong abstract class KumulusComponent( protected val config: Map, - val context: TopologyContext + val context: TopologyContext, ) { val inUse = AtomicBoolean(false) val isReady = AtomicBoolean(false) @@ -37,53 +37,62 @@ abstract class KumulusComponent( val groupingStateMap: MutableMap> = mutableMapOf() context.thisTargets.forEach { stream, groupings -> groupings.forEach { component, grouping -> - val kGrouping = if (grouping.is_set_all) { - AllGrouping() - } else if (grouping.is_set_none || grouping.is_set_shuffle || grouping.is_set_local_or_shuffle) { - ShuffleGrouping() - } else if (grouping.is_set_fields) { - FieldsGrouping(grouping._fields, context.thisOutputFieldsForStreams[stream]!!) - } else if (grouping.is_set_custom_serialized) { - val customGrouping = Utils.javaDeserialize(grouping._custom_serialized, Serializable::class.java)!! - customGrouping as CustomStreamGrouping - } else { - throw UnsupportedOperationException("Grouping type $grouping isn't currently supported by Kumulus") - } + val kGrouping = + if (grouping.is_set_all) { + AllGrouping() + } else if (grouping.is_set_none || grouping.is_set_shuffle || grouping.is_set_local_or_shuffle) { + ShuffleGrouping() + } else if (grouping.is_set_fields) { + FieldsGrouping(grouping._fields, context.thisOutputFieldsForStreams[stream]!!) + } else if (grouping.is_set_custom_serialized) { + val customGrouping = Utils.javaDeserialize(grouping._custom_serialized, Serializable::class.java)!! + customGrouping as CustomStreamGrouping + } else { + throw UnsupportedOperationException("Grouping type $grouping isn't currently supported by Kumulus") + } kGrouping.prepare(this.context, GlobalStreamId(component, stream), context.getComponentTasks(component)) - groupingStateMap[stream] = (groupingStateMap[stream] ?: mutableMapOf()).also { - it[component] = kGrouping - } + groupingStateMap[stream] = + (groupingStateMap[stream] ?: mutableMapOf()).also { + it[component] = kGrouping + } } } this.groupingStateMap = groupingStateMap isReady.set(true) } - override fun toString(): String { - return "[Component $componentId->$taskId]" - } + override fun toString(): String = "[Component $componentId->$taskId]" } -abstract class KumulusMessage(val component: KumulusComponent) +abstract class KumulusMessage( + val component: KumulusComponent, +) abstract class PrepareMessage( component: KumulusComponent, - val collector: KumulusCollector + val collector: KumulusCollector, ) : KumulusMessage(component) -class SpoutPrepareMessage(component: KumulusComponent, collector: KumulusSpoutCollector) : - PrepareMessage(component, collector) +class SpoutPrepareMessage( + component: KumulusComponent, + collector: KumulusSpoutCollector, +) : PrepareMessage(component, collector) -class BoltPrepareMessage(component: KumulusComponent, collector: KumulusBoltCollector) : - PrepareMessage(component, collector) +class BoltPrepareMessage( + component: KumulusComponent, + collector: KumulusBoltCollector, +) : PrepareMessage(component, collector) -class ExecuteMessage(component: KumulusComponent, val tuple: KumulusTuple, val isLate: AtomicBoolean = AtomicBoolean(false)) : - KumulusMessage(component) +class ExecuteMessage( + component: KumulusComponent, + val tuple: KumulusTuple, + val isLate: AtomicBoolean = AtomicBoolean(false), +) : KumulusMessage(component) class AckMessage( spout: KumulusSpout, val spoutMessageId: Any?, val ack: Boolean, val timeoutComponents: List, - val failedComponents: List + val failedComponents: List, ) : KumulusMessage(spout) diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt index 9dfcc95..f08cd88 100644 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusFailureNotificationSpout.kt @@ -9,5 +9,8 @@ interface KumulusFailureNotificationSpout { /** * Spout fail hook that includes a list of failed bolts (bolts that did not failed the msgId instead of acking) */ - fun messageIdFailure(msgId: Any?, failedComponents: List) + fun messageIdFailure( + msgId: Any?, + failedComponents: List, + ) } diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt index 3699c3a..3d698a9 100644 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusSpout.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus.component -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.spout.SpoutOutputCollector import org.apache.storm.task.TopologyContext import org.apache.storm.topology.IRichSpout @@ -13,7 +13,7 @@ import java.util.concurrent.atomic.AtomicBoolean class KumulusSpout( config: Map, context: TopologyContext, - componentInstance: IRichSpout + componentInstance: IRichSpout, ) : KumulusComponent(config, context) { companion object { private val logger = KotlinLogging.logger {} @@ -27,7 +27,9 @@ class KumulusSpout( val queue = LinkedBlockingQueue() fun prepare(collector: KumulusSpoutCollector) { - logger.debug { "Created spout '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Object hashcode: ${this.hashCode()}" } + logger.debug { + "Created spout '$componentId' with taskId $taskId (index: ${context.thisTaskIndex}). Object hashcode: ${this.hashCode()}" + } spout.open(config, context, SpoutOutputCollector(collector)) super.prepare() } @@ -43,7 +45,7 @@ class KumulusSpout( private fun fail( msgId: Any?, timeoutComponents: List, - failedComponents: List + failedComponents: List, ) { if (spout is KumulusTimeoutNotificationSpout) { spout.messageIdFailure(msgId, failedComponents, timeoutComponents) @@ -96,26 +98,28 @@ class KumulusSpout( } private fun mainLoopMethod(acker: KumulusAcker) { - queue.poll()?.also { ackMessage -> - if (ackMessage.ack) { - ack(ackMessage.spoutMessageId) - } else { - fail(ackMessage.spoutMessageId, ackMessage.timeoutComponents, ackMessage.failedComponents) - } - }.let { - if (it == null && isReady.get()) { - if (acker.waitForSpoutAvailability()) { - if (inUse.compareAndSet(false, true)) { - try { - if (isReady.get()) { - nextTuple() + queue + .poll() + ?.also { ackMessage -> + if (ackMessage.ack) { + ack(ackMessage.spoutMessageId) + } else { + fail(ackMessage.spoutMessageId, ackMessage.timeoutComponents, ackMessage.failedComponents) + } + }.let { + if (it == null && isReady.get()) { + if (acker.waitForSpoutAvailability()) { + if (inUse.compareAndSet(false, true)) { + try { + if (isReady.get()) { + nextTuple() + } + } finally { + inUse.set(false) } - } finally { - inUse.set(false) } } } } - } } } diff --git a/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt b/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt index c506e43..f4719e8 100644 --- a/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt +++ b/src/main/kotlin/org/xyro/kumulus/component/KumulusTimeoutNotificationSpout.kt @@ -10,5 +10,9 @@ interface KumulusTimeoutNotificationSpout { /** * Spout fail hook that includes a list of timeout bolts (bolts that did not ack/fail the msgId) */ - fun messageIdFailure(msgId: Any?, failedComponents: List, timeoutComponents: List) + fun messageIdFailure( + msgId: Any?, + failedComponents: List, + timeoutComponents: List, + ) } diff --git a/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt b/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt index 57c1902..9509ff9 100644 --- a/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt +++ b/src/main/kotlin/org/xyro/kumulus/component/TupleImpl.kt @@ -28,7 +28,7 @@ open class TupleImpl : Tuple { throw IllegalArgumentException( "Tuple created with wrong number of fields. " + "Expected " + schema.size() + " fields but got " + - values.size + " fields" + values.size + " fields", ) } } @@ -36,144 +36,75 @@ open class TupleImpl : Tuple { constructor(context: GeneralTopologyContext, values: List, taskId: Int, streamId: String) : this(context, values, taskId, streamId, MessageId.makeUnanchored(), null) - override fun size(): Int { - return values.size - } + override fun size(): Int = values.size - override fun fieldIndex(field: String): Int { - return fields.fieldIndex(field) - } + override fun fieldIndex(field: String): Int = fields.fieldIndex(field) - override operator fun contains(field: String): Boolean { - return fields.contains(field) - } + override operator fun contains(field: String): Boolean = fields.contains(field) - override fun getValue(i: Int): Any { - return values[i] - } + override fun getValue(i: Int): Any = values[i] - override fun getString(i: Int): String { - return values[i] as String - } + override fun getString(i: Int): String = values[i] as String - override fun getInteger(i: Int): Int? { - return values[i] as Int - } + override fun getInteger(i: Int): Int? = values[i] as Int - override fun getLong(i: Int): Long? { - return values[i] as Long - } + override fun getLong(i: Int): Long? = values[i] as Long - override fun getBoolean(i: Int): Boolean? { - return values[i] as Boolean - } + override fun getBoolean(i: Int): Boolean? = values[i] as Boolean - override fun getShort(i: Int): Short? { - return values[i] as Short - } + override fun getShort(i: Int): Short? = values[i] as Short - override fun getByte(i: Int): Byte? { - return values[i] as Byte - } + override fun getByte(i: Int): Byte? = values[i] as Byte - override fun getDouble(i: Int): Double? { - return values[i] as Double - } + override fun getDouble(i: Int): Double? = values[i] as Double - override fun getFloat(i: Int): Float? { - return values[i] as Float - } + override fun getFloat(i: Int): Float? = values[i] as Float - override fun getBinary(i: Int): ByteArray { - return values[i] as ByteArray - } + override fun getBinary(i: Int): ByteArray = values[i] as ByteArray - override fun getValueByField(field: String): Any { - return values[fieldIndex(field)] - } + override fun getValueByField(field: String): Any = values[fieldIndex(field)] - override fun getStringByField(field: String): String { - return values[fieldIndex(field)] as String - } + override fun getStringByField(field: String): String = values[fieldIndex(field)] as String - override fun getIntegerByField(field: String): Int? { - return values[fieldIndex(field)] as Int - } + override fun getIntegerByField(field: String): Int? = values[fieldIndex(field)] as Int - override fun getLongByField(field: String): Long? { - return values[fieldIndex(field)] as Long - } + override fun getLongByField(field: String): Long? = values[fieldIndex(field)] as Long - override fun getBooleanByField(field: String): Boolean? { - return values[fieldIndex(field)] as Boolean - } + override fun getBooleanByField(field: String): Boolean? = values[fieldIndex(field)] as Boolean - override fun getShortByField(field: String): Short? { - return values[fieldIndex(field)] as Short - } + override fun getShortByField(field: String): Short? = values[fieldIndex(field)] as Short - override fun getByteByField(field: String): Byte? { - return values[fieldIndex(field)] as Byte - } + override fun getByteByField(field: String): Byte? = values[fieldIndex(field)] as Byte - override fun getDoubleByField(field: String): Double? { - return values[fieldIndex(field)] as Double - } + override fun getDoubleByField(field: String): Double? = values[fieldIndex(field)] as Double - override fun getFloatByField(field: String): Float? { - return values[fieldIndex(field)] as Float - } + override fun getFloatByField(field: String): Float? = values[fieldIndex(field)] as Float - override fun getBinaryByField(field: String): ByteArray { - return values[fieldIndex(field)] as ByteArray - } + override fun getBinaryByField(field: String): ByteArray = values[fieldIndex(field)] as ByteArray - override fun getValues(): List { - return values - } + override fun getValues(): List = values - override fun getFields(): Fields { - return context.getComponentOutputFields(sourceComponent, sourceStreamId) - } + override fun getFields(): Fields = context.getComponentOutputFields(sourceComponent, sourceStreamId) - override fun select(selector: Fields): List { - return fields.select(selector, values) - } + override fun select(selector: Fields): List = fields.select(selector, values) @Deprecated("", ReplaceWith("sourceGlobalStreamId")) - override fun getSourceGlobalStreamid(): GlobalStreamId { - return sourceGlobalStreamId - } + override fun getSourceGlobalStreamid(): GlobalStreamId = sourceGlobalStreamId - override fun getSourceGlobalStreamId(): GlobalStreamId { - return GlobalStreamId(sourceComponent, streamId) - } + override fun getSourceGlobalStreamId(): GlobalStreamId = GlobalStreamId(sourceComponent, streamId) - override fun getSourceComponent(): String { - return context.getComponentId(taskId) - } + override fun getSourceComponent(): String = context.getComponentId(taskId) - override fun getSourceTask(): Int { - return taskId - } + override fun getSourceTask(): Int = taskId - override fun getSourceStreamId(): String { - return streamId - } + override fun getSourceStreamId(): String = streamId - override fun getMessageId(): MessageId { - return id - } + override fun getMessageId(): MessageId = id - override fun toString(): String { - return """source: $sourceComponent:$taskId, stream: $streamId, id: $id, $values [spoutMessageId: $spoutMessageId]""" - } + override fun toString(): String = + """source: $sourceComponent:$taskId, stream: $streamId, id: $id, $values [spoutMessageId: $spoutMessageId]""" - override fun equals(other: Any?): Boolean { - return this === other - } + override fun equals(other: Any?): Boolean = this === other - override fun hashCode(): Int { - return System.identityHashCode(this) - } + override fun hashCode(): Int = System.identityHashCode(this) } diff --git a/src/main/kotlin/org/xyro/kumulus/grouping/AllGrouping.kt b/src/main/kotlin/org/xyro/kumulus/grouping/AllGrouping.kt index a8763f8..3160e47 100644 --- a/src/main/kotlin/org/xyro/kumulus/grouping/AllGrouping.kt +++ b/src/main/kotlin/org/xyro/kumulus/grouping/AllGrouping.kt @@ -7,11 +7,16 @@ import org.apache.storm.task.WorkerTopologyContext class AllGrouping : CustomStreamGrouping { private lateinit var tasks: List - override fun prepare(context: WorkerTopologyContext, stream: GlobalStreamId, targetTasks: List) { + override fun prepare( + context: WorkerTopologyContext, + stream: GlobalStreamId, + targetTasks: List, + ) { this.tasks = targetTasks.toList() } - override fun chooseTasks(taskId: Int, values: List): List { - return tasks - } + override fun chooseTasks( + taskId: Int, + values: List, + ): List = tasks } diff --git a/src/main/kotlin/org/xyro/kumulus/grouping/FieldsGrouping.kt b/src/main/kotlin/org/xyro/kumulus/grouping/FieldsGrouping.kt index 49db83c..4892000 100644 --- a/src/main/kotlin/org/xyro/kumulus/grouping/FieldsGrouping.kt +++ b/src/main/kotlin/org/xyro/kumulus/grouping/FieldsGrouping.kt @@ -7,15 +7,22 @@ import kotlin.math.absoluteValue class FieldsGrouping( private val groupingFields: List, - private val outputFields: List + private val outputFields: List, ) : CustomStreamGrouping { private lateinit var tasks: List - override fun prepare(context: WorkerTopologyContext, stream: GlobalStreamId, targetTasks: List) { + override fun prepare( + context: WorkerTopologyContext, + stream: GlobalStreamId, + targetTasks: List, + ) { this.tasks = targetTasks.toList() } - override fun chooseTasks(taskId: Int, values: List): List { + override fun chooseTasks( + taskId: Int, + values: List, + ): List { var groupingHashes = 0L groupingFields.forEach { gField -> diff --git a/src/main/kotlin/org/xyro/kumulus/topology/KumulusTopologyBuilder.kt b/src/main/kotlin/org/xyro/kumulus/topology/KumulusTopologyBuilder.kt index ca3bf59..c684496 100644 --- a/src/main/kotlin/org/xyro/kumulus/topology/KumulusTopologyBuilder.kt +++ b/src/main/kotlin/org/xyro/kumulus/topology/KumulusTopologyBuilder.kt @@ -33,17 +33,28 @@ class KumulusTopologyBuilder { private val bolts: LinkedHashMap = LinkedHashMap() private val stateSpouts: LinkedHashMap = LinkedHashMap() - fun setSpout(id: String, spout: IRichSpout): KumulusSpoutDeclarer { + fun setSpout( + id: String, + spout: IRichSpout, + ): KumulusSpoutDeclarer { validateUnusedId(id) return registerSpout(id, spout, null) } - fun setSpout(id: String, spout: IRichSpout, parallelismHint: Number): KumulusSpoutDeclarer { + fun setSpout( + id: String, + spout: IRichSpout, + parallelismHint: Number, + ): KumulusSpoutDeclarer { validateUnusedId(id) return registerSpout(id, spout, parallelismHint) } - private fun registerSpout(id: String, spout: IRichSpout, parallelismHint: Number?): KumulusSpoutDeclarer { + private fun registerSpout( + id: String, + spout: IRichSpout, + parallelismHint: Number?, + ): KumulusSpoutDeclarer { val common = ComponentCommon() common.set_inputs(mutableMapOf()) initCommon(common, spout, parallelismHint) @@ -53,57 +64,78 @@ class KumulusTopologyBuilder { return KumulusSpoutDeclarer(spouts[id]!!) } - fun setBolt(id: String, bolt: IRichBolt): KumulusBoltDeclarer { - return setBoltInternal(id, bolt, null) - } + fun setBolt( + id: String, + bolt: IRichBolt, + ): KumulusBoltDeclarer = setBoltInternal(id, bolt, null) - private fun setBoltInternal(id: String, bolt: IRichBolt, parallelismHint: Number?): KumulusBoltDeclarer { + private fun setBoltInternal( + id: String, + bolt: IRichBolt, + parallelismHint: Number?, + ): KumulusBoltDeclarer { validateUnusedId(id) return registerBolt(id, bolt, parallelismHint) } - fun setBolt(id: String, bolt: IWindowedBolt): KumulusBoltDeclarer { - return setBoltInternal(id, WindowedBoltExecutor(bolt), null) - } + fun setBolt( + id: String, + bolt: IWindowedBolt, + ): KumulusBoltDeclarer = setBoltInternal(id, WindowedBoltExecutor(bolt), null) - fun setBolt(id: String, bolt: IWindowedBolt, parallelismHint: Number): KumulusBoltDeclarer { - return setBolt(id, WindowedBoltExecutor(bolt), parallelismHint) - } + fun setBolt( + id: String, + bolt: IWindowedBolt, + parallelismHint: Number, + ): KumulusBoltDeclarer = setBolt(id, WindowedBoltExecutor(bolt), parallelismHint) - fun setBolt(id: String, bolt: IStatefulBolt): KumulusBoltDeclarer { - return setBoltInternal(id, StatefulBoltExecutor(bolt), null) - } + fun setBolt( + id: String, + bolt: IStatefulBolt, + ): KumulusBoltDeclarer = setBoltInternal(id, StatefulBoltExecutor(bolt), null) - fun setBolt(id: String, bolt: IStatefulBolt, parallelismHint: Number): KumulusBoltDeclarer { - return setBolt(id, StatefulBoltExecutor(bolt), parallelismHint) - } + fun setBolt( + id: String, + bolt: IStatefulBolt, + parallelismHint: Number, + ): KumulusBoltDeclarer = setBolt(id, StatefulBoltExecutor(bolt), parallelismHint) - fun setBolt(id: String, bolt: IStatefulWindowedBolt): KumulusBoltDeclarer { - return setBoltInternal(id, StatefulWindowedBoltExecutor(bolt), null) - } + fun setBolt( + id: String, + bolt: IStatefulWindowedBolt, + ): KumulusBoltDeclarer = setBoltInternal(id, StatefulWindowedBoltExecutor(bolt), null) fun setBolt( id: String, bolt: IStatefulWindowedBolt, - parallelismHint: Number - ): KumulusBoltDeclarer { - return setBolt(id, StatefulWindowedBoltExecutor(bolt), parallelismHint) - } + parallelismHint: Number, + ): KumulusBoltDeclarer = setBolt(id, StatefulWindowedBoltExecutor(bolt), parallelismHint) - fun setBolt(id: String, bolt: IBasicBolt): KumulusBoltDeclarer { - return setBoltInternal(id, BasicBoltExecutor(bolt), null) - } + fun setBolt( + id: String, + bolt: IBasicBolt, + ): KumulusBoltDeclarer = setBoltInternal(id, BasicBoltExecutor(bolt), null) - fun setBolt(id: String, bolt: IBasicBolt, parallelismHint: Number): KumulusBoltDeclarer { - return setBolt(id, BasicBoltExecutor(bolt), parallelismHint) - } + fun setBolt( + id: String, + bolt: IBasicBolt, + parallelismHint: Number, + ): KumulusBoltDeclarer = setBolt(id, BasicBoltExecutor(bolt), parallelismHint) - fun setBolt(id: String, bolt: IRichBolt, parallelismHint: Number): KumulusBoltDeclarer { + fun setBolt( + id: String, + bolt: IRichBolt, + parallelismHint: Number, + ): KumulusBoltDeclarer { validateUnusedId(id) return registerBolt(id, bolt, parallelismHint) } - private fun registerBolt(id: String, bolt: IRichBolt, parallelismHint: Number?): KumulusBoltDeclarer { + private fun registerBolt( + id: String, + bolt: IRichBolt, + parallelismHint: Number?, + ): KumulusBoltDeclarer { val common = ComponentCommon() common.set_inputs(mutableMapOf()) initCommon(common, bolt, parallelismHint) @@ -113,15 +145,20 @@ class KumulusTopologyBuilder { return KumulusBoltDeclarer(bolts[id]!!) } - fun createTopology(): KumulusTopologyDefinition { - return KumulusTopologyDefinition(spouts.toMap(), bolts.toMap()) - } + fun createTopology(): KumulusTopologyDefinition = KumulusTopologyDefinition(spouts.toMap(), bolts.toMap()) - fun setStateSpout(id: String, spout: IRichStateSpout) { + fun setStateSpout( + id: String, + spout: IRichStateSpout, + ) { setStateSpout(id, spout, 1) } - fun setStateSpout(id: String, spout: IRichStateSpout, parallelismHint: Number) { + fun setStateSpout( + id: String, + spout: IRichStateSpout, + parallelismHint: Number, + ) { validateUnusedId(id) stateSpouts[id] = spout throw UnsupportedOperationException("State spouts are not supported in KumulusTopologyBuilder") @@ -134,7 +171,11 @@ class KumulusTopologyBuilder { // Kumulus runs in-process and does not have worker-level hooks. } - private fun initCommon(common: ComponentCommon, component: Any, parallelismHint: Number?) { + private fun initCommon( + common: ComponentCommon, + component: Any, + parallelismHint: Number?, + ) { if (parallelismHint != null) { val parallelism = parallelismHint.toInt() if (parallelism < 1) { @@ -143,11 +184,12 @@ class KumulusTopologyBuilder { common.set_parallelism_hint(parallelism) } - val config = when (component) { - is IRichBolt -> component.componentConfiguration - is IRichSpout -> component.componentConfiguration - else -> null - } + val config = + when (component) { + is IRichBolt -> component.componentConfiguration + is IRichSpout -> component.componentConfiguration + else -> null + } if (config != null) { common.set_json_conf(JSONValue.toJSONString(config)) } @@ -180,27 +222,28 @@ class KumulusTopologyBuilder { data class KumulusTopologyDefinition( val spouts: Map, - val bolts: Map + val bolts: Map, ) data class KumulusDeclaredSpout( val id: String, val spout: IRichSpout, - val common: ComponentCommon + val common: ComponentCommon, ) data class KumulusDeclaredBolt( val id: String, val bolt: IRichBolt, - val common: ComponentCommon + val common: ComponentCommon, ) class KumulusSpoutDeclarer internal constructor( - private val registration: KumulusDeclaredSpout + private val registration: KumulusDeclaredSpout, ) { - fun addConfiguration(configKey: String, configValue: Any?): KumulusSpoutDeclarer { - return addConfigurations(mapOf(configKey to configValue)) - } + fun addConfiguration( + configKey: String, + configValue: Any?, + ): KumulusSpoutDeclarer = addConfigurations(mapOf(configKey to configValue)) fun addConfigurations(conf: Map): KumulusSpoutDeclarer { if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { @@ -218,68 +261,103 @@ class KumulusSpoutDeclarer internal constructor( } class KumulusBoltDeclarer internal constructor( - private val registration: KumulusDeclaredBolt + private val registration: KumulusDeclaredBolt, ) { - fun fieldsGrouping(componentId: String, fields: Fields): KumulusBoltDeclarer { - return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields) - } + fun fieldsGrouping( + componentId: String, + fields: Fields, + ): KumulusBoltDeclarer = fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields) - fun fieldsGrouping(componentId: String, streamId: String, fields: Fields): KumulusBoltDeclarer { - return grouping( + fun fieldsGrouping( + componentId: String, + streamId: String, + fields: Fields, + ): KumulusBoltDeclarer = + grouping( componentId, streamId, - Grouping().apply { set_fields(fields.toList()) } + Grouping().apply { set_fields(fields.toList()) }, ) - } - fun shuffleGrouping(componentId: String): KumulusBoltDeclarer { - return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID) - } + fun shuffleGrouping(componentId: String): KumulusBoltDeclarer = shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID) - fun shuffleGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { - return grouping(componentId, streamId, Grouping().apply { set_shuffle(NullStruct()) }) - } + fun shuffleGrouping( + componentId: String, + streamId: String, + ): KumulusBoltDeclarer = + grouping( + componentId, + streamId, + Grouping().apply { + set_shuffle(NullStruct()) + }, + ) - fun localOrShuffleGrouping(componentId: String): KumulusBoltDeclarer { - return localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID) - } + fun localOrShuffleGrouping(componentId: String): KumulusBoltDeclarer = localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID) - fun localOrShuffleGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { - return grouping(componentId, streamId, Grouping().apply { set_local_or_shuffle(NullStruct()) }) - } + fun localOrShuffleGrouping( + componentId: String, + streamId: String, + ): KumulusBoltDeclarer = + grouping( + componentId, + streamId, + Grouping().apply { + set_local_or_shuffle(NullStruct()) + }, + ) - fun noneGrouping(componentId: String): KumulusBoltDeclarer { - return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID) - } + fun noneGrouping(componentId: String): KumulusBoltDeclarer = noneGrouping(componentId, Utils.DEFAULT_STREAM_ID) - fun noneGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { - return grouping(componentId, streamId, Grouping().apply { set_none(NullStruct()) }) - } + fun noneGrouping( + componentId: String, + streamId: String, + ): KumulusBoltDeclarer = + grouping( + componentId, + streamId, + Grouping().apply { + set_none(NullStruct()) + }, + ) - fun allGrouping(componentId: String): KumulusBoltDeclarer { - return allGrouping(componentId, Utils.DEFAULT_STREAM_ID) - } + fun allGrouping(componentId: String): KumulusBoltDeclarer = allGrouping(componentId, Utils.DEFAULT_STREAM_ID) - fun allGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { - return grouping(componentId, streamId, Grouping().apply { set_all(NullStruct()) }) - } + fun allGrouping( + componentId: String, + streamId: String, + ): KumulusBoltDeclarer = + grouping( + componentId, + streamId, + Grouping().apply { + set_all(NullStruct()) + }, + ) - fun directGrouping(componentId: String): KumulusBoltDeclarer { - return directGrouping(componentId, Utils.DEFAULT_STREAM_ID) - } + fun directGrouping(componentId: String): KumulusBoltDeclarer = directGrouping(componentId, Utils.DEFAULT_STREAM_ID) - fun directGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { - return grouping(componentId, streamId, Grouping().apply { set_direct(NullStruct()) }) - } + fun directGrouping( + componentId: String, + streamId: String, + ): KumulusBoltDeclarer = + grouping( + componentId, + streamId, + Grouping().apply { + set_direct(NullStruct()) + }, + ) - fun customGrouping(componentId: String, grouping: CustomStreamGrouping): KumulusBoltDeclarer { - return customGrouping(componentId, Utils.DEFAULT_STREAM_ID, grouping) - } + fun customGrouping( + componentId: String, + grouping: CustomStreamGrouping, + ): KumulusBoltDeclarer = customGrouping(componentId, Utils.DEFAULT_STREAM_ID, grouping) fun customGrouping( componentId: String, streamId: String, - grouping: CustomStreamGrouping + grouping: CustomStreamGrouping, ): KumulusBoltDeclarer { if (grouping !is Serializable) { throw IllegalArgumentException("Custom grouping must be serializable for Kumulus") @@ -287,22 +365,28 @@ class KumulusBoltDeclarer internal constructor( return this.grouping( componentId, streamId, - Grouping().apply { set_custom_serialized(Utils.javaSerialize(grouping)) } + Grouping().apply { set_custom_serialized(Utils.javaSerialize(grouping)) }, ) } - fun grouping(id: GlobalStreamId, grouping: Grouping): KumulusBoltDeclarer { + fun grouping( + id: GlobalStreamId, + grouping: Grouping, + ): KumulusBoltDeclarer { registration.common.put_to_inputs(id, grouping) return this } - fun grouping(componentId: String, streamId: String, grouping: Grouping): KumulusBoltDeclarer { - return grouping(GlobalStreamId(componentId, streamId), grouping) - } + fun grouping( + componentId: String, + streamId: String, + grouping: Grouping, + ): KumulusBoltDeclarer = grouping(GlobalStreamId(componentId, streamId), grouping) - fun addConfiguration(configKey: String, configValue: Any?): KumulusBoltDeclarer { - return addConfigurations(mapOf(configKey to configValue)) - } + fun addConfiguration( + configKey: String, + configValue: Any?, + ): KumulusBoltDeclarer = addConfigurations(mapOf(configKey to configValue)) fun addConfigurations(conf: Map): KumulusBoltDeclarer { if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { @@ -326,23 +410,37 @@ private class KumulusOutputFieldsCollector : OutputFieldsDeclarer { declare(false, fields) } - override fun declare(direct: Boolean, fields: Fields) { + override fun declare( + direct: Boolean, + fields: Fields, + ) { declareStream(Utils.DEFAULT_STREAM_ID, direct, fields) } - override fun declareStream(streamId: String, fields: Fields) { + override fun declareStream( + streamId: String, + fields: Fields, + ) { declareStream(streamId, false, fields) } - override fun declareStream(streamId: String, direct: Boolean, fields: Fields) { - streams[streamId] = StreamInfo().apply { - set_output_fields(fields.toList()) - set_direct(direct) - } + override fun declareStream( + streamId: String, + direct: Boolean, + fields: Fields, + ) { + streams[streamId] = + StreamInfo().apply { + set_output_fields(fields.toList()) + set_direct(direct) + } } } -private fun mergeWithExistingJsonConfig(common: ComponentCommon, update: Map): MutableMap { +private fun mergeWithExistingJsonConfig( + common: ComponentCommon, + update: Map, +): MutableMap { val existing = parseJsonObject(common._json_conf) val merged = HashMap(existing) merged.putAll(update) diff --git a/src/test/kotlin/org/xyro/kumulus/KumulusStormTransformerTest.kt b/src/test/kotlin/org/xyro/kumulus/KumulusStormTransformerTest.kt index eb85f19..6747d33 100644 --- a/src/test/kotlin/org/xyro/kumulus/KumulusStormTransformerTest.kt +++ b/src/test/kotlin/org/xyro/kumulus/KumulusStormTransformerTest.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.HdrHistogram.Histogram import org.apache.storm.Config import org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS @@ -53,58 +53,67 @@ class KumulusStormTransformerTest { val failingBolt = TestBasicBolt(failing = true) - val unanchoringBolt = object : IRichBolt { - lateinit var collector: OutputCollector + val unanchoringBolt = + object : IRichBolt { + lateinit var collector: OutputCollector - var thisTaskIndex: Int = 0 - var lastIndex: Int = 0 + var thisTaskIndex: Int = 0 + var lastIndex: Int = 0 - override fun prepare(stormConf: MutableMap?, context: TopologyContext?, collector: OutputCollector?) { - this.collector = collector!! - this.thisTaskIndex = context!!.thisTaskIndex - } + override fun prepare( + stormConf: MutableMap?, + context: TopologyContext?, + collector: OutputCollector?, + ) { + this.collector = collector!! + this.thisTaskIndex = context!!.thisTaskIndex + } - override fun execute(input: Tuple?) { - try { - if (input!!.sourceStreamId == Constants.SYSTEM_TICK_STREAM_ID) { - if (thisTaskIndex == 0) { - logger.info { "Got tick tuple (last seen index $lastIndex)" } + override fun execute(input: Tuple?) { + try { + if (input!!.sourceStreamId == Constants.SYSTEM_TICK_STREAM_ID) { + if (thisTaskIndex == 0) { + logger.info { "Got tick tuple (last seen index $lastIndex)" } + } + return } - return + lastIndex = input.getValueByField("index") as Int + collector.emit(input.values) + } finally { + collector.ack(input) } - lastIndex = input.getValueByField("index") as Int - collector.emit(input.values) - } finally { - collector.ack(input) } - } - override fun cleanup() {} + override fun cleanup() {} - override fun getComponentConfiguration(): MutableMap { - return mutableMapOf(TOPOLOGY_TICK_TUPLE_FREQ_SECS to 1) - } + override fun getComponentConfiguration(): MutableMap = mutableMapOf(TOPOLOGY_TICK_TUPLE_FREQ_SECS to 1) - override fun declareOutputFields(declarer: OutputFieldsDeclarer?) { - declarer?.declare(Fields("index", "nano-time")) + override fun declareOutputFields(declarer: OutputFieldsDeclarer?) { + declarer?.declare(Fields("index", "nano-time")) + } } - } val parallelism = 1 val maxPending = 1 builder.setSpout("spout", spout) - builder.setBolt("bolt", bolt, parallelism) + builder + .setBolt("bolt", bolt, parallelism) .shuffleGrouping("spout") - builder.setBolt("bolt2", bolt, parallelism) + builder + .setBolt("bolt2", bolt, parallelism) .shuffleGrouping("bolt") - builder.setBolt("bolt3", bolt, parallelism) + builder + .setBolt("bolt3", bolt, parallelism) .shuffleGrouping("bolt2") - builder.setBolt(SINK_BOLT_NAME, bolt, 1) + builder + .setBolt(SINK_BOLT_NAME, bolt, 1) .shuffleGrouping("bolt3") - builder.setBolt("unanchoring_bolt", unanchoringBolt, parallelism) + builder + .setBolt("unanchoring_bolt", unanchoringBolt, parallelism) .shuffleGrouping("bolt2") - builder.setBolt("failing_bolt", failingBolt, parallelism) + builder + .setBolt("failing_bolt", failingBolt, parallelism) .shuffleGrouping("unanchoring_bolt") config[Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE] = 1 @@ -136,7 +145,7 @@ class KumulusStormTransformerTest { kumulusTopology.onBusyBoltHook = { comp, _, busyNanos, _ -> busyTimeMap.compute( - comp + comp, ) { _, v -> when (v) { null -> busyNanos @@ -153,14 +162,16 @@ class KumulusStormTransformerTest { logger.info { "Max spout pending: $maxPending" } kumulusTopology.stop() - busyTimeMap.map { (bolt, waitNanos) -> - bolt to waitNanos - }.sortedBy { - it.second - }.reversed().forEach { (bolt, waitNanos) -> - val waitMillis = waitNanos.toDouble() / 1000 / 1000 - println("Component $bolt waited a total of ${waitMillis}ms during the test execution") - } + busyTimeMap + .map { (bolt, waitNanos) -> + bolt to waitNanos + }.sortedBy { + it.second + }.reversed() + .forEach { (bolt, waitNanos) -> + val waitMillis = waitNanos.toDouble() / 1000 / 1000 + println("Component $bolt waited a total of ${waitMillis}ms during the test execution") + } } else { val cluster = LocalCluster() cluster.submitTopology("testtopology", config, topology) @@ -183,33 +194,43 @@ class KumulusStormTransformerTest { var connectNext = "spout" for (i in 1..1000) { val boltName = "bolt-$i" - builder.setBolt( - boltName, + builder + .setBolt( + boltName, + object : BaseBasicBolt() { + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) { + collector.emit(input.select(spoutFields)) + } + + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { + declarer.declare(spoutFields) + } + }, + ).noneGrouping(connectNext) + connectNext = boltName + } + + builder + .setBolt( + "end_bolt", object : BaseBasicBolt() { - override fun execute(input: Tuple, collector: BasicOutputCollector) { - collector.emit(input.select(spoutFields)) + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) { + val values = input.select(spoutFields)!! + val nanoTime = values[1] as Long + logger.info { "Took: ${(System.nanoTime() - nanoTime) / 1000 / 1000.0}ms" } } + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { declarer.declare(spoutFields) } - } + }, ).noneGrouping(connectNext) - connectNext = boltName - } - - builder.setBolt( - "end_bolt", - object : BaseBasicBolt() { - override fun execute(input: Tuple, collector: BasicOutputCollector) { - val values = input.select(spoutFields)!! - val nanoTime = values[1] as Long - logger.info { "Took: ${(System.nanoTime() - nanoTime) / 1000 / 1000.0}ms" } - } - override fun declareOutputFields(declarer: OutputFieldsDeclarer) { - declarer.declare(spoutFields) - } - } - ).noneGrouping(connectNext) val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology()!!, config, "testtopology") @@ -236,61 +257,75 @@ class KumulusStormTransformerTest { val size = 100 - val boltDeclarer = builder.setBolt( - "join", - object : BaseBasicBolt() { - var pending: Int = size - var currentMsgId: Any? = null - - override fun execute(input: Tuple, collector: BasicOutputCollector) { - val jobId = input.getInteger(0) - if (jobId != currentMsgId) { - pending = size - currentMsgId = jobId - } - pending -= 1 - if (pending == 0) { - collector.emit(input.select(spoutFields)) - } - } - - override fun declareOutputFields(declarer: OutputFieldsDeclarer) { - declarer.declare(spoutFields) - } - } - ) - - for (i in 1..size) { - val boltName = "bolt-$i" + val boltDeclarer = builder.setBolt( - boltName, + "join", object : BaseBasicBolt() { - override fun execute(input: Tuple, collector: BasicOutputCollector) { - for (j in 0..9) { + var pending: Int = size + var currentMsgId: Any? = null + + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) { + val jobId = input.getInteger(0) + if (jobId != currentMsgId) { + pending = size + currentMsgId = jobId + } + pending -= 1 + if (pending == 0) { collector.emit(input.select(spoutFields)) } } + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { declarer.declare(spoutFields) } - } - ).noneGrouping("spout") + }, + ) + + for (i in 1..size) { + val boltName = "bolt-$i" + builder + .setBolt( + boltName, + object : BaseBasicBolt() { + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) { + for (j in 0..9) { + collector.emit(input.select(spoutFields)) + } + } + + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { + declarer.declare(spoutFields) + } + }, + ).noneGrouping("spout") boltDeclarer.noneGrouping(boltName) } - builder.setBolt( - "end_bolt", - object : BaseBasicBolt() { - override fun execute(input: Tuple, collector: BasicOutputCollector) { - val values = input.select(spoutFields)!! - val nanoTime = values[1] as Long - logger.info { "Took: ${(System.nanoTime() - nanoTime) / 1000 / 1000.0}ms" } - } - override fun declareOutputFields(declarer: OutputFieldsDeclarer) { - declarer.declare(spoutFields) - } - } - ).noneGrouping("join") + builder + .setBolt( + "end_bolt", + object : BaseBasicBolt() { + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) { + val values = input.select(spoutFields)!! + val nanoTime = values[1] as Long + logger.info { "Took: ${(System.nanoTime() - nanoTime) / 1000 / 1000.0}ms" } + } + + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { + declarer.declare(spoutFields) + } + }, + ).noneGrouping("join") val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology()!!, config, "testtopology") @@ -317,7 +352,11 @@ class KumulusStormTransformerTest { } } - override fun open(conf: MutableMap?, context: TopologyContext?, collector: SpoutOutputCollector?) { + override fun open( + conf: MutableMap?, + context: TopologyContext?, + collector: SpoutOutputCollector?, + ) { this.collector = collector start.compareAndSet(0L, System.currentTimeMillis()) } @@ -356,20 +395,28 @@ class KumulusStormTransformerTest { } } - class TestBasicBolt(private val failing: Boolean = false) : BaseBasicBolt() { + class TestBasicBolt( + private val failing: Boolean = false, + ) : BaseBasicBolt() { lateinit var context: TopologyContext private lateinit var histogram: Histogram private var count = 0 - override fun prepare(stormConf: MutableMap?, context: TopologyContext?) { + override fun prepare( + stormConf: MutableMap?, + context: TopologyContext?, + ) { this.context = context!! histogram = Histogram(4) super.prepare(stormConf, context) } - override fun execute(input: Tuple?, collector: BasicOutputCollector?) { + override fun execute( + input: Tuple?, + collector: BasicOutputCollector?, + ) { val index: Int = input?.getValueByField("index") as Int val tookNanos = System.nanoTime() - input.getValueByField("nano-time") as Long @@ -387,7 +434,7 @@ class KumulusStormTransformerTest { logger.info { StringBuilder( "[index: $index] Latency histogram values for " + - "${context.thisComponentId}/${context.thisTaskId}:\n" + "${context.thisComponentId}/${context.thisTaskId}:\n", ).also { sb -> LOG_PERCENTILES.forEach { percentile -> val duration = histogram.getValueAtPercentile(percentile) @@ -406,9 +453,7 @@ class KumulusStormTransformerTest { } } - private fun toMillis(i: Long): Double { - return i / 1000.0 - } + private fun toMillis(i: Long): Double = i / 1000.0 override fun declareOutputFields(declarer: OutputFieldsDeclarer?) { declarer?.declare(Fields("index", "nano-time")) diff --git a/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderApiParityTest.kt b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderApiParityTest.kt index 5000df6..ae4baf4 100644 --- a/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderApiParityTest.kt +++ b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderApiParityTest.kt @@ -16,27 +16,26 @@ class KumulusTopologyBuilderApiParityTest { val extraInNative = nativeApi - stormApi if (missingFromNative.isNotEmpty() || extraInNative.isNotEmpty()) { - val message = buildString { - appendLine("KumulusTopologyBuilder API mismatch against Storm TopologyBuilder.") - appendLine("Missing in native builder:") - appendLine(formatSignatureList(missingFromNative)) - appendLine("Extra in native builder:") - appendLine(formatSignatureList(extraInNative)) - } + val message = + buildString { + appendLine("KumulusTopologyBuilder API mismatch against Storm TopologyBuilder.") + appendLine("Missing in native builder:") + appendLine(formatSignatureList(missingFromNative)) + appendLine("Extra in native builder:") + appendLine(formatSignatureList(extraInNative)) + } fail(message) } } - private fun publicInstanceMethodSignatures(clazz: Class<*>): Set { - return clazz.declaredMethods + private fun publicInstanceMethodSignatures(clazz: Class<*>): Set = + clazz.declaredMethods .filter { Modifier.isPublic(it.modifiers) && !Modifier.isStatic(it.modifiers) } .filterNot { it.isSynthetic || it.isBridge } .map { method -> val parameterTypes = method.parameterTypes.joinToString(",") { it.name } "${method.name}($parameterTypes)" - } - .toSet() - } + }.toSet() private fun formatSignatureList(signatures: Set): String { if (signatures.isEmpty()) { diff --git a/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderConfigurationParityTest.kt b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderConfigurationParityTest.kt index 8a4dc66..d6d7166 100644 --- a/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderConfigurationParityTest.kt +++ b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderConfigurationParityTest.kt @@ -26,14 +26,22 @@ class KumulusTopologyBuilderConfigurationParityTest { stormDeclarer.addConfiguration("k1", "v1") stormDeclarer.addConfiguration("k2", 2) stormDeclarer.addConfigurations(mapOf("k3" to true, "k1" to "override")) - val stormJson = stormBuilder.createTopology()._spouts["spout"]!!._common._json_conf + val stormJson = + stormBuilder + .createTopology() + ._spouts["spout"]!! + ._common._json_conf val nativeBuilder = KumulusTopologyBuilder() val nativeDeclarer = nativeBuilder.setSpout("spout", ConfigurableSpout(componentConfig)) nativeDeclarer.addConfiguration("k1", "v1") nativeDeclarer.addConfiguration("k2", 2) nativeDeclarer.addConfigurations(mapOf("k3" to true, "k1" to "override")) - val nativeJson = nativeBuilder.createTopology().spouts["spout"]!!.common._json_conf + val nativeJson = + nativeBuilder + .createTopology() + .spouts["spout"]!! + .common._json_conf assertEquals(parseJson(stormJson), parseJson(nativeJson)) } @@ -47,14 +55,22 @@ class KumulusTopologyBuilderConfigurationParityTest { stormDeclarer.addConfiguration("k1", "v1") stormDeclarer.addConfiguration("k2", 2) stormDeclarer.addConfigurations(mapOf("k3" to true, "k1" to "override")) - val stormJson = stormBuilder.createTopology()._bolts["bolt"]!!._common._json_conf + val stormJson = + stormBuilder + .createTopology() + ._bolts["bolt"]!! + ._common._json_conf val nativeBuilder = KumulusTopologyBuilder() val nativeDeclarer = nativeBuilder.setBolt("bolt", ConfigurableBolt(componentConfig)) nativeDeclarer.addConfiguration("k1", "v1") nativeDeclarer.addConfiguration("k2", 2) nativeDeclarer.addConfigurations(mapOf("k3" to true, "k1" to "override")) - val nativeJson = nativeBuilder.createTopology().bolts["bolt"]!!.common._json_conf + val nativeJson = + nativeBuilder + .createTopology() + .bolts["bolt"]!! + .common._json_conf assertEquals(parseJson(stormJson), parseJson(nativeJson)) } @@ -98,20 +114,26 @@ class KumulusTopologyBuilderConfigurationParityTest { } private class ConfigurableSpout( - private val componentConfig: MutableMap + private val componentConfig: MutableMap, ) : IRichSpout { override fun open( conf: MutableMap?, context: TopologyContext?, - collector: SpoutOutputCollector? + collector: SpoutOutputCollector?, ) = Unit override fun close() = Unit + override fun activate() = Unit + override fun deactivate() = Unit + override fun nextTuple() = Unit + override fun ack(msgId: Any?) = Unit + override fun fail(msgId: Any?) = Unit + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { declarer.declare(Fields("f")) } @@ -120,16 +142,18 @@ class KumulusTopologyBuilderConfigurationParityTest { } private class ConfigurableBolt( - private val componentConfig: MutableMap + private val componentConfig: MutableMap, ) : IRichBolt { override fun prepare( stormConf: MutableMap?, context: TopologyContext?, - collector: OutputCollector? + collector: OutputCollector?, ) = Unit override fun execute(input: Tuple) = Unit + override fun cleanup() = Unit + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { declarer.declare(Fields("f")) } diff --git a/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderTest.kt b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderTest.kt index e8e4f63..8aea83f 100644 --- a/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderTest.kt +++ b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderTest.kt @@ -12,13 +12,15 @@ class KumulusTopologyBuilderTest { val config: MutableMap = mutableMapOf() builder.setSpout("spout", DummySpout()) - builder.setBolt( - "bolt", - DummyBolt { - it.declareStream("stream", Fields("num")) - } - ).noneGrouping("spout") - builder.setBolt("bolt2", DummyBolt()) + builder + .setBolt( + "bolt", + DummyBolt { + it.declareStream("stream", Fields("num")) + }, + ).noneGrouping("spout") + builder + .setBolt("bolt2", DummyBolt()) .fieldsGrouping("bolt", "stream", Fields("num")) KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") @@ -30,7 +32,8 @@ class KumulusTopologyBuilderTest { val config: MutableMap = mutableMapOf() builder.setSpout("spout", DummySpout()) - builder.setBolt("bolt", DummyBolt()) + builder + .setBolt("bolt", DummyBolt()) .noneGrouping("missing-bolt") KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") diff --git a/src/test/kotlin/org/xyro/kumulus/TestAllowExtraAckingMode.kt b/src/test/kotlin/org/xyro/kumulus/TestAllowExtraAckingMode.kt index d82d9ea..b008023 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestAllowExtraAckingMode.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestAllowExtraAckingMode.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.Config import org.apache.storm.task.OutputCollector import org.apache.storm.task.TopologyContext @@ -30,12 +30,14 @@ class TestAllowExtraAckingMode { builder.setSpout("spout2", TestSpout()) builder.setSpout("spout3", TestSpout()) - builder.setBolt("acking-bolt", TestBolt()) + builder + .setBolt("acking-bolt", TestBolt()) .allGrouping("spout") .allGrouping("spout2") .allGrouping("spout3") - builder.setBolt("acking-bolt2", TestBolt()) + builder + .setBolt("acking-bolt2", TestBolt()) .allGrouping("spout") .allGrouping("spout2") .allGrouping("spout3") @@ -54,8 +56,14 @@ class TestAllowExtraAckingMode { } class TestSpout : DummySpout({ it.declare(Fields("id")) }) { - override fun fail(msgId: Any?) { inFlight.decrementAndGet() } - override fun ack(msgId: Any?) { inFlight.decrementAndGet() } + override fun fail(msgId: Any?) { + inFlight.decrementAndGet() + } + + override fun ack(msgId: Any?) { + inFlight.decrementAndGet() + } + override fun nextTuple() { val messageId = UUID.randomUUID().toString() collector.emit(listOf(messageId), messageId) @@ -89,11 +97,18 @@ class TestAllowExtraAckingMode { collector.ack(input) // extra ack } - override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector) { + override fun prepare( + p0: MutableMap?, + p1: TopologyContext?, + p2: OutputCollector, + ) { this.collector = p2 } + override fun cleanup() = Unit + override fun getComponentConfiguration(): MutableMap = mutableMapOf() + override fun declareOutputFields(p0: OutputFieldsDeclarer) = Unit } diff --git a/src/test/kotlin/org/xyro/kumulus/TestAnchoringBehavior.kt b/src/test/kotlin/org/xyro/kumulus/TestAnchoringBehavior.kt index ab780d4..6f0567f 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestAnchoringBehavior.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestAnchoringBehavior.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.Config import org.apache.storm.spout.SpoutOutputCollector import org.apache.storm.task.OutputCollector @@ -27,13 +27,16 @@ class TestAnchoringBehavior { builder.setSpout("spout", LatencyDeltaSpout()) - builder.setBolt("unanchoring-bolt", UnanchoringBolt()) + builder + .setBolt("unanchoring-bolt", UnanchoringBolt()) .noneGrouping("spout") - builder.setBolt("delay-unanchored-bolt", DelayBolt()) + builder + .setBolt("delay-unanchored-bolt", DelayBolt()) .noneGrouping("unanchoring-bolt") - builder.setBolt("unanchored-bolt-2", DummyBolt()) + builder + .setBolt("unanchored-bolt-2", DummyBolt()) .noneGrouping("spout") .noneGrouping("delay-unanchored-bolt") val kumulusTopology = @@ -51,13 +54,18 @@ class TestAnchoringBehavior { assertTrue { avgDelay < 10 } } - class LatencyDeltaSpout : DummySpout({ - it.declare(Fields("id")) - }) { + class LatencyDeltaSpout : + DummySpout({ + it.declare(Fields("id")) + }) { private var index: Int = 0 private var lastCall: Long? = 0 - override fun open(conf: MutableMap?, context: TopologyContext?, collector: SpoutOutputCollector?) { + override fun open( + conf: MutableMap?, + context: TopologyContext?, + collector: SpoutOutputCollector?, + ) { super.open(conf, context, collector) this.index = 0 this.lastCall = null @@ -87,25 +95,31 @@ class TestAnchoringBehavior { collector.ack(input) } - override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector?) { + override fun prepare( + p0: MutableMap?, + p1: TopologyContext?, + p2: OutputCollector?, + ) { this.collector = p2!! } override fun cleanup() = Unit - override fun getComponentConfiguration(): MutableMap { - return mutableMapOf() - } + override fun getComponentConfiguration(): MutableMap = mutableMapOf() override fun declareOutputFields(p0: OutputFieldsDeclarer) { p0.declare(Fields("id")) } } - class DelayBolt : DummyBolt({ - it.declare(Fields("id")) - }) { - override fun execute(input: Tuple, collector: BasicOutputCollector) { + class DelayBolt : + DummyBolt({ + it.declare(Fields("id")) + }) { + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) { Thread.sleep(5000) collector.emit(input.values) } diff --git a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt index f089fcb..3742901 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.Config import org.apache.storm.spout.SpoutOutputCollector import org.apache.storm.task.OutputCollector @@ -29,10 +29,12 @@ class TestDroppingStaleMessages { builder.setSpout("spout", LatencyDeltaSpout()) - builder.setBolt("unanchoring-bolt", UnanchoringBolt()) + builder + .setBolt("unanchoring-bolt", UnanchoringBolt()) .noneGrouping("spout") - builder.setBolt("delay-unanchored-bolt", StuckBolt()) + builder + .setBolt("delay-unanchored-bolt", StuckBolt()) .noneGrouping("unanchoring-bolt") val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") @@ -52,13 +54,18 @@ class TestDroppingStaleMessages { assertTrue { lateHookCalled } } - class LatencyDeltaSpout : DummySpout({ - it.declare(Fields("id")) - }) { + class LatencyDeltaSpout : + DummySpout({ + it.declare(Fields("id")) + }) { private var index: Int = 0 private var lastCall: Long? = 0 - override fun open(conf: MutableMap?, context: TopologyContext?, collector: SpoutOutputCollector?) { + override fun open( + conf: MutableMap?, + context: TopologyContext?, + collector: SpoutOutputCollector?, + ) { super.open(conf, context, collector) this.index = 0 this.lastCall = null @@ -88,25 +95,31 @@ class TestDroppingStaleMessages { collector.ack(input) } - override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector?) { + override fun prepare( + p0: MutableMap?, + p1: TopologyContext?, + p2: OutputCollector?, + ) { this.collector = p2!! } override fun cleanup() = Unit - override fun getComponentConfiguration(): MutableMap { - return mutableMapOf() - } + override fun getComponentConfiguration(): MutableMap = mutableMapOf() override fun declareOutputFields(p0: OutputFieldsDeclarer) { p0.declare(Fields("id")) } } - class StuckBolt : DummyBolt({ - it.declare(Fields("id")) - }) { - override fun execute(input: Tuple, collector: BasicOutputCollector) { + class StuckBolt : + DummyBolt({ + it.declare(Fields("id")) + }) { + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) { logger.info { "StuckBolt: started" } while (true) { Thread.sleep(50) diff --git a/src/test/kotlin/org/xyro/kumulus/TestExecuteException.kt b/src/test/kotlin/org/xyro/kumulus/TestExecuteException.kt index d095134..9ef4ba5 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestExecuteException.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestExecuteException.kt @@ -22,7 +22,8 @@ class TestExecuteException { config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L builder.setSpout("spout", TestSpout()) - builder.setBolt("execute-exception-bolt", TestExecuteExceptionBolt()) + builder + .setBolt("execute-exception-bolt", TestExecuteExceptionBolt()) .noneGrouping("spout") val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") @@ -53,16 +54,22 @@ class TestExecuteException { } class TesExceptiontSpout : DummySpout({ it.declare(Fields()) }) { - override fun nextTuple() { - throw RuntimeException("This exception should be thrown") - } + override fun nextTuple(): Unit = throw RuntimeException("This exception should be thrown") } class TestExecuteExceptionBolt : IRichBolt { override fun execute(input: Tuple) = throw RuntimeException("This exception should be thrown") - override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector) {} + + override fun prepare( + p0: MutableMap?, + p1: TopologyContext?, + p2: OutputCollector, + ) {} + override fun cleanup() = Unit + override fun getComponentConfiguration(): MutableMap = mutableMapOf() + override fun declareOutputFields(p0: OutputFieldsDeclarer) = Unit } diff --git a/src/test/kotlin/org/xyro/kumulus/TestMultipleSpoutsMaxPendingLimit.kt b/src/test/kotlin/org/xyro/kumulus/TestMultipleSpoutsMaxPendingLimit.kt index 31df1bb..7bcfe4c 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestMultipleSpoutsMaxPendingLimit.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestMultipleSpoutsMaxPendingLimit.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.Config import org.apache.storm.task.OutputCollector import org.apache.storm.task.TopologyContext @@ -31,7 +31,8 @@ class TestMultipleSpoutsMaxPendingLimit { builder.setSpout("spout2", TestSpout()) builder.setSpout("spout3", TestSpout()) - builder.setBolt("acking-bolt", TestBolt()) + builder + .setBolt("acking-bolt", TestBolt()) .allGrouping("spout") .allGrouping("spout2") .allGrouping("spout3") @@ -60,7 +61,8 @@ class TestMultipleSpoutsMaxPendingLimit { builder.setSpout("spout", TestSpout()) - builder.setBolt("sleeping-bolt", SleepingBolt(), 4) + builder + .setBolt("sleeping-bolt", SleepingBolt(), 4) .shuffleGrouping("spout") val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") @@ -94,12 +96,17 @@ class TestMultipleSpoutsMaxPendingLimit { } class SleepingBolt : BaseBasicBolt() { - override fun execute(input: Tuple, collector: BasicOutputCollector) { + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) { inFlight.incrementAndGet() Thread.sleep(100) inFlight.decrementAndGet() } + override fun declareOutputFields(declarer: OutputFieldsDeclarer) = Unit + companion object { val inFlight = AtomicInteger(0) } @@ -113,11 +120,18 @@ class TestMultipleSpoutsMaxPendingLimit { collector.ack(input) } - override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector) { + override fun prepare( + p0: MutableMap?, + p1: TopologyContext?, + p2: OutputCollector, + ) { this.collector = p2 } + override fun cleanup() = Unit + override fun getComponentConfiguration(): MutableMap = mutableMapOf() + override fun declareOutputFields(p0: OutputFieldsDeclarer) = Unit } diff --git a/src/test/kotlin/org/xyro/kumulus/TestPrepareException.kt b/src/test/kotlin/org/xyro/kumulus/TestPrepareException.kt index e44d590..e1c360e 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestPrepareException.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestPrepareException.kt @@ -23,7 +23,8 @@ class TestPrepareException { config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L builder.setSpout("spout", TestSpout()) - builder.setBolt("prepare-exception-bolt", TestBolt(0)) + builder + .setBolt("prepare-exception-bolt", TestBolt(0)) .noneGrouping("spout") val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") @@ -40,7 +41,8 @@ class TestPrepareException { config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L builder.setSpout("spout", TestSpout()) - builder.setBolt("prepare-exception-bolt", TestBolt(30)) + builder + .setBolt("prepare-exception-bolt", TestBolt(30)) .noneGrouping("spout") val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") @@ -53,14 +55,24 @@ class TestPrepareException { } } - class TestBolt(private val prepareDelaySecs: Int) : IRichBolt { + class TestBolt( + private val prepareDelaySecs: Int, + ) : IRichBolt { override fun execute(input: Tuple) = Unit - override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector) { + + override fun prepare( + p0: MutableMap?, + p1: TopologyContext?, + p2: OutputCollector, + ) { Thread.sleep((prepareDelaySecs * 1000).toLong()) throw TestException() } + override fun cleanup() = Unit + override fun getComponentConfiguration(): MutableMap = mutableMapOf() + override fun declareOutputFields(p0: OutputFieldsDeclarer) = Unit } diff --git a/src/test/kotlin/org/xyro/kumulus/TestSingleAcking.kt b/src/test/kotlin/org/xyro/kumulus/TestSingleAcking.kt index 7ce5e5b..d44828a 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestSingleAcking.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestSingleAcking.kt @@ -1,6 +1,6 @@ package org.xyro.kumulus -import mu.KotlinLogging +import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.storm.Config import org.apache.storm.spout.SpoutOutputCollector import org.apache.storm.task.TopologyContext @@ -24,9 +24,11 @@ class TestSingleAcking { config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L builder.setSpout("spout", TestSpout()) - builder.setBolt("bolt1", TestBolt()) + builder + .setBolt("bolt1", TestBolt()) .noneGrouping("spout") - builder.setBolt("bolt2", TestBolt()) + builder + .setBolt("bolt2", TestBolt()) .noneGrouping("spout") val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") @@ -39,7 +41,11 @@ class TestSingleAcking { private var index: Int = 0 private val emitted = mutableSetOf() - override fun open(conf: MutableMap?, context: TopologyContext?, collector: SpoutOutputCollector?) { + override fun open( + conf: MutableMap?, + context: TopologyContext?, + collector: SpoutOutputCollector?, + ) { super.open(conf, context, collector) this.index = 0 } @@ -69,7 +75,11 @@ class TestSingleAcking { } class TestBolt : BaseBasicBolt() { - override fun execute(p0: Tuple?, p1: BasicOutputCollector?) = Unit + override fun execute( + p0: Tuple?, + p1: BasicOutputCollector?, + ) = Unit + override fun declareOutputFields(p0: OutputFieldsDeclarer?) = Unit } diff --git a/src/test/kotlin/org/xyro/kumulus/TestTimeoutSpoutHook.kt b/src/test/kotlin/org/xyro/kumulus/TestTimeoutSpoutHook.kt index eb596c4..4c93bb5 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestTimeoutSpoutHook.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestTimeoutSpoutHook.kt @@ -41,9 +41,11 @@ class TestTimeoutSpoutHook { config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L builder.setSpout("spout", TestSpout()) - builder.setBolt("timeout-bolt", TestBolt(true)) + builder + .setBolt("timeout-bolt", TestBolt(true)) .noneGrouping("spout") - builder.setBolt("acking-bolt", TestBolt(false)) + builder + .setBolt("acking-bolt", TestBolt(false)) .noneGrouping("spout") val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") @@ -70,9 +72,11 @@ class TestTimeoutSpoutHook { config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L builder.setSpout("spout", TestSpout()) - builder.setBolt("fail-bolt", TestFailingBolt()) + builder + .setBolt("fail-bolt", TestFailingBolt()) .noneGrouping("spout") - builder.setBolt("acking-bolt", TestBolt(false)) + builder + .setBolt("acking-bolt", TestBolt(false)) .noneGrouping("spout") val kumulusTopology = KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") @@ -89,10 +93,16 @@ class TestTimeoutSpoutHook { assertTrue { missingBolts.get().isEmpty() } } - class TestSpout : DummySpout({ it.declare(Fields("id")) }), KumulusTimeoutNotificationSpout { + class TestSpout : + DummySpout({ it.declare(Fields("id")) }), + KumulusTimeoutNotificationSpout { private var index: Int = 0 - override fun open(conf: MutableMap?, context: TopologyContext?, collector: SpoutOutputCollector?) { + override fun open( + conf: MutableMap?, + context: TopologyContext?, + collector: SpoutOutputCollector?, + ) { super.open(conf, context, collector) this.index = 0 } @@ -106,7 +116,11 @@ class TestTimeoutSpoutHook { done.countDown() } - override fun messageIdFailure(msgId: Any?, failedComponents: List, timeoutComponents: List) { + override fun messageIdFailure( + msgId: Any?, + failedComponents: List, + timeoutComponents: List, + ) { missingBolts.set(timeoutComponents) failedBolts.set(failedComponents) } @@ -117,7 +131,9 @@ class TestTimeoutSpoutHook { } } - class TestBolt(private val shouldTimeout: Boolean) : IRichBolt { + class TestBolt( + private val shouldTimeout: Boolean, + ) : IRichBolt { private lateinit var collector: OutputCollector override fun execute(input: Tuple) { @@ -126,16 +142,27 @@ class TestTimeoutSpoutHook { } } - override fun prepare(p0: MutableMap?, p1: TopologyContext?, p2: OutputCollector) { + override fun prepare( + p0: MutableMap?, + p1: TopologyContext?, + p2: OutputCollector, + ) { this.collector = p2 } + override fun cleanup() = Unit + override fun getComponentConfiguration(): MutableMap = mutableMapOf() + override fun declareOutputFields(p0: OutputFieldsDeclarer) = Unit } class TestFailingBolt : BaseBasicBolt() { - override fun execute(p0: Tuple?, p1: BasicOutputCollector?) = throw FailedException() + override fun execute( + p0: Tuple?, + p1: BasicOutputCollector?, + ) = throw FailedException() + override fun declareOutputFields(p0: OutputFieldsDeclarer?) = Unit } diff --git a/src/test/kotlin/org/xyro/kumulus/TestTopologyValidation.kt b/src/test/kotlin/org/xyro/kumulus/TestTopologyValidation.kt index 92a6762..3b58d6a 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestTopologyValidation.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestTopologyValidation.kt @@ -18,7 +18,8 @@ class TestTopologyValidation { val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() builder.setSpout("spout", DummySpout()) - builder.setBolt("bolt", DummyBolt()) + builder + .setBolt("bolt", DummyBolt()) .noneGrouping("missing-bolt") KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") } @@ -31,7 +32,8 @@ class TestTopologyValidation { builder.setBolt("bolt", DummyBolt()) - builder.setBolt("bolt2", DummyBolt()) + builder + .setBolt("bolt2", DummyBolt()) .noneGrouping("bolt", "missing-stream") KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") } @@ -42,14 +44,16 @@ class TestTopologyValidation { val config: MutableMap = mutableMapOf() builder.setSpout("spout", DummySpout()) - builder.setBolt( - "bolt", - DummyBolt({ - it.declareStream("stream", Fields("num")) - }) - ).noneGrouping("spout") + builder + .setBolt( + "bolt", + DummyBolt({ + it.declareStream("stream", Fields("num")) + }), + ).noneGrouping("spout") - builder.setBolt("bolt2", DummyBolt()) + builder + .setBolt("bolt2", DummyBolt()) .fieldsGrouping("bolt", "stream", Fields("num", "non-existing-field")) KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") } @@ -60,21 +64,24 @@ class TestTopologyValidation { val config: MutableMap = mutableMapOf() builder.setSpout("spout", DummySpout()) - builder.setBolt( - "bolt", - DummyBolt({ - it.declareStream("stream", Fields("num")) - }) - ).noneGrouping("spout") + builder + .setBolt( + "bolt", + DummyBolt({ + it.declareStream("stream", Fields("num")) + }), + ).noneGrouping("spout") - builder.setBolt("bolt2", DummyBolt()) + builder + .setBolt("bolt2", DummyBolt()) .fieldsGrouping("bolt", "stream", Fields("num")) KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") } } open class DummySpout : BaseRichSpout { - private val declare: (declarer: OutputFieldsDeclarer) -> Unit + @Transient + private var declare: (declarer: OutputFieldsDeclarer) -> Unit = {} protected lateinit var collector: SpoutOutputCollector constructor() : this({}) @@ -84,23 +91,33 @@ open class DummySpout : BaseRichSpout { override fun nextTuple() {} - override fun open(conf: MutableMap?, context: TopologyContext?, collector: SpoutOutputCollector?) { + override fun open( + conf: MutableMap?, + context: TopologyContext?, + collector: SpoutOutputCollector?, + ) { this.collector = collector!! } + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { declare(declarer) } } open class DummyBolt : BaseBasicBolt { - private val declare: (declarer: OutputFieldsDeclarer) -> Unit + @Transient + private var declare: (declarer: OutputFieldsDeclarer) -> Unit = {} constructor() : this({}) constructor(declare: (declarer: OutputFieldsDeclarer) -> Unit) : super() { this.declare = declare } - override fun execute(input: Tuple, collector: BasicOutputCollector) {} + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) {} + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { declare(declarer) }