Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
</developers>

<properties>
<kotlin.version>1.5.31</kotlin.version>
<kotlin.version>2.3.10</kotlin.version>
<storm.version>1.2.2</storm.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.class>org.xyro.kumulus.MainKt</main.class>
Expand All @@ -50,11 +50,6 @@
<version>${kotlin.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-common</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
Expand All @@ -68,15 +63,15 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.github.microutils</groupId>
<artifactId>kotlin-logging</artifactId>
<version>1.4.6</version>
<groupId>io.github.oshai</groupId>
<artifactId>kotlin-logging-jvm</artifactId>
<version>7.0.3</version>
</dependency>
<!-- tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.7.0</version>
<version>6.0.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -88,7 +83,7 @@
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
<version>2.1.9</version>
<version>2.2.2</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down Expand Up @@ -132,7 +127,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<version>3.15.0</version>
<executions>
<!-- Replacing default-compile as it is treated specially by maven -->
<execution>
Expand Down Expand Up @@ -163,7 +158,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<version>3.5.0</version>
<configuration>
<archive>
<manifest>
Expand All @@ -176,7 +171,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<version>3.4.0</version>
<executions>
<execution>
<id>attach-sources</id>
Expand All @@ -189,7 +184,7 @@
<plugin>
<groupId>org.jetbrains.dokka</groupId>
<artifactId>dokka-maven-plugin</artifactId>
<version>2.0.0</version>
<version>2.1.0</version>
<executions>
<execution>
<phase>pre-site</phase>
Expand All @@ -202,7 +197,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<version>3.2.8</version>
<executions>
<execution>
<id>sign-artifacts</id>
Expand All @@ -219,7 +214,7 @@
<plugin>
<groupId>com.github.gantsign.maven</groupId>
<artifactId>ktlint-maven-plugin</artifactId>
<version>1.7.0</version>
<version>3.5.0</version>
<executions>
<execution>
<goals>
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/org/xyro/kumulus/ExecutionPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger

class ExecutionPool(
size: Int,
private val threadFun: (KumulusMessage) -> Unit
private val threadFun: (KumulusMessage) -> Unit,
) {
// uncapped, memory for in-flight tuples should be taken into account and factored into max-spout-pending
private val mainQueue = LinkedBlockingQueue<KumulusMessage>()
Expand Down
82 changes: 55 additions & 27 deletions src/main/kotlin/org/xyro/kumulus/KumulusAcker.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.xyro.kumulus

import mu.KotlinLogging
import io.github.oshai.kotlinlogging.KotlinLogging
import org.apache.storm.shade.org.eclipse.jetty.util.ConcurrentHashSet
import org.apache.storm.tuple.Tuple
import org.xyro.kumulus.component.KumulusComponent
Expand All @@ -16,7 +16,7 @@ class KumulusAcker(
private val maxSpoutPending: Long,
private val allowExtraAcking: Boolean,
private val messageTimeoutMillis: Long,
private val spoutAvailabilityCheckTimeout: Long
private val spoutAvailabilityCheckTimeout: Long,
) {
companion object {
private val logger = KotlinLogging.logger {}
Expand All @@ -34,7 +34,10 @@ class KumulusAcker(
}
}

fun startTree(component: KumulusSpout, messageId: Any?) {
fun startTree(
component: KumulusSpout,
messageId: Any?,
) {
logger.debug { "startTree() -> component: $component, messageId: $messageId" }
if (messageId == null) {
notifySpout(component, messageId, listOf())
Expand Down Expand Up @@ -71,20 +74,25 @@ class KumulusAcker(
messageState.spout,
messageId,
removedState.pendingTasks.map { it.key },
removedState.failedTasks.toList()
removedState.failedTasks.toList(),
)
decrementPending()
}
}
},
messageTimeoutMillis, TimeUnit.MILLISECONDS
messageTimeoutMillis,
TimeUnit.MILLISECONDS,
)
}
}
}
}

fun expandTrees(component: KumulusComponent, dest: Int, tuple: KumulusTuple) {
fun expandTrees(
component: KumulusComponent,
dest: Int,
tuple: KumulusTuple,
) {
logger.debug { "expandTrees() -> component: $component, dest: $dest, tuple: $tuple" }
(tuple.kTuple as TupleImpl).spoutMessageId?.let { messageId ->
if (allowExtraAcking && state[messageId] == null) {
Expand All @@ -97,7 +105,10 @@ class KumulusAcker(
}
}

fun fail(component: KumulusComponent, input: Tuple?) {
fun fail(
component: KumulusComponent,
input: Tuple?,
) {
logger.debug { "fail() -> component: $component, input: $input" }
(input as TupleImpl).spoutMessageId?.let { messageId ->
val messageState = state[messageId]
Expand All @@ -112,15 +123,19 @@ class KumulusAcker(
}
}

fun ack(component: KumulusComponent, input: Tuple?) {
fun ack(
component: KumulusComponent,
input: Tuple?,
) {
logger.debug { "ack() -> component: $component, input: $input" }
(input as TupleImpl).spoutMessageId?.let { messageId ->
val messageState = state[messageId]
if (allowExtraAcking && state[messageId] == null) {
return
}
if (messageState == null)
if (messageState == null) {
error("State missing for messageId $messageId while acking tuple in $component. Tuple: $input")
}
checkComplete(messageState, component, input)
}
}
Expand All @@ -144,11 +159,13 @@ class KumulusAcker(
}
}

fun getPendingCount(): Long {
return this.currentPending.get()
}
fun getPendingCount(): Long = this.currentPending.get()

private fun checkComplete(messageState: MessageState, component: KumulusComponent, input: Tuple?) {
private fun checkComplete(
messageState: MessageState,
component: KumulusComponent,
input: Tuple?,
) {
(input as TupleImpl).spoutMessageId?.let { spoutMessageId ->
val removedTask = messageState.pendingTasks.remove(component.taskId)
if (removedTask == null) {
Expand All @@ -174,32 +191,41 @@ class KumulusAcker(
}
}

private fun debugMessage(component: KumulusComponent, spoutMessageId: Any, messageState: MessageState) {
private fun debugMessage(
component: KumulusComponent,
spoutMessageId: Any,
messageState: MessageState,
) {
logger.debug {
"Pending task from $component for message $spoutMessageId was completed. " +
"Current pending tuples are:" + messageState.pendingTasks.let {
if (it.isEmpty()) {
" Empty\n"
} else {
val sb = StringBuilder("\n")
it.forEach { (k, v) ->
sb.append("$k: $v\n")
"Current pending tuples are:" +
messageState.pendingTasks.let {
if (it.isEmpty()) {
" Empty\n"
} else {
val sb = StringBuilder("\n")
it.forEach { (k, v) ->
sb.append("$k: $v\n")
}
sb.toString()
}
sb.toString()
}
}
}
}

private fun notifySpout(spout: KumulusSpout, spoutMessageId: Any?, failedTasks: List<Int>) {
private fun notifySpout(
spout: KumulusSpout,
spoutMessageId: Any?,
failedTasks: List<Int>,
) {
this.notifySpout(spout, spoutMessageId, listOf(), failedTasks)
}

private fun notifySpout(
spout: KumulusSpout,
spoutMessageId: Any?,
timeoutTasks: List<Int>,
failedTasks: List<Int>
failedTasks: List<Int>,
) {
emitter.completeMessageProcessing(spout, spoutMessageId, timeoutTasks, failedTasks)
}
Expand All @@ -209,7 +235,9 @@ class KumulusAcker(
synchronized(waitObject) {
val currentPending = currentPending.decrementAndGet()
if (currentPending >= maxSpoutPending) {
logger.error { "Max spout pending must have exceeded limit of $maxSpoutPending, current after decrement is $currentPending" }
logger.error {
"Max spout pending must have exceeded limit of $maxSpoutPending, current after decrement is $currentPending"
}
assert(false) {
"Max spout pending must have exceeded limit of $maxSpoutPending, current after decrement is $currentPending"
}
Expand All @@ -224,7 +252,7 @@ class KumulusAcker(
}

inner class MessageState(
val spout: KumulusSpout
val spout: KumulusSpout,
) {
val pendingTasks = ConcurrentHashMap<Int, Tuple>()
val failedTasks = ConcurrentHashSet<Int>()
Expand Down
10 changes: 8 additions & 2 deletions src/main/kotlin/org/xyro/kumulus/KumulusEmitter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ import org.xyro.kumulus.component.KumulusSpout

interface KumulusEmitter {
fun getDestinations(tasks: List<Int>): List<KumulusComponent>
fun execute(destComponent: KumulusComponent, kumulusTuple: KumulusTuple)

fun execute(
destComponent: KumulusComponent,
kumulusTuple: KumulusTuple,
)

fun completeMessageProcessing(
spout: KumulusSpout,
spoutMessageId: Any?,
timeoutTasks: List<Int>,
failedTasks: List<Int>
failedTasks: List<Int>,
)

fun throwException(t: Throwable)
}
Loading