From 0f604fc192d386d9009da536477b6d4c62c66a33 Mon Sep 17 00:00:00 2001 From: Re'em Bensimhon Date: Fri, 27 Feb 2026 18:13:18 +0200 Subject: [PATCH] Added missing tests, fixed unanchored spout emits bug --- .../kumulus/collector/KumulusCollector.kt | 2 +- .../xyro/kumulus/TestUnanchoredSpoutEmit.kt | 71 +++++++++++++++++++ .../kumulus/TestUnserializableBoltField.kt | 50 +++++++++++++ 3 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 src/test/kotlin/org/xyro/kumulus/TestUnanchoredSpoutEmit.kt create mode 100644 src/test/kotlin/org/xyro/kumulus/TestUnserializableBoltField.kt diff --git a/src/main/kotlin/org/xyro/kumulus/collector/KumulusCollector.kt b/src/main/kotlin/org/xyro/kumulus/collector/KumulusCollector.kt index fe9cf07..e5234c2 100644 --- a/src/main/kotlin/org/xyro/kumulus/collector/KumulusCollector.kt +++ b/src/main/kotlin/org/xyro/kumulus/collector/KumulusCollector.kt @@ -101,6 +101,6 @@ abstract class KumulusCollector( throw RuntimeException("Bolts wrong emit method called for ${component.componentId}/${component.taskId}") } acker.startTree(component, messageId) - return componentEmit(streamId, tuple, messageId!!) + return componentEmit(streamId, tuple, messageId) } } diff --git a/src/test/kotlin/org/xyro/kumulus/TestUnanchoredSpoutEmit.kt b/src/test/kotlin/org/xyro/kumulus/TestUnanchoredSpoutEmit.kt new file mode 100644 index 0000000..2d18ae5 --- /dev/null +++ b/src/test/kotlin/org/xyro/kumulus/TestUnanchoredSpoutEmit.kt @@ -0,0 +1,71 @@ +package org.xyro.kumulus + +import org.apache.storm.Config +import org.apache.storm.topology.BasicOutputCollector +import org.apache.storm.topology.OutputFieldsDeclarer +import org.apache.storm.topology.base.BaseBasicBolt +import org.apache.storm.tuple.Fields +import org.apache.storm.tuple.Tuple +import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.test.assertTrue + +class TestUnanchoredSpoutEmit { + @Test(timeout = 5_000) + fun testSpoutEmitWithoutMessageIdDoesNotCrashTopology() { + TestBolt.messageSeenByBolt = CountDownLatch(1) + TestSpout.emitted.set(false) + + val builder = KumulusTopologyBuilder() + val config: MutableMap = mutableMapOf() + config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L + config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 2L + + builder.setSpout("spout", TestSpout()) + builder + .setBolt("bolt", TestBolt()) + .shuffleGrouping("spout") + + val kumulusTopology = + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") + + kumulusTopology.prepare(5, TimeUnit.SECONDS) + kumulusTopology.start(block = false) + + assertTrue(TestBolt.messageSeenByBolt.await(20, TimeUnit.SECONDS), "Unanchored tuple should be delivered to bolt") + kumulusTopology.stop() + assertTrue(TestSpout.emitted.get(), "Spout should have emitted at least one tuple") + } + + private class TestSpout : DummySpout({ it.declare(Fields("id")) }) { + override fun nextTuple() { + if (emitted.compareAndSet(false, true)) { + collector.emit(listOf(1), null) + } + } + + companion object { + val emitted = AtomicBoolean(false) + } + } + + private class TestBolt : BaseBasicBolt() { + override fun execute( + input: Tuple, + collector: BasicOutputCollector, + ) { + if (input.sourceComponent == "spout") { + messageSeenByBolt.countDown() + } + } + + override fun declareOutputFields(declarer: OutputFieldsDeclarer) = Unit + + companion object { + var messageSeenByBolt = CountDownLatch(1) + } + } +} diff --git a/src/test/kotlin/org/xyro/kumulus/TestUnserializableBoltField.kt b/src/test/kotlin/org/xyro/kumulus/TestUnserializableBoltField.kt new file mode 100644 index 0000000..9e8b29a --- /dev/null +++ b/src/test/kotlin/org/xyro/kumulus/TestUnserializableBoltField.kt @@ -0,0 +1,50 @@ +package org.xyro.kumulus + +import org.apache.storm.Config +import org.apache.storm.task.OutputCollector +import org.apache.storm.task.TopologyContext +import org.apache.storm.topology.IRichBolt +import org.apache.storm.topology.OutputFieldsDeclarer +import org.apache.storm.tuple.Fields +import org.apache.storm.tuple.Tuple +import org.junit.Test +import org.xyro.kumulus.topology.KumulusTopologyBuilder + +class TestUnserializableBoltField { + @Test(expected = RuntimeException::class, timeout = 5_000) + fun testCreateTopologyWithUnserializableBoltField() { + val builder = KumulusTopologyBuilder() + val config: MutableMap = mutableMapOf() + config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L + config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 2L + + builder.setSpout("spout", TestSpout()) + builder + .setBolt("unserializable-bolt", UnserializableFieldBolt()) + .shuffleGrouping("spout") + + KumulusStormTransformer.initializeTopology(builder.createTopology(), config, "test") + } + + class TestSpout : DummySpout({ it.declare(Fields()) }) { + override fun nextTuple() = Unit + } + + class UnserializableFieldBolt : IRichBolt { + private val unserializable = Thread() + + override fun prepare( + p0: MutableMap?, + p1: TopologyContext?, + p2: OutputCollector?, + ) = Unit + + override fun execute(input: Tuple?) = Unit + + override fun cleanup() = Unit + + override fun declareOutputFields(declarer: OutputFieldsDeclarer?) = Unit + + override fun getComponentConfiguration(): MutableMap = mutableMapOf() + } +}