11package org.xyro.kumulus
22
3- import mu .KotlinLogging
3+ import io.github.oshai.kotlinlogging .KotlinLogging
44import org.apache.storm.shade.org.eclipse.jetty.util.ConcurrentHashSet
55import org.apache.storm.tuple.Tuple
66import org.xyro.kumulus.component.KumulusComponent
@@ -16,7 +16,7 @@ class KumulusAcker(
1616 private val maxSpoutPending : Long ,
1717 private val allowExtraAcking : Boolean ,
1818 private val messageTimeoutMillis : Long ,
19- private val spoutAvailabilityCheckTimeout : Long
19+ private val spoutAvailabilityCheckTimeout : Long ,
2020) {
2121 companion object {
2222 private val logger = KotlinLogging .logger {}
@@ -34,7 +34,10 @@ class KumulusAcker(
3434 }
3535 }
3636
37- fun startTree (component : KumulusSpout , messageId : Any? ) {
37+ fun startTree (
38+ component : KumulusSpout ,
39+ messageId : Any? ,
40+ ) {
3841 logger.debug { " startTree() -> component: $component , messageId: $messageId " }
3942 if (messageId == null ) {
4043 notifySpout(component, messageId, listOf ())
@@ -71,20 +74,25 @@ class KumulusAcker(
7174 messageState.spout,
7275 messageId,
7376 removedState.pendingTasks.map { it.key },
74- removedState.failedTasks.toList()
77+ removedState.failedTasks.toList(),
7578 )
7679 decrementPending()
7780 }
7881 }
7982 },
80- messageTimeoutMillis, TimeUnit .MILLISECONDS
83+ messageTimeoutMillis,
84+ TimeUnit .MILLISECONDS ,
8185 )
8286 }
8387 }
8488 }
8589 }
8690
87- fun expandTrees (component : KumulusComponent , dest : Int , tuple : KumulusTuple ) {
91+ fun expandTrees (
92+ component : KumulusComponent ,
93+ dest : Int ,
94+ tuple : KumulusTuple ,
95+ ) {
8896 logger.debug { " expandTrees() -> component: $component , dest: $dest , tuple: $tuple " }
8997 (tuple.kTuple as TupleImpl ).spoutMessageId?.let { messageId ->
9098 if (allowExtraAcking && state[messageId] == null ) {
@@ -97,7 +105,10 @@ class KumulusAcker(
97105 }
98106 }
99107
100- fun fail (component : KumulusComponent , input : Tuple ? ) {
108+ fun fail (
109+ component : KumulusComponent ,
110+ input : Tuple ? ,
111+ ) {
101112 logger.debug { " fail() -> component: $component , input: $input " }
102113 (input as TupleImpl ).spoutMessageId?.let { messageId ->
103114 val messageState = state[messageId]
@@ -112,15 +123,19 @@ class KumulusAcker(
112123 }
113124 }
114125
115- fun ack (component : KumulusComponent , input : Tuple ? ) {
126+ fun ack (
127+ component : KumulusComponent ,
128+ input : Tuple ? ,
129+ ) {
116130 logger.debug { " ack() -> component: $component , input: $input " }
117131 (input as TupleImpl ).spoutMessageId?.let { messageId ->
118132 val messageState = state[messageId]
119133 if (allowExtraAcking && state[messageId] == null ) {
120134 return
121135 }
122- if (messageState == null )
136+ if (messageState == null ) {
123137 error(" State missing for messageId $messageId while acking tuple in $component . Tuple: $input " )
138+ }
124139 checkComplete(messageState, component, input)
125140 }
126141 }
@@ -144,11 +159,13 @@ class KumulusAcker(
144159 }
145160 }
146161
147- fun getPendingCount (): Long {
148- return this .currentPending.get()
149- }
162+ fun getPendingCount (): Long = this .currentPending.get()
150163
151- private fun checkComplete (messageState : MessageState , component : KumulusComponent , input : Tuple ? ) {
164+ private fun checkComplete (
165+ messageState : MessageState ,
166+ component : KumulusComponent ,
167+ input : Tuple ? ,
168+ ) {
152169 (input as TupleImpl ).spoutMessageId?.let { spoutMessageId ->
153170 val removedTask = messageState.pendingTasks.remove(component.taskId)
154171 if (removedTask == null ) {
@@ -174,32 +191,41 @@ class KumulusAcker(
174191 }
175192 }
176193
177- private fun debugMessage (component : KumulusComponent , spoutMessageId : Any , messageState : MessageState ) {
194+ private fun debugMessage (
195+ component : KumulusComponent ,
196+ spoutMessageId : Any ,
197+ messageState : MessageState ,
198+ ) {
178199 logger.debug {
179200 " Pending task from $component for message $spoutMessageId was completed. " +
180- " Current pending tuples are:" + messageState.pendingTasks.let {
181- if (it.isEmpty()) {
182- " Empty\n "
183- } else {
184- val sb = StringBuilder (" \n " )
185- it.forEach { (k, v) ->
186- sb.append(" $k : $v \n " )
201+ " Current pending tuples are:" +
202+ messageState.pendingTasks.let {
203+ if (it.isEmpty()) {
204+ " Empty\n "
205+ } else {
206+ val sb = StringBuilder (" \n " )
207+ it.forEach { (k, v) ->
208+ sb.append(" $k : $v \n " )
209+ }
210+ sb.toString()
187211 }
188- sb.toString()
189212 }
190- }
191213 }
192214 }
193215
194- private fun notifySpout (spout : KumulusSpout , spoutMessageId : Any? , failedTasks : List <Int >) {
216+ private fun notifySpout (
217+ spout : KumulusSpout ,
218+ spoutMessageId : Any? ,
219+ failedTasks : List <Int >,
220+ ) {
195221 this .notifySpout(spout, spoutMessageId, listOf (), failedTasks)
196222 }
197223
198224 private fun notifySpout (
199225 spout : KumulusSpout ,
200226 spoutMessageId : Any? ,
201227 timeoutTasks : List <Int >,
202- failedTasks : List <Int >
228+ failedTasks : List <Int >,
203229 ) {
204230 emitter.completeMessageProcessing(spout, spoutMessageId, timeoutTasks, failedTasks)
205231 }
@@ -209,7 +235,9 @@ class KumulusAcker(
209235 synchronized(waitObject) {
210236 val currentPending = currentPending.decrementAndGet()
211237 if (currentPending >= maxSpoutPending) {
212- logger.error { " Max spout pending must have exceeded limit of $maxSpoutPending , current after decrement is $currentPending " }
238+ logger.error {
239+ " Max spout pending must have exceeded limit of $maxSpoutPending , current after decrement is $currentPending "
240+ }
213241 assert (false ) {
214242 " Max spout pending must have exceeded limit of $maxSpoutPending , current after decrement is $currentPending "
215243 }
@@ -224,7 +252,7 @@ class KumulusAcker(
224252 }
225253
226254 inner class MessageState (
227- val spout : KumulusSpout
255+ val spout : KumulusSpout ,
228256 ) {
229257 val pendingTasks = ConcurrentHashMap <Int , Tuple >()
230258 val failedTasks = ConcurrentHashSet <Int >()
0 commit comments