Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ kumulusTopology.prepare();
kumulusTopology.start(true);
```

Kumulus also provides a native builder with a Storm-like API:

```kotlin
val builder = org.xyro.kumulus.topology.KumulusTopologyBuilder()

val config: MutableMap<String, Any> = mutableMapOf()

builder.setSpout("spout", Spout())
builder.setBolt("bolt", Bolt())
.shuffleGrouping("spout")

val kumulusTopology = KumulusStormTransformer.initializeTopology(
builder.createTopology(), config, "topology_name"
)
kumulusTopology.prepare()
kumulusTopology.start(true)
```

## Benchmark

Latency histograms produced by passing 100,000 (10% warm-up) tiny tuples into the fairly simple topology defined in KumulusStormTransformerTest:
Expand Down
163 changes: 118 additions & 45 deletions src/main/kotlin/org/xyro/kumulus/KumulusStormTransformer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import org.apache.storm.Config
import org.apache.storm.Constants
import org.apache.storm.generated.Bolt
import org.apache.storm.generated.ComponentCommon
import org.apache.storm.generated.GlobalStreamId
import org.apache.storm.generated.ComponentObject
import org.apache.storm.generated.SpoutSpec
import org.apache.storm.generated.StateSpoutSpec
import org.apache.storm.generated.StormTopology
import org.apache.storm.metric.api.IMetric
import org.apache.storm.task.TopologyContext
Expand All @@ -23,6 +24,7 @@ import org.apache.storm.utils.Utils
import org.xyro.kumulus.component.KumulusBolt
import org.xyro.kumulus.component.KumulusComponent
import org.xyro.kumulus.component.KumulusSpout
import org.xyro.kumulus.topology.KumulusTopologyDefinition
import java.io.Serializable

@Suppress("unused")
Expand All @@ -37,22 +39,65 @@ class KumulusStormTransformer {
@Suppress("UNCHECKED_CAST")
@JvmStatic
fun initializeTopology(topology: StormTopology, rawConfig: MutableMap<String, Any>, stormId: String): KumulusTopology {
val boltField = StormTopology::class.java.getDeclaredField("bolts")!!
boltField.isAccessible = true
val serializedBoltsMap: Map<String, Bolt> = boltField.get(topology) as Map<String, Bolt>
val boltsMap: Map<String, IComponent> = serializedBoltsMap.entries.associate { (id, bolt) ->
val boltObject = bolt._bolt_object!!
id to (Utils.javaDeserialize<Serializable>(boltObject._serialized_java, Serializable::class.java) as IComponent)
val serializedBoltsMap = readBolts(topology)
val serializedSpoutsMap = readSpouts(topology)

val boltsMap = serializedBoltsMap.entries.associate { (id, bolt) ->
val boltObject = bolt._bolt_object
?: throw IllegalArgumentException("Bolt '$id' has no serialized bolt object")
id to (
Utils.javaDeserialize<Serializable>(boltObject._serialized_java, Serializable::class.java) as? IRichBolt
?: throw IllegalArgumentException("Bolt '$id' is not an IRichBolt")
)
}

val spoutField = StormTopology::class.java.getDeclaredField("spouts")!!
spoutField.isAccessible = true
val serializedSpoutsMap: Map<String, SpoutSpec> = spoutField.get(topology) as Map<String, SpoutSpec>
val spoutsMap: Map<String, IComponent> = serializedSpoutsMap.entries.associate { (id, spout) ->
val spoutObject = spout._spout_object!!
id to (Utils.javaDeserialize<Serializable>(spoutObject._serialized_java, Serializable::class.java) as IComponent)
val spoutsMap = serializedSpoutsMap.entries.associate { (id, spout) ->
val spoutObject = spout._spout_object
?: throw IllegalArgumentException("Spout '$id' has no serialized spout object")
id to (
Utils.javaDeserialize<Serializable>(spoutObject._serialized_java, Serializable::class.java) as? IRichSpout
?: throw IllegalArgumentException("Spout '$id' is not an IRichSpout")
)
}

val componentCommonMap =
serializedSpoutsMap.mapValues { (_, spec) -> spec._common!! } +
serializedBoltsMap.mapValues { (_, bolt) -> bolt._common!! }

return initializeTopology(topology, spoutsMap, boltsMap, componentCommonMap, rawConfig, stormId)
}

/**
 * Builds a runnable [KumulusTopology] from a Kumulus-native topology definition.
 *
 * The spout and bolt instances and their [ComponentCommon] settings are taken directly from the
 * declarations in [topology]; a [StormTopology] is additionally built from the same definition
 * and handed to the shared initialization routine together with them.
 *
 * @param topology the native topology definition to transform
 * @param rawConfig the Storm configuration
 * @param stormId the Storm topology ID string
 * @return the initialized Kumulus topology
 */
@JvmStatic
fun initializeTopology(
    topology: KumulusTopologyDefinition,
    rawConfig: MutableMap<String, Any>,
    stormId: String
): KumulusTopology {
    val spoutDeclarations = topology.spouts
    val boltDeclarations = topology.bolts

    // Component instances, keyed by component id.
    val spoutInstances = spoutDeclarations.mapValues { it.value.spout }
    val boltInstances = boltDeclarations.mapValues { it.value.bolt }

    // Per-component common settings; spouts first, then bolts.
    val spoutCommons = spoutDeclarations.mapValues { it.value.common }
    val boltCommons = boltDeclarations.mapValues { it.value.common }
    val allCommons = spoutCommons + boltCommons

    return initializeTopology(
        buildStormTopology(topology),
        spoutInstances,
        boltInstances,
        allCommons,
        rawConfig,
        stormId
    )
}

@Suppress("LongMethod")
private fun initializeTopology(
topology: StormTopology,
spoutsMap: Map<String, IRichSpout>,
boltsMap: Map<String, IRichBolt>,
componentCommons: Map<String, ComponentCommon>,
rawConfig: MutableMap<String, Any>,
stormId: String
): KumulusTopology {
val taskToComponent = mutableMapOf<Int, String>()

val componentToSortedTasks = mutableMapOf<String, List<Int>>()
Expand All @@ -73,21 +118,12 @@ class KumulusStormTransformer {
val executorData: Map<String, Any> = LinkedHashMap()
val registeredMetrics: Map<Int, Map<Int, Map<String, IMetric>>> = LinkedHashMap()

val getter: (String) -> ComponentCommon = { id ->
if (topology._bolts?.containsKey(id)!!) {
topology._bolts[id]!!._common
} else {
topology._spouts[id]!!._common
}
}
val getter: (String) -> ComponentCommon = { id -> componentCommons[id]!! }

val componentMaps = listOf(spoutsMap, boltsMap)
val componentMaps = listOf(spoutsMap as Map<String, IComponent>, boltsMap as Map<String, IComponent>)

val kComponents: MutableList<KumulusComponent> = mutableListOf()

val kComponentInputs: MutableMap<Pair<String, GlobalStreamId>, org.apache.storm.generated.Grouping> =
mutableMapOf()

var id = 1
for (componentMap in componentMaps) {
for ((name) in componentMap) {
Expand Down Expand Up @@ -117,10 +153,6 @@ class KumulusStormTransformer {
streamToFields[stream] = Fields(streamInfo?._output_fields)
}

componentCommon._inputs?.forEach {
kComponentInputs[name to it.key] = it.value
}

id++
}
}
Expand All @@ -131,21 +163,7 @@ class KumulusStormTransformer {
}

componentToSortedTasks.forEach { componentId: String, taskIds: List<Int> ->
val (componentObjectSerialized, componentCommon) =
when {
topology._spouts!!.containsKey(componentId) ->
topology._spouts[componentId]!!
.let { it._spout_object!! to it._common!! }
topology._bolts!!.containsKey(componentId) ->
topology._bolts[componentId]!!
.let { it._bolt_object!! to it._common!! }
componentId == Constants.SYSTEM_COMPONENT_ID ->
null to null
else ->
throw Exception(
"Component name '$componentId' was not found in underlying topology object"
)
}
val componentCommon = componentCommons[componentId]

taskIds.forEach { taskId ->
val componentInstance =
Expand All @@ -156,11 +174,22 @@ class KumulusStormTransformer {
// Declared hard-coded
}
})
} else {
} else if (spoutsMap.containsKey(componentId)) {
val component = spoutsMap[componentId]!!
Utils.javaDeserialize<Serializable>(
Utils.javaSerialize(component as Serializable),
Serializable::class.java
)
} else if (boltsMap.containsKey(componentId)) {
val component = boltsMap[componentId]!!
Utils.javaDeserialize<Serializable>(
componentObjectSerialized!!._serialized_java,
Utils.javaSerialize(component as Serializable),
Serializable::class.java
)
} else {
throw Exception(
"Component name '$componentId' was not found in underlying topology object"
)
}

val context = TopologyContext(
Expand Down Expand Up @@ -213,6 +242,50 @@ class KumulusStormTransformer {
return KumulusTopology(kComponents, config)
}

/**
 * Returns the spout specifications stored in [topology], keyed by component id.
 *
 * The `_spouts` accessor is tried first; when it yields `null`, the private `spouts` field of
 * [StormTopology] is read reflectively as a fallback.
 *
 * @param topology the Storm topology to inspect
 * @return the id-to-[SpoutSpec] map held by the topology
 * @throws IllegalArgumentException if neither the accessor nor the field holds a spouts map
 */
@Suppress("UNCHECKED_CAST")
private fun readSpouts(topology: StormTopology): Map<String, SpoutSpec> {
    topology._spouts?.let { return it }

    // getDeclaredField either returns a Field or throws NoSuchFieldException; it never returns
    // null, so no `!!` is required here.
    val spoutField = StormTopology::class.java.getDeclaredField("spouts")
    spoutField.isAccessible = true

    // Fail with a descriptive message rather than an opaque "null cannot be cast" error.
    val rawSpouts = requireNotNull(spoutField.get(topology)) {
        "Topology has no spouts map set"
    }
    return rawSpouts as Map<String, SpoutSpec>
}

/**
 * Returns the bolt specifications stored in [topology], keyed by component id.
 *
 * The `_bolts` accessor is tried first; when it yields `null`, the private `bolts` field of
 * [StormTopology] is read reflectively as a fallback.
 *
 * @param topology the Storm topology to inspect
 * @return the id-to-[Bolt] map held by the topology
 * @throws IllegalArgumentException if neither the accessor nor the field holds a bolts map
 */
@Suppress("UNCHECKED_CAST")
private fun readBolts(topology: StormTopology): Map<String, Bolt> {
    topology._bolts?.let { return it }

    // getDeclaredField either returns a Field or throws NoSuchFieldException; it never returns
    // null, so no `!!` is required here.
    val boltField = StormTopology::class.java.getDeclaredField("bolts")
    boltField.isAccessible = true

    // Fail with a descriptive message rather than an opaque "null cannot be cast" error.
    val rawBolts = requireNotNull(boltField.get(topology)) {
        "Topology has no bolts map set"
    }
    return rawBolts as Map<String, Bolt>
}

/**
 * Converts a Kumulus-native topology definition into a [StormTopology].
 *
 * Every declared spout and bolt is Java-serialized into a component object and paired with a
 * deep copy of its common settings. The resulting topology carries an empty state-spout map.
 *
 * @param topology the native definition to convert
 * @return a Storm topology holding one spec per declared spout and bolt
 */
private fun buildStormTopology(topology: KumulusTopologyDefinition): StormTopology {
    val spoutSpecs = mutableMapOf<String, SpoutSpec>()
    for ((spoutId, declaration) in topology.spouts) {
        val spec = SpoutSpec()
        spec._spout_object = serializeComponentObject(declaration.spout as Serializable)
        // Copy so the generated topology does not share state with the declaration.
        spec._common = declaration.common.deepCopy()
        spoutSpecs[spoutId] = spec
    }

    val boltSpecs = mutableMapOf<String, Bolt>()
    for ((boltId, declaration) in topology.bolts) {
        val spec = Bolt()
        spec._bolt_object = serializeComponentObject(declaration.bolt as Serializable)
        spec._common = declaration.common.deepCopy()
        boltSpecs[boltId] = spec
    }

    val stormTopology = StormTopology()
    stormTopology.set_spouts(spoutSpecs)
    stormTopology.set_bolts(boltSpecs)
    stormTopology.set_state_spouts(mutableMapOf<String, StateSpoutSpec>())
    return stormTopology
}

/**
 * Wraps the Java-serialized form of [component] in a Storm [ComponentObject].
 *
 * @param component the spout or bolt instance to serialize
 * @return a component object whose serialized-java payload holds the serialized bytes
 */
private fun serializeComponentObject(component: Serializable): ComponentObject =
    ComponentObject().also { wrapper ->
        wrapper.set_serialized_java(Utils.javaSerialize(component))
    }

private fun validateTopology(components: MutableList<KumulusComponent>) {
components.forEach { src ->
(src as? KumulusBolt)?.apply {
Expand Down
Loading