From 03de4ff163b2c44859f590cc9c649f9e9b6967f4 Mon Sep 17 00:00:00 2001 From: Re'em Bensimhon Date: Fri, 13 Feb 2026 15:21:13 +0200 Subject: [PATCH] Add native TopologyBuilder parity, validations, and tests - Implement KumulusTopologyBuilder public API parity with Storm TopologyBuilder, including additional setBolt overloads, setStateSpout/addWorkerHook stubs, and IBasicBolt compatibility. - Align native builder validations with Storm behavior (duplicate component IDs, positive parallelism, null worker hook handling). - Fix native builder config serialization/merging to mirror Storm declarers, including topology.kryo.register rejection and JSON conf merge semantics. - Add reflection-based API parity and configuration parity tests for the native builder. - Add duplicate-ID/validation tests and migrate existing tests (except KumulusStormTransformerTest) from Storm TopologyBuilder to KumulusTopologyBuilder. --- README.md | 18 + .../xyro/kumulus/KumulusStormTransformer.kt | 163 +++++--- .../topology/KumulusTopologyBuilder.kt | 362 ++++++++++++++++++ .../kumulus/KumulusStormTransformerTest.kt | 16 +- .../KumulusTopologyBuilderApiParityTest.kt | 47 +++ ...sTopologyBuilderConfigurationParityTest.kt | 139 +++++++ .../kumulus/KumulusTopologyBuilderTest.kt | 75 ++++ .../xyro/kumulus/TestAllowExtraAckingMode.kt | 7 +- .../org/xyro/kumulus/TestAnchoringBehavior.kt | 7 +- .../xyro/kumulus/TestDroppingStaleMessages.kt | 7 +- .../org/xyro/kumulus/TestExecuteException.kt | 13 +- .../TestMultipleSpoutsMaxPendingLimit.kt | 13 +- .../org/xyro/kumulus/TestPrepareException.kt | 13 +- .../org/xyro/kumulus/TestSingleAcking.kt | 7 +- .../org/xyro/kumulus/TestTimeoutSpoutHook.kt | 13 +- .../xyro/kumulus/TestTopologyValidation.kt | 24 +- 16 files changed, 810 insertions(+), 114 deletions(-) create mode 100644 src/main/kotlin/org/xyro/kumulus/topology/KumulusTopologyBuilder.kt create mode 100644 src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderApiParityTest.kt create mode 100644 src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderConfigurationParityTest.kt create mode 100644 src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderTest.kt diff --git a/README.md b/README.md index 3f185b0..3ae143e 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,24 @@ kumulusTopology.prepare(); kumulusTopology.start(true); ``` +Kumulus also provides a native builder with a Storm-like API: + +```kotlin +val builder = org.xyro.kumulus.topology.KumulusTopologyBuilder() + +val config: MutableMap = mutableMapOf() + +builder.setSpout("spout", Spout()) +builder.setBolt("bolt", Bolt()) + .shuffleGrouping("spout") + +val kumulusTopology = KumulusStormTransformer.initializeTopology( + builder.createTopology(), config, "topology_name" +) +kumulusTopology.prepare() +kumulusTopology.start(true) +``` + ## Benchmark Latency histograms produced by passing 100,000 (10% warm-up) tiny tuples into the fairly simple topology defined in KumulusStormTransformerTest: diff --git a/src/main/kotlin/org/xyro/kumulus/KumulusStormTransformer.kt b/src/main/kotlin/org/xyro/kumulus/KumulusStormTransformer.kt index 579f8b7..6223b3e 100644 --- a/src/main/kotlin/org/xyro/kumulus/KumulusStormTransformer.kt +++ b/src/main/kotlin/org/xyro/kumulus/KumulusStormTransformer.kt @@ -5,8 +5,9 @@ import org.apache.storm.Config import org.apache.storm.Constants import org.apache.storm.generated.Bolt import org.apache.storm.generated.ComponentCommon -import org.apache.storm.generated.GlobalStreamId +import org.apache.storm.generated.ComponentObject import org.apache.storm.generated.SpoutSpec +import org.apache.storm.generated.StateSpoutSpec import org.apache.storm.generated.StormTopology import org.apache.storm.metric.api.IMetric import org.apache.storm.task.TopologyContext @@ -23,6 +24,7 @@ import org.apache.storm.utils.Utils import org.xyro.kumulus.component.KumulusBolt import org.xyro.kumulus.component.KumulusComponent import org.xyro.kumulus.component.KumulusSpout +import org.xyro.kumulus.topology.KumulusTopologyDefinition import java.io.Serializable @Suppress("unused") @@ -37,22 +39,65 @@ class KumulusStormTransformer { @Suppress("UNCHECKED_CAST") @JvmStatic fun initializeTopology(topology: StormTopology, rawConfig: MutableMap, stormId: String): KumulusTopology { - val boltField = StormTopology::class.java.getDeclaredField("bolts")!! - boltField.isAccessible = true - val serializedBoltsMap: Map = boltField.get(topology) as Map - val boltsMap: Map = serializedBoltsMap.entries.associate { (id, bolt) -> - val boltObject = bolt._bolt_object!! - id to (Utils.javaDeserialize(boltObject._serialized_java, Serializable::class.java) as IComponent) + val serializedBoltsMap = readBolts(topology) + val serializedSpoutsMap = readSpouts(topology) + + val boltsMap = serializedBoltsMap.entries.associate { (id, bolt) -> + val boltObject = bolt._bolt_object + ?: throw IllegalArgumentException("Bolt '$id' has no serialized bolt object") + id to ( + Utils.javaDeserialize(boltObject._serialized_java, Serializable::class.java) as? IRichBolt + ?: throw IllegalArgumentException("Bolt '$id' is not an IRichBolt") + ) } - val spoutField = StormTopology::class.java.getDeclaredField("spouts")!! - spoutField.isAccessible = true - val serializedSpoutsMap: Map = spoutField.get(topology) as Map - val spoutsMap: Map = serializedSpoutsMap.entries.associate { (id, spout) -> - val spoutObject = spout._spout_object!! - id to (Utils.javaDeserialize(spoutObject._serialized_java, Serializable::class.java) as IComponent) + val spoutsMap = serializedSpoutsMap.entries.associate { (id, spout) -> + val spoutObject = spout._spout_object + ?: throw IllegalArgumentException("Spout '$id' has no serialized spout object") + id to ( + Utils.javaDeserialize(spoutObject._serialized_java, Serializable::class.java) as? IRichSpout + ?: throw IllegalArgumentException("Spout '$id' is not an IRichSpout") + ) } + val componentCommonMap = + serializedSpoutsMap.mapValues { (_, spec) -> spec._common!! } + + serializedBoltsMap.mapValues { (_, bolt) -> bolt._common!! } + + return initializeTopology(topology, spoutsMap, boltsMap, componentCommonMap, rawConfig, stormId) + } + + /** + * Initialize a Kumulus topology from Kumulus native topology definition. + * @param topology the native topology to transform + * @param rawConfig the Storm configuration + * @param stormId the Storm topology ID string + */ + @JvmStatic + fun initializeTopology( + topology: KumulusTopologyDefinition, + rawConfig: MutableMap, + stormId: String + ): KumulusTopology { + val spoutsMap = topology.spouts.mapValues { (_, declaration) -> declaration.spout } + val boltsMap = topology.bolts.mapValues { (_, declaration) -> declaration.bolt } + val componentCommonMap = + topology.spouts.mapValues { (_, declaration) -> declaration.common } + + topology.bolts.mapValues { (_, declaration) -> declaration.common } + + val stormTopology = buildStormTopology(topology) + return initializeTopology(stormTopology, spoutsMap, boltsMap, componentCommonMap, rawConfig, stormId) + } + + @Suppress("LongMethod") + private fun initializeTopology( + topology: StormTopology, + spoutsMap: Map, + boltsMap: Map, + componentCommons: Map, + rawConfig: MutableMap, + stormId: String + ): KumulusTopology { val taskToComponent = mutableMapOf() val componentToSortedTasks = mutableMapOf>() @@ -73,21 +118,12 @@ class KumulusStormTransformer { val executorData: Map = LinkedHashMap() val registeredMetrics: Map>> = LinkedHashMap() - val getter: (String) -> ComponentCommon = { id -> - if (topology._bolts?.containsKey(id)!!) { - topology._bolts[id]!!._common - } else { - topology._spouts[id]!!._common - } - } + val getter: (String) -> ComponentCommon = { id -> componentCommons[id]!! } - val componentMaps = listOf(spoutsMap, boltsMap) + val componentMaps = listOf(spoutsMap as Map, boltsMap as Map) val kComponents: MutableList = mutableListOf() - val kComponentInputs: MutableMap, org.apache.storm.generated.Grouping> = - mutableMapOf() - var id = 1 for (componentMap in componentMaps) { for ((name) in componentMap) { @@ -117,10 +153,6 @@ class KumulusStormTransformer { streamToFields[stream] = Fields(streamInfo?._output_fields) } - componentCommon._inputs?.forEach { - kComponentInputs[name to it.key] = it.value - } - id++ } } @@ -131,21 +163,7 @@ class KumulusStormTransformer { } componentToSortedTasks.forEach { componentId: String, taskIds: List -> - val (componentObjectSerialized, componentCommon) = - when { - topology._spouts!!.containsKey(componentId) -> - topology._spouts[componentId]!! - .let { it._spout_object!! to it._common!! } - topology._bolts!!.containsKey(componentId) -> - topology._bolts[componentId]!! - .let { it._bolt_object!! to it._common!! } - componentId == Constants.SYSTEM_COMPONENT_ID -> - null to null - else -> - throw Exception( - "Component name '$componentId' was not found in underlying topology object" - ) - } + val componentCommon = componentCommons[componentId] taskIds.forEach { taskId -> val componentInstance = @@ -156,11 +174,22 @@ class KumulusStormTransformer { // Declared hard-coded } }) - } else { + } else if (spoutsMap.containsKey(componentId)) { + val component = spoutsMap[componentId]!! + Utils.javaDeserialize( + Utils.javaSerialize(component as Serializable), + Serializable::class.java + ) + } else if (boltsMap.containsKey(componentId)) { + val component = boltsMap[componentId]!! Utils.javaDeserialize( - componentObjectSerialized!!._serialized_java, + Utils.javaSerialize(component as Serializable), Serializable::class.java ) + } else { + throw Exception( + "Component name '$componentId' was not found in underlying topology object" + ) } val context = TopologyContext( @@ -213,6 +242,50 @@ class KumulusStormTransformer { return KumulusTopology(kComponents, config) } + @Suppress("UNCHECKED_CAST") + private fun readSpouts(topology: StormTopology): Map { + topology._spouts?.let { return it } + val spoutField = StormTopology::class.java.getDeclaredField("spouts")!! + spoutField.isAccessible = true + return spoutField.get(topology) as Map + } + + @Suppress("UNCHECKED_CAST") + private fun readBolts(topology: StormTopology): Map { + topology._bolts?.let { return it } + val boltField = StormTopology::class.java.getDeclaredField("bolts")!! + boltField.isAccessible = true + return boltField.get(topology) as Map + } + + private fun buildStormTopology(topology: KumulusTopologyDefinition): StormTopology { + val spouts = topology.spouts.mapValues { (_, declaration) -> + SpoutSpec().apply { + _spout_object = serializeComponentObject(declaration.spout as Serializable) + _common = declaration.common.deepCopy() + } + }.toMutableMap() + + val bolts = topology.bolts.mapValues { (_, declaration) -> + Bolt().apply { + _bolt_object = serializeComponentObject(declaration.bolt as Serializable) + _common = declaration.common.deepCopy() + } + }.toMutableMap() + + return StormTopology().apply { + set_spouts(spouts) + set_bolts(bolts) + set_state_spouts(mutableMapOf()) + } + } + + private fun serializeComponentObject(component: Serializable): ComponentObject { + return ComponentObject().apply { + set_serialized_java(Utils.javaSerialize(component)) + } + } + private fun validateTopology(components: MutableList) { components.forEach { src -> (src as? KumulusBolt)?.apply { diff --git a/src/main/kotlin/org/xyro/kumulus/topology/KumulusTopologyBuilder.kt b/src/main/kotlin/org/xyro/kumulus/topology/KumulusTopologyBuilder.kt new file mode 100644 index 0000000..ca3bf59 --- /dev/null +++ b/src/main/kotlin/org/xyro/kumulus/topology/KumulusTopologyBuilder.kt @@ -0,0 +1,362 @@ +package org.xyro.kumulus.topology + +import org.apache.storm.Config +import org.apache.storm.generated.ComponentCommon +import org.apache.storm.generated.GlobalStreamId +import org.apache.storm.generated.Grouping +import org.apache.storm.generated.NullStruct +import org.apache.storm.generated.StreamInfo +import org.apache.storm.grouping.CustomStreamGrouping +import org.apache.storm.hooks.IWorkerHook +import org.apache.storm.shade.org.json.simple.JSONValue +import org.apache.storm.state.State +import org.apache.storm.topology.BasicBoltExecutor +import org.apache.storm.topology.IBasicBolt +import org.apache.storm.topology.IRichBolt +import org.apache.storm.topology.IRichSpout +import org.apache.storm.topology.IRichStateSpout +import org.apache.storm.topology.IStatefulBolt +import org.apache.storm.topology.IStatefulWindowedBolt +import org.apache.storm.topology.IWindowedBolt +import org.apache.storm.topology.OutputFieldsDeclarer +import org.apache.storm.topology.StatefulBoltExecutor +import org.apache.storm.topology.StatefulWindowedBoltExecutor +import org.apache.storm.topology.WindowedBoltExecutor +import org.apache.storm.tuple.Fields +import org.apache.storm.utils.Utils +import java.io.Serializable +import java.util.LinkedHashMap + +@Suppress("unused", "MemberVisibilityCanBePrivate") +class KumulusTopologyBuilder { + private val spouts: LinkedHashMap = LinkedHashMap() + private val bolts: LinkedHashMap = LinkedHashMap() + private val stateSpouts: LinkedHashMap = LinkedHashMap() + + fun setSpout(id: String, spout: IRichSpout): KumulusSpoutDeclarer { + validateUnusedId(id) + return registerSpout(id, spout, null) + } + + fun setSpout(id: String, spout: IRichSpout, parallelismHint: Number): KumulusSpoutDeclarer { + validateUnusedId(id) + return registerSpout(id, spout, parallelismHint) + } + + private fun registerSpout(id: String, spout: IRichSpout, parallelismHint: Number?): KumulusSpoutDeclarer { + val common = ComponentCommon() + common.set_inputs(mutableMapOf()) + initCommon(common, spout, parallelismHint) + common.set_streams(declareStreams(spout)) + + spouts[id] = KumulusDeclaredSpout(id, spout, common) + return KumulusSpoutDeclarer(spouts[id]!!) + } + + fun setBolt(id: String, bolt: IRichBolt): KumulusBoltDeclarer { + return setBoltInternal(id, bolt, null) + } + + private fun setBoltInternal(id: String, bolt: IRichBolt, parallelismHint: Number?): KumulusBoltDeclarer { + validateUnusedId(id) + return registerBolt(id, bolt, parallelismHint) + } + + fun setBolt(id: String, bolt: IWindowedBolt): KumulusBoltDeclarer { + return setBoltInternal(id, WindowedBoltExecutor(bolt), null) + } + + fun setBolt(id: String, bolt: IWindowedBolt, parallelismHint: Number): KumulusBoltDeclarer { + return setBolt(id, WindowedBoltExecutor(bolt), parallelismHint) + } + + fun setBolt(id: String, bolt: IStatefulBolt): KumulusBoltDeclarer { + return setBoltInternal(id, StatefulBoltExecutor(bolt), null) + } + + fun setBolt(id: String, bolt: IStatefulBolt, parallelismHint: Number): KumulusBoltDeclarer { + return setBolt(id, StatefulBoltExecutor(bolt), parallelismHint) + } + + fun setBolt(id: String, bolt: IStatefulWindowedBolt): KumulusBoltDeclarer { + return setBoltInternal(id, StatefulWindowedBoltExecutor(bolt), null) + } + + fun setBolt( + id: String, + bolt: IStatefulWindowedBolt, + parallelismHint: Number + ): KumulusBoltDeclarer { + return setBolt(id, StatefulWindowedBoltExecutor(bolt), parallelismHint) + } + + fun setBolt(id: String, bolt: IBasicBolt): KumulusBoltDeclarer { + return setBoltInternal(id, BasicBoltExecutor(bolt), null) + } + + fun setBolt(id: String, bolt: IBasicBolt, parallelismHint: Number): KumulusBoltDeclarer { + return setBolt(id, BasicBoltExecutor(bolt), parallelismHint) + } + + fun setBolt(id: String, bolt: IRichBolt, parallelismHint: Number): KumulusBoltDeclarer { + validateUnusedId(id) + return registerBolt(id, bolt, parallelismHint) + } + + private fun registerBolt(id: String, bolt: IRichBolt, parallelismHint: Number?): KumulusBoltDeclarer { + val common = ComponentCommon() + common.set_inputs(mutableMapOf()) + initCommon(common, bolt, parallelismHint) + common.set_streams(declareStreams(bolt)) + + bolts[id] = KumulusDeclaredBolt(id, bolt, common) + return KumulusBoltDeclarer(bolts[id]!!) + } + + fun createTopology(): KumulusTopologyDefinition { + return KumulusTopologyDefinition(spouts.toMap(), bolts.toMap()) + } + + fun setStateSpout(id: String, spout: IRichStateSpout) { + setStateSpout(id, spout, 1) + } + + fun setStateSpout(id: String, spout: IRichStateSpout, parallelismHint: Number) { + validateUnusedId(id) + stateSpouts[id] = spout + throw UnsupportedOperationException("State spouts are not supported in KumulusTopologyBuilder") + } + + fun addWorkerHook(hook: IWorkerHook?) { + if (hook == null) { + throw IllegalArgumentException("WorkerHook must not be null.") + } + // Kumulus runs in-process and does not have worker-level hooks. + } + + private fun initCommon(common: ComponentCommon, component: Any, parallelismHint: Number?) { + if (parallelismHint != null) { + val parallelism = parallelismHint.toInt() + if (parallelism < 1) { + throw IllegalArgumentException("Parallelism must be positive.") + } + common.set_parallelism_hint(parallelism) + } + + val config = when (component) { + is IRichBolt -> component.componentConfiguration + is IRichSpout -> component.componentConfiguration + else -> null + } + if (config != null) { + common.set_json_conf(JSONValue.toJSONString(config)) + } + } + + private fun validateUnusedId(id: String) { + if (bolts.containsKey(id)) { + throw IllegalArgumentException("Bolt has already been declared for id$id") + } + if (spouts.containsKey(id)) { + throw IllegalArgumentException("Spout has already been declared for id$id") + } + if (stateSpouts.containsKey(id)) { + throw IllegalArgumentException("State spout has already been declared for id$id") + } + } + + private fun declareStreams(component: Any): MutableMap { + val declarer = KumulusOutputFieldsCollector() + + when (component) { + is IRichBolt -> component.declareOutputFields(declarer) + is IRichSpout -> component.declareOutputFields(declarer) + else -> throw IllegalArgumentException("Component type ${component::class.qualifiedName} is unsupported") + } + + return declarer.streams + } +} + +data class KumulusTopologyDefinition( + val spouts: Map, + val bolts: Map +) + +data class KumulusDeclaredSpout( + val id: String, + val spout: IRichSpout, + val common: ComponentCommon +) + +data class KumulusDeclaredBolt( + val id: String, + val bolt: IRichBolt, + val common: ComponentCommon +) + +class KumulusSpoutDeclarer internal constructor( + private val registration: KumulusDeclaredSpout +) { + fun addConfiguration(configKey: String, configValue: Any?): KumulusSpoutDeclarer { + return addConfigurations(mapOf(configKey to configValue)) + } + + fun addConfigurations(conf: Map): KumulusSpoutDeclarer { + if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { + throw IllegalArgumentException("Cannot set serializations for a component using fluent API") + } + val merged = mergeWithExistingJsonConfig(registration.common, conf) + registration.common.set_json_conf(JSONValue.toJSONString(merged)) + return this + } + + fun setNumTasks(numTasks: Int): KumulusSpoutDeclarer { + registration.common.set_parallelism_hint(numTasks.coerceAtLeast(1)) + return this + } +} + +class KumulusBoltDeclarer internal constructor( + private val registration: KumulusDeclaredBolt +) { + fun fieldsGrouping(componentId: String, fields: Fields): KumulusBoltDeclarer { + return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields) + } + + fun fieldsGrouping(componentId: String, streamId: String, fields: Fields): KumulusBoltDeclarer { + return grouping( + componentId, + streamId, + Grouping().apply { set_fields(fields.toList()) } + ) + } + + fun shuffleGrouping(componentId: String): KumulusBoltDeclarer { + return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID) + } + + fun shuffleGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { + return grouping(componentId, streamId, Grouping().apply { set_shuffle(NullStruct()) }) + } + + fun localOrShuffleGrouping(componentId: String): KumulusBoltDeclarer { + return localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID) + } + + fun localOrShuffleGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { + return grouping(componentId, streamId, Grouping().apply { set_local_or_shuffle(NullStruct()) }) + } + + fun noneGrouping(componentId: String): KumulusBoltDeclarer { + return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID) + } + + fun noneGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { + return grouping(componentId, streamId, Grouping().apply { set_none(NullStruct()) }) + } + + fun allGrouping(componentId: String): KumulusBoltDeclarer { + return allGrouping(componentId, Utils.DEFAULT_STREAM_ID) + } + + fun allGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { + return grouping(componentId, streamId, Grouping().apply { set_all(NullStruct()) }) + } + + fun directGrouping(componentId: String): KumulusBoltDeclarer { + return directGrouping(componentId, Utils.DEFAULT_STREAM_ID) + } + + fun directGrouping(componentId: String, streamId: String): KumulusBoltDeclarer { + return grouping(componentId, streamId, Grouping().apply { set_direct(NullStruct()) }) + } + + fun customGrouping(componentId: String, grouping: CustomStreamGrouping): KumulusBoltDeclarer { + return customGrouping(componentId, Utils.DEFAULT_STREAM_ID, grouping) + } + + fun customGrouping( + componentId: String, + streamId: String, + grouping: CustomStreamGrouping + ): KumulusBoltDeclarer { + if (grouping !is Serializable) { + throw IllegalArgumentException("Custom grouping must be serializable for Kumulus") + } + return this.grouping( + componentId, + streamId, + Grouping().apply { set_custom_serialized(Utils.javaSerialize(grouping)) } + ) + } + + fun grouping(id: GlobalStreamId, grouping: Grouping): KumulusBoltDeclarer { + registration.common.put_to_inputs(id, grouping) + return this + } + + fun grouping(componentId: String, streamId: String, grouping: Grouping): KumulusBoltDeclarer { + return grouping(GlobalStreamId(componentId, streamId), grouping) + } + + fun addConfiguration(configKey: String, configValue: Any?): KumulusBoltDeclarer { + return addConfigurations(mapOf(configKey to configValue)) + } + + fun addConfigurations(conf: Map): KumulusBoltDeclarer { + if (conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { + throw IllegalArgumentException("Cannot set serializations for a component using fluent API") + } + val merged = mergeWithExistingJsonConfig(registration.common, conf) + registration.common.set_json_conf(JSONValue.toJSONString(merged)) + return this + } + + fun setNumTasks(numTasks: Int): KumulusBoltDeclarer { + registration.common.set_parallelism_hint(numTasks.coerceAtLeast(1)) + return this + } +} + +private class KumulusOutputFieldsCollector : OutputFieldsDeclarer { + val streams: MutableMap = mutableMapOf() + + override fun declare(fields: Fields) { + declare(false, fields) + } + + override fun declare(direct: Boolean, fields: Fields) { + declareStream(Utils.DEFAULT_STREAM_ID, direct, fields) + } + + override fun declareStream(streamId: String, fields: Fields) { + declareStream(streamId, false, fields) + } + + override fun declareStream(streamId: String, direct: Boolean, fields: Fields) { + streams[streamId] = StreamInfo().apply { + set_output_fields(fields.toList()) + set_direct(direct) + } + } +} + +private fun mergeWithExistingJsonConfig(common: ComponentCommon, update: Map): MutableMap { + val existing = parseJsonObject(common._json_conf) + val merged = HashMap(existing) + merged.putAll(update) + return merged +} + +@Suppress("UNCHECKED_CAST") +private fun parseJsonObject(json: String?): MutableMap { + if (json == null) { + return mutableMapOf() + } + return try { + (JSONValue.parseWithException(json) as Map).toMutableMap() + } catch (e: Exception) { + throw RuntimeException(e) + } +} diff --git a/src/test/kotlin/org/xyro/kumulus/KumulusStormTransformerTest.kt b/src/test/kotlin/org/xyro/kumulus/KumulusStormTransformerTest.kt index dd5022d..eb85f19 100644 --- a/src/test/kotlin/org/xyro/kumulus/KumulusStormTransformerTest.kt +++ b/src/test/kotlin/org/xyro/kumulus/KumulusStormTransformerTest.kt @@ -1,3 +1,5 @@ +package org.xyro.kumulus + import mu.KotlinLogging import org.HdrHistogram.Histogram import org.apache.storm.Config @@ -11,14 +13,14 @@ import org.apache.storm.topology.BasicOutputCollector import org.apache.storm.topology.FailedException import org.apache.storm.topology.IRichBolt import org.apache.storm.topology.OutputFieldsDeclarer +import org.apache.storm.topology.TopologyBuilder import org.apache.storm.topology.base.BaseBasicBolt import org.apache.storm.topology.base.BaseRichSpout import org.apache.storm.tuple.Fields import org.apache.storm.tuple.Tuple +import org.junit.Assert.assertEquals import org.junit.Ignore import org.junit.Test -import org.xyro.kumulus.KumulusStormTransformer -import org.xyro.kumulus.KumulusTopology import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -39,7 +41,9 @@ class KumulusStormTransformerTest { @Test fun test1() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = TopologyBuilder() + + assertEquals("org.apache.storm.topology.TopologyBuilder", builder.javaClass.name) val config: MutableMap = mutableMapOf() @@ -110,7 +114,7 @@ class KumulusStormTransformerTest { config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = maxPending config[Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS] = 1 - config[org.xyro.kumulus.KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 1 + config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 1 val topology = builder.createTopology()!! @@ -169,7 +173,7 @@ class KumulusStormTransformerTest { @Test @Ignore fun test2() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = TopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1 @@ -221,7 +225,7 @@ class KumulusStormTransformerTest { @Test @Ignore fun test3() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = TopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1 diff --git a/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderApiParityTest.kt b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderApiParityTest.kt new file mode 100644 index 0000000..5000df6 --- /dev/null +++ b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderApiParityTest.kt @@ -0,0 +1,47 @@ +package org.xyro.kumulus + +import org.apache.storm.topology.TopologyBuilder +import org.junit.Assert.fail +import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder +import java.lang.reflect.Modifier + +class KumulusTopologyBuilderApiParityTest { + @Test + fun testPublicApiMatchesStormTopologyBuilder() { + val stormApi = publicInstanceMethodSignatures(TopologyBuilder::class.java) + val nativeApi = publicInstanceMethodSignatures(KumulusTopologyBuilder::class.java) + + val missingFromNative = stormApi - nativeApi + val extraInNative = nativeApi - stormApi + + if (missingFromNative.isNotEmpty() || extraInNative.isNotEmpty()) { + val message = buildString { + appendLine("KumulusTopologyBuilder API mismatch against Storm TopologyBuilder.") + appendLine("Missing in native builder:") + appendLine(formatSignatureList(missingFromNative)) + appendLine("Extra in native builder:") + appendLine(formatSignatureList(extraInNative)) + } + fail(message) + } + } + + private fun publicInstanceMethodSignatures(clazz: Class<*>): Set { + return clazz.declaredMethods + .filter { Modifier.isPublic(it.modifiers) && !Modifier.isStatic(it.modifiers) } + .filterNot { it.isSynthetic || it.isBridge } + .map { method -> + val parameterTypes = method.parameterTypes.joinToString(",") { it.name } + "${method.name}($parameterTypes)" + } + .toSet() + } + + private fun formatSignatureList(signatures: Set): String { + if (signatures.isEmpty()) { + return " (none)" + } + return signatures.sorted().joinToString(separator = "\n") { " $it" } + } +} diff --git a/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderConfigurationParityTest.kt b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderConfigurationParityTest.kt new file mode 100644 index 0000000..8a4dc66 --- /dev/null +++ b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderConfigurationParityTest.kt @@ -0,0 +1,139 @@ +package org.xyro.kumulus + +import org.apache.storm.Config +import org.apache.storm.shade.org.json.simple.JSONValue +import org.apache.storm.spout.SpoutOutputCollector +import org.apache.storm.task.OutputCollector +import org.apache.storm.task.TopologyContext +import org.apache.storm.topology.IRichBolt +import org.apache.storm.topology.IRichSpout +import org.apache.storm.topology.OutputFieldsDeclarer +import org.apache.storm.topology.TopologyBuilder +import org.apache.storm.tuple.Fields +import org.apache.storm.tuple.Tuple +import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class KumulusTopologyBuilderConfigurationParityTest { + @Test + fun testSpoutAddConfigurationParityWithStorm() { + val componentConfig = mutableMapOf("existing" to "value", "n" to 1) + + val stormBuilder = TopologyBuilder() + val stormDeclarer = stormBuilder.setSpout("spout", ConfigurableSpout(componentConfig)) + stormDeclarer.addConfiguration("k1", "v1") + stormDeclarer.addConfiguration("k2", 2) + stormDeclarer.addConfigurations(mapOf("k3" to true, "k1" to "override")) + val stormJson = stormBuilder.createTopology()._spouts["spout"]!!._common._json_conf + + val nativeBuilder = KumulusTopologyBuilder() + val nativeDeclarer = nativeBuilder.setSpout("spout", ConfigurableSpout(componentConfig)) + nativeDeclarer.addConfiguration("k1", "v1") + nativeDeclarer.addConfiguration("k2", 2) + nativeDeclarer.addConfigurations(mapOf("k3" to true, "k1" to "override")) + val nativeJson = nativeBuilder.createTopology().spouts["spout"]!!.common._json_conf + + assertEquals(parseJson(stormJson), parseJson(nativeJson)) + } + + @Test + fun testBoltAddConfigurationParityWithStorm() { + val componentConfig = mutableMapOf("existing" to "value", "n" to 1) + + val stormBuilder = TopologyBuilder() + val stormDeclarer = stormBuilder.setBolt("bolt", ConfigurableBolt(componentConfig)) + stormDeclarer.addConfiguration("k1", "v1") + stormDeclarer.addConfiguration("k2", 2) + stormDeclarer.addConfigurations(mapOf("k3" to true, "k1" to "override")) + val stormJson = stormBuilder.createTopology()._bolts["bolt"]!!._common._json_conf + + val nativeBuilder = KumulusTopologyBuilder() + val nativeDeclarer = nativeBuilder.setBolt("bolt", ConfigurableBolt(componentConfig)) + nativeDeclarer.addConfiguration("k1", "v1") + nativeDeclarer.addConfiguration("k2", 2) + nativeDeclarer.addConfigurations(mapOf("k3" to true, "k1" to "override")) + val nativeJson = nativeBuilder.createTopology().bolts["bolt"]!!.common._json_conf + + assertEquals(parseJson(stormJson), parseJson(nativeJson)) + } + + @Test + fun testSpoutRejectsKryoRegisterLikeStorm() { + val stormBuilder = TopologyBuilder() + val stormDeclarer = stormBuilder.setSpout("spout", ConfigurableSpout(mutableMapOf())) + assertFailsWith { + stormDeclarer.addConfigurations(mapOf(Config.TOPOLOGY_KRYO_REGISTER to "x")) + } + + val nativeBuilder = KumulusTopologyBuilder() + val nativeDeclarer = nativeBuilder.setSpout("spout", ConfigurableSpout(mutableMapOf())) + assertFailsWith { + nativeDeclarer.addConfigurations(mapOf(Config.TOPOLOGY_KRYO_REGISTER to "x")) + } + } + + @Test + fun testBoltRejectsKryoRegisterLikeStorm() { + val stormBuilder = TopologyBuilder() + val stormDeclarer = stormBuilder.setBolt("bolt", ConfigurableBolt(mutableMapOf())) + assertFailsWith { + stormDeclarer.addConfigurations(mapOf(Config.TOPOLOGY_KRYO_REGISTER to "x")) + } + + val nativeBuilder = KumulusTopologyBuilder() + val nativeDeclarer = nativeBuilder.setBolt("bolt", ConfigurableBolt(mutableMapOf())) + assertFailsWith { + nativeDeclarer.addConfigurations(mapOf(Config.TOPOLOGY_KRYO_REGISTER to "x")) + } + } + + @Suppress("UNCHECKED_CAST") + private fun parseJson(json: String?): Map { + if (json == null) { + return emptyMap() + } + return JSONValue.parseWithException(json) as Map + } + + private class ConfigurableSpout( + private val componentConfig: MutableMap + ) : IRichSpout { + override fun open( + conf: MutableMap?, + context: TopologyContext?, + collector: SpoutOutputCollector? + ) = Unit + + override fun close() = Unit + override fun activate() = Unit + override fun deactivate() = Unit + override fun nextTuple() = Unit + override fun ack(msgId: Any?) = Unit + override fun fail(msgId: Any?) = Unit + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { + declarer.declare(Fields("f")) + } + + override fun getComponentConfiguration(): MutableMap = componentConfig.toMutableMap() + } + + private class ConfigurableBolt( + private val componentConfig: MutableMap + ) : IRichBolt { + override fun prepare( + stormConf: MutableMap?, + context: TopologyContext?, + collector: OutputCollector? + ) = Unit + + override fun execute(input: Tuple) = Unit + override fun cleanup() = Unit + override fun declareOutputFields(declarer: OutputFieldsDeclarer) { + declarer.declare(Fields("f")) + } + + override fun getComponentConfiguration(): MutableMap = componentConfig.toMutableMap() + } +} diff --git a/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderTest.kt b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderTest.kt new file mode 100644 index 0000000..e8e4f63 --- /dev/null +++ b/src/test/kotlin/org/xyro/kumulus/KumulusTopologyBuilderTest.kt @@ -0,0 +1,75 @@ +package org.xyro.kumulus + +import org.apache.storm.tuple.Fields +import org.junit.Test +import org.xyro.kumulus.KumulusStormTransformer.KumulusTopologyValidationException +import org.xyro.kumulus.topology.KumulusTopologyBuilder + +class KumulusTopologyBuilderTest { + @Test + fun testNativeTopologyBuilderHappyPath() { + val builder = KumulusTopologyBuilder() + val config: MutableMap = mutableMapOf() + + builder.setSpout("spout", DummySpout()) + builder.setBolt( + "bolt", + DummyBolt { + it.declareStream("stream", Fields("num")) + } + ).noneGrouping("spout") + builder.setBolt("bolt2", DummyBolt()) + .fieldsGrouping("bolt", "stream", Fields("num")) + + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") + } + + @Test(expected = KumulusTopologyValidationException::class) + fun testNativeTopologyBuilderValidation() { + val builder = KumulusTopologyBuilder() + val config: MutableMap = mutableMapOf() + + builder.setSpout("spout", DummySpout()) + builder.setBolt("bolt", DummyBolt()) + .noneGrouping("missing-bolt") + + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") + } + + @Test(expected = IllegalArgumentException::class) + fun testNativeTopologyBuilderRejectsDuplicateComponentId() { + val builder = KumulusTopologyBuilder() + + builder.setSpout("component", DummySpout()) + builder.setBolt("component", DummyBolt()) + } + + @Test(expected = IllegalArgumentException::class) + fun testNativeTopologyBuilderRejectsDuplicateComponentIdReverseOrder() { + val builder = KumulusTopologyBuilder() + + builder.setBolt("component", DummyBolt()) + builder.setSpout("component", DummySpout()) + } + + @Test(expected = IllegalArgumentException::class) + fun testNativeTopologyBuilderRejectsNonPositiveSpoutParallelism() { + val builder = KumulusTopologyBuilder() + + builder.setSpout("spout", DummySpout(), 0) + } + + @Test(expected = IllegalArgumentException::class) + fun testNativeTopologyBuilderRejectsNonPositiveBoltParallelism() { + val builder = KumulusTopologyBuilder() + + builder.setBolt("bolt", DummyBolt(), 0) + } + + @Test(expected = IllegalArgumentException::class) + fun testNativeTopologyBuilderRejectsNullWorkerHook() { + val builder = KumulusTopologyBuilder() + + builder.addWorkerHook(null) + } +} diff --git a/src/test/kotlin/org/xyro/kumulus/TestAllowExtraAckingMode.kt b/src/test/kotlin/org/xyro/kumulus/TestAllowExtraAckingMode.kt index 18404f6..d82d9ea 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestAllowExtraAckingMode.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestAllowExtraAckingMode.kt @@ -9,6 +9,7 @@ import org.apache.storm.topology.OutputFieldsDeclarer import org.apache.storm.tuple.Fields import org.apache.storm.tuple.Tuple import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder import java.util.UUID import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -17,7 +18,7 @@ import kotlin.test.assertTrue class TestAllowExtraAckingMode { @Test fun testMultipleSpouts() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = MAX_SPOUT_PENDING @@ -38,10 +39,8 @@ class TestAllowExtraAckingMode { .allGrouping("spout") .allGrouping("spout2") .allGrouping("spout3") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(10, TimeUnit.SECONDS) kumulusTopology.start() Thread.sleep(5000) diff --git a/src/test/kotlin/org/xyro/kumulus/TestAnchoringBehavior.kt b/src/test/kotlin/org/xyro/kumulus/TestAnchoringBehavior.kt index 8a26746..ab780d4 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestAnchoringBehavior.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestAnchoringBehavior.kt @@ -11,6 +11,7 @@ import org.apache.storm.topology.OutputFieldsDeclarer import org.apache.storm.tuple.Fields import org.apache.storm.tuple.Tuple import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong import kotlin.test.assertTrue @@ -18,7 +19,7 @@ import kotlin.test.assertTrue class TestAnchoringBehavior { @Test fun testLatentUnanchoredBolt() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L @@ -35,10 +36,8 @@ class TestAnchoringBehavior { builder.setBolt("unanchored-bolt-2", DummyBolt()) .noneGrouping("spout") .noneGrouping("delay-unanchored-bolt") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(10, TimeUnit.SECONDS) kumulusTopology.start(block = false) diff --git a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt index 9fb4fa9..f089fcb 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt @@ -11,6 +11,7 @@ import org.apache.storm.topology.OutputFieldsDeclarer import org.apache.storm.tuple.Fields import org.apache.storm.tuple.Tuple import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong import kotlin.test.assertTrue @@ -18,7 +19,7 @@ import kotlin.test.assertTrue class TestDroppingStaleMessages { @Test fun testLateBolt() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L @@ -33,10 +34,8 @@ class TestDroppingStaleMessages { builder.setBolt("delay-unanchored-bolt", StuckBolt()) .noneGrouping("unanchoring-bolt") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") var lateHookCalled = false diff --git a/src/test/kotlin/org/xyro/kumulus/TestExecuteException.kt b/src/test/kotlin/org/xyro/kumulus/TestExecuteException.kt index 9622023..d095134 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestExecuteException.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestExecuteException.kt @@ -8,12 +8,13 @@ import org.apache.storm.topology.OutputFieldsDeclarer import org.apache.storm.tuple.Fields import org.apache.storm.tuple.Tuple import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder import java.util.concurrent.TimeUnit class TestExecuteException { @Test(expected = KumulusTopology.KumulusTopologyCrashedException::class, timeout = 5000) fun testBoltExecuteException() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L @@ -23,17 +24,15 @@ class TestExecuteException { builder.setSpout("spout", TestSpout()) builder.setBolt("execute-exception-bolt", TestExecuteExceptionBolt()) .noneGrouping("spout") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(2, TimeUnit.SECONDS) kumulusTopology.start(true) } @Test(expected = KumulusTopology.KumulusTopologyCrashedException::class, timeout = 5000) fun testSpoutNextTupleException() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L @@ -41,10 +40,8 @@ class TestExecuteException { config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L builder.setSpout("spout", TesExceptiontSpout()) - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(2, TimeUnit.SECONDS) kumulusTopology.start(true) } diff --git a/src/test/kotlin/org/xyro/kumulus/TestMultipleSpoutsMaxPendingLimit.kt b/src/test/kotlin/org/xyro/kumulus/TestMultipleSpoutsMaxPendingLimit.kt index 42f9c39..31df1bb 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestMultipleSpoutsMaxPendingLimit.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestMultipleSpoutsMaxPendingLimit.kt @@ -11,6 +11,7 @@ import org.apache.storm.topology.base.BaseBasicBolt import org.apache.storm.tuple.Fields import org.apache.storm.tuple.Tuple import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder import java.util.UUID import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -19,7 +20,7 @@ import kotlin.test.assertTrue class TestMultipleSpoutsMaxPendingLimit { @Test fun testMultipleSpouts() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L @@ -34,10 +35,8 @@ class TestMultipleSpoutsMaxPendingLimit { .allGrouping("spout") .allGrouping("spout2") .allGrouping("spout3") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(10, TimeUnit.SECONDS) kumulusTopology.start() Thread.sleep(5000) @@ -52,7 +51,7 @@ class TestMultipleSpoutsMaxPendingLimit { @Test fun testMaxSpoutPending() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L @@ -63,10 +62,8 @@ class TestMultipleSpoutsMaxPendingLimit { builder.setBolt("sleeping-bolt", SleepingBolt(), 4) .shuffleGrouping("spout") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(10, TimeUnit.SECONDS) kumulusTopology.start() diff --git a/src/test/kotlin/org/xyro/kumulus/TestPrepareException.kt b/src/test/kotlin/org/xyro/kumulus/TestPrepareException.kt index fbaa459..e44d590 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestPrepareException.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestPrepareException.kt @@ -8,13 +8,14 @@ import org.apache.storm.topology.OutputFieldsDeclarer import org.apache.storm.tuple.Fields import org.apache.storm.tuple.Tuple import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException class TestPrepareException { @Test(expected = KumulusTopology.KumulusTopologyCrashedException::class, timeout = 5000) fun testPrepareException() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L @@ -24,16 +25,14 @@ class TestPrepareException { builder.setSpout("spout", TestSpout()) builder.setBolt("prepare-exception-bolt", TestBolt(0)) .noneGrouping("spout") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(2, TimeUnit.SECONDS) } @Test(expected = TimeoutException::class, timeout = 10_000) fun testLongPrepare() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L @@ -43,10 +42,8 @@ class TestPrepareException { builder.setSpout("spout", TestSpout()) builder.setBolt("prepare-exception-bolt", TestBolt(30)) .noneGrouping("spout") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(2, TimeUnit.SECONDS) } diff --git a/src/test/kotlin/org/xyro/kumulus/TestSingleAcking.kt b/src/test/kotlin/org/xyro/kumulus/TestSingleAcking.kt index c3bd622..7ce5e5b 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestSingleAcking.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestSingleAcking.kt @@ -10,12 +10,13 @@ import org.apache.storm.topology.base.BaseBasicBolt import org.apache.storm.tuple.Fields import org.apache.storm.tuple.Tuple import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder import java.util.concurrent.TimeUnit class TestSingleAcking { @Test fun testSingleAckPerTreeTopology() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 5L @@ -27,10 +28,8 @@ class TestSingleAcking { .noneGrouping("spout") builder.setBolt("bolt2", TestBolt()) .noneGrouping("spout") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(10, TimeUnit.SECONDS) kumulusTopology.start(false) Thread.sleep(5000) diff --git a/src/test/kotlin/org/xyro/kumulus/TestTimeoutSpoutHook.kt b/src/test/kotlin/org/xyro/kumulus/TestTimeoutSpoutHook.kt index 27327e9..eb596c4 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestTimeoutSpoutHook.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestTimeoutSpoutHook.kt @@ -14,6 +14,7 @@ import org.apache.storm.tuple.Tuple import org.junit.Before import org.junit.Test import org.xyro.kumulus.component.KumulusTimeoutNotificationSpout +import org.xyro.kumulus.topology.KumulusTopologyBuilder import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean @@ -32,7 +33,7 @@ class TestTimeoutSpoutHook { @Test fun testTopologyTimeout() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L @@ -44,10 +45,8 @@ class TestTimeoutSpoutHook { .noneGrouping("spout") builder.setBolt("acking-bolt", TestBolt(false)) .noneGrouping("spout") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(10, TimeUnit.SECONDS) kumulusTopology.start(block = false) done.await() @@ -63,7 +62,7 @@ class TestTimeoutSpoutHook { @Test fun testTopologyFailure() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L @@ -75,10 +74,8 @@ class TestTimeoutSpoutHook { .noneGrouping("spout") builder.setBolt("acking-bolt", TestBolt(false)) .noneGrouping("spout") - - val stormTopology = builder.createTopology()!! val kumulusTopology = - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") kumulusTopology.prepare(10, TimeUnit.SECONDS) kumulusTopology.start(block = false) done.await() diff --git a/src/test/kotlin/org/xyro/kumulus/TestTopologyValidation.kt b/src/test/kotlin/org/xyro/kumulus/TestTopologyValidation.kt index 923c37b..92a6762 100644 --- a/src/test/kotlin/org/xyro/kumulus/TestTopologyValidation.kt +++ b/src/test/kotlin/org/xyro/kumulus/TestTopologyValidation.kt @@ -10,22 +10,22 @@ import org.apache.storm.tuple.Fields import org.apache.storm.tuple.Tuple import org.junit.Test import org.xyro.kumulus.KumulusStormTransformer.KumulusTopologyValidationException +import org.xyro.kumulus.topology.KumulusTopologyBuilder class TestTopologyValidation { @Test(expected = KumulusTopologyValidationException::class) fun testMissingTargetBolt() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() builder.setSpout("spout", DummySpout()) builder.setBolt("bolt", DummyBolt()) .noneGrouping("missing-bolt") - val stormTopology = builder.createTopology()!! - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") } @Test(expected = KumulusTopologyValidationException::class) fun testMissingTargetStream() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() builder.setSpout("spout", DummySpout()) @@ -33,14 +33,12 @@ class TestTopologyValidation { builder.setBolt("bolt2", DummyBolt()) .noneGrouping("bolt", "missing-stream") - - val stormTopology = builder.createTopology()!! - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") } @Test(expected = KumulusTopologyValidationException::class) fun testMissingTargetField() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() builder.setSpout("spout", DummySpout()) @@ -53,14 +51,12 @@ class TestTopologyValidation { builder.setBolt("bolt2", DummyBolt()) .fieldsGrouping("bolt", "stream", Fields("num", "non-existing-field")) - - val stormTopology = builder.createTopology()!! - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") } @Test fun testOkay() { - val builder = org.apache.storm.topology.TopologyBuilder() + val builder = KumulusTopologyBuilder() val config: MutableMap = mutableMapOf() builder.setSpout("spout", DummySpout()) @@ -73,9 +69,7 @@ class TestTopologyValidation { builder.setBolt("bolt2", DummyBolt()) .fieldsGrouping("bolt", "stream", Fields("num")) - - val stormTopology = builder.createTopology()!! - KumulusStormTransformer.initializeTopology(stormTopology, config, "test") + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") } }