Skip to content

Commit 49afd33

Browse files
committed
Lint code
1 parent 7f1b634 commit 49afd33

2 files changed

Lines changed: 13 additions & 13 deletions

File tree

src/main/kotlin/org/xyro/kumulus/KumulusTopology.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -301,12 +301,13 @@ class KumulusTopology(
301301
logger.trace { "Component ${c.componentId}/${c.taskId} is currently busy" }
302302
var shouldEnqueue = true
303303
if (message is ExecuteMessage) {
304-
val messageWaitStartTime = c.waitStart.get()
305-
if (messageWaitStartTime > 0){
304+
val messageWaitStartTime = c.waitStart.get()
305+
if (messageWaitStartTime > 0) {
306306
val delay = System.nanoTime() - messageWaitStartTime
307307
if ((delay >= lateMessageMaxWaitInNanos) &&
308-
lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)) {
309-
if (!message.isLate.get()){
308+
lateMessagesStreamsToDrop.contains(message.tuple.kTuple.sourceStreamId)
309+
) {
310+
if (!message.isLate.get()) {
310311
message.isLate.set(true)
311312
onLateMessageHook?.let { onLateMessageHook ->
312313
try {
@@ -328,7 +329,7 @@ class KumulusTopology(
328329
}
329330
}
330331

331-
if (lateMessagesShouldDrop){
332+
if (lateMessagesShouldDrop) {
332333
shouldEnqueue = false
333334
}
334335
}
@@ -337,7 +338,7 @@ class KumulusTopology(
337338
c.waitStart.compareAndSet(0, System.nanoTime())
338339
}
339340

340-
if (shouldEnqueue){
341+
if (shouldEnqueue) {
341342
if (queuePushbackWait <= 0L) {
342343
boltExecutionPool.enqueue(message)
343344
} else {

src/test/kotlin/org/xyro/kumulus/TestDroppingStaleMessages.kt

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ class TestDroppingStaleMessages {
2222
val config: MutableMap<String, Any> = mutableMapOf()
2323
config[Config.TOPOLOGY_MAX_SPOUT_PENDING] = 1L
2424
config[KumulusTopology.CONF_THREAD_POOL_CORE_SIZE] = 5L
25-
config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] = true;
26-
config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] = setOf("default");
27-
config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] = 1;
25+
config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_SHOULD_DROP] = true
26+
config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_STREAMS_NAME] = setOf("default")
27+
config[KumulusTopology.CONF_LATE_MESSAGES_DROPPING_MAX_WAIT_SECONDS] = 1
2828

2929
builder.setSpout("spout", LatencyDeltaSpout())
3030

@@ -34,7 +34,6 @@ class TestDroppingStaleMessages {
3434
builder.setBolt("delay-unanchored-bolt", StuckBolt())
3535
.noneGrouping("unanchoring-bolt")
3636

37-
3837
val stormTopology = builder.createTopology()!!
3938
val kumulusTopology =
4039
KumulusStormTransformer.initializeTopology(stormTopology, config, "test")
@@ -50,8 +49,8 @@ class TestDroppingStaleMessages {
5049
Thread.sleep(5000)
5150
kumulusTopology.stop()
5251

53-
logger.info { "Dropped ${lateHookCalled} messages" }
54-
assertTrue { lateHookCalled}
52+
logger.info { "Dropped $lateHookCalled messages" }
53+
assertTrue { lateHookCalled }
5554
}
5655

5756
class LatencyDeltaSpout : DummySpout({
@@ -110,7 +109,7 @@ class TestDroppingStaleMessages {
110109
}) {
111110
override fun execute(input: Tuple, collector: BasicOutputCollector) {
112111
logger.info { "StuckBolt: started" }
113-
while (true){
112+
while (true) {
114113
Thread.sleep(50)
115114
}
116115
}

0 commit comments

Comments
 (0)