Skip to content

Commit baf55ce

Browse files
committed
feat: update Redis publisher and subscriber for improved payload handling and add debug logging
1 parent 0de2c8d commit baf55ce

4 files changed

Lines changed: 38 additions & 14 deletions

File tree

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66

77
allprojects {
88
group = "org.sayandev"
9-
version = "1.8.9.76"
9+
version = "1.8.9.82"
1010
description = "A modular Kotlin framework for Minecraft: JE"
1111

1212
plugins.apply("maven-publish")

stickynote-bukkit/stickynote-bukkit-nms/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,7 @@ accessors {
465465
methodInferred("getType", "1.16.5")
466466
methodInferred("getSender", "1.16.5")
467467
method(Byte::class, "func_179841_c")
468+
field(Byte::class, "field_179842_b")
468469
fieldInferred("message", "1.16.5")
469470
}
470471
mapClass(ClientboundSetPlayerTeamPacket) {

stickynote-core/src/main/kotlin/org/sayandev/stickynote/core/messaging/publisher/RedisPublisher.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package org.sayandev.stickynote.core.messaging.publisher
33
import kotlinx.coroutines.CompletableDeferred
44
import kotlinx.coroutines.CoroutineDispatcher
55
import kotlinx.coroutines.delay
6+
import org.sayandev.stickynote.core.coroutine.dispatcher.AsyncDispatcher
67
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asJson
78
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asPayloadWrapper
89
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.typedPayload
10+
import org.sayandev.stickynote.core.messaging.subscriber.Subscriber
11+
import org.sayandev.stickynote.core.messaging.subscriber.Subscriber.Companion
912
import org.sayandev.stickynote.core.utils.CoroutineUtils.launch
1013
import redis.clients.jedis.JedisPool
1114
import redis.clients.jedis.JedisPubSub
@@ -75,7 +78,7 @@ abstract class RedisPublisher<P, S>(
7578
launch(dispatcher) {
7679
delay(TIMEOUT_SECONDS * 1000L)
7780
if (result.isActive) {
78-
result.completeExceptionally(IllegalStateException("Sent payload has not been responded in $TIMEOUT_SECONDS seconds. Payload: $payload"))
81+
result.completeExceptionally(IllegalStateException("Sent payload has not been responded in $TIMEOUT_SECONDS seconds. Payload: $payload (channel: ${id()}"))
7982
}
8083
payloads.remove(payload.uniqueId)
8184
}
@@ -95,5 +98,14 @@ abstract class RedisPublisher<P, S>(
9598

9699
companion object {
97100
const val TIMEOUT_SECONDS = 5L
101+
102+
init {
103+
launch(AsyncDispatcher("pub-debug-memory", 1)) {
104+
while (true) {
105+
delay(30_000)
106+
println("Current payload amount (pub): ${HANDLER_LIST.sumOf { it.payloads.size } }")
107+
}
108+
}
109+
}
98110
}
99111
}

stickynote-core/src/main/kotlin/org/sayandev/stickynote/core/messaging/subscriber/RedisSubscriber.kt

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ package org.sayandev.stickynote.core.messaging.subscriber
22

33
import kotlinx.coroutines.CompletableDeferred
44
import kotlinx.coroutines.ExperimentalCoroutinesApi
5+
import kotlinx.coroutines.delay
56
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper
67
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asJson
78
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.asPayloadWrapper
89
import org.sayandev.stickynote.core.messaging.publisher.PayloadWrapper.Companion.typedPayload
910
import org.sayandev.stickynote.core.messaging.publisher.Publisher
10-
import org.sayandev.stickynote.core.utils.CoroutineUtils.awaitWithTimeout
11+
import org.sayandev.stickynote.core.messaging.publisher.RedisPublisher
1112
import org.sayandev.stickynote.core.utils.CoroutineUtils.launch
1213
import redis.clients.jedis.Jedis
1314
import redis.clients.jedis.JedisPool
@@ -28,7 +29,7 @@ abstract class RedisSubscriber<P, S>(
2829

2930
val channel = "$namespace:$name"
3031
val subJedis: Jedis = redis.resource
31-
val pubJedis: Jedis = redis.resource
32+
// val pubJedis: Jedis = redis.resource
3233

3334
init {
3435
val pubSub = object : JedisPubSub() {
@@ -45,7 +46,7 @@ abstract class RedisSubscriber<P, S>(
4546
val result = (HANDLER_LIST.find {it.namespace == this@RedisSubscriber.namespace && it.name == this@RedisSubscriber.name } as Subscriber<P, S>)
4647
.onSubscribe(payloadWrapper.typedPayload(payloadClass))
4748
result.await()
48-
publishWithTimeout(
49+
publish(
4950
PayloadWrapper(
5051
payloadWrapper.uniqueId,
5152
result.getCompleted(),
@@ -64,7 +65,7 @@ abstract class RedisSubscriber<P, S>(
6465
payloadWrapper.typedPayload(payloadClass)
6566
)
6667
if (payloadWrapper.target == "PROCESSED") return@launch;
67-
publishWithTimeout(
68+
publish(
6869
PayloadWrapper(
6970
payloadWrapper.uniqueId,
7071
result?.getCompleted() ?: payloadWrapper.payload,
@@ -75,22 +76,32 @@ abstract class RedisSubscriber<P, S>(
7576
)
7677
}
7778
}
78-
79-
PayloadWrapper.State.RESPOND -> {}
79+
PayloadWrapper.State.RESPOND -> {
80+
/*launch(dispatcher) {
81+
(HANDLER_LIST.find { publisher -> publisher.id() == channel } as? Subscriber<P, S>)
82+
?.onSubscribe(payloadWrapper.typedPayload(// TODO: Result class))
83+
}*/
84+
}
8085
}
8186
}
8287
};
8388
Thread({ subJedis.subscribe(pubSub, channel) }, "redis-sub-sub-thread-${channel}-${UUID.randomUUID().toString().split("-").first()}").start()
8489
}
8590

86-
private suspend fun publishWithTimeout(payload: PayloadWrapper<*>) {
87-
val deferred = CompletableDeferred<Unit>()
91+
private suspend fun publish(payload: PayloadWrapper<*>) {
92+
val publication = CompletableDeferred<Unit>()
8893
launch(dispatcher) {
89-
pubJedis.publish(channel.toByteArray(), payload.asJson().toByteArray())
90-
deferred.complete(Unit)
94+
delay(TIMEOUT_SECONDS * 1000)
95+
if (publication.isActive) {
96+
publication.completeExceptionally(IllegalStateException("Failed to publish payload in subscriber after ${RedisPublisher.TIMEOUT_SECONDS} seconds. Payload: $payload (channel: ${id()})"))
97+
}
9198
}
92-
deferred.awaitWithTimeout(TIMEOUT_SECONDS * 1000L) {
93-
logger.warning("failed to publish payload `${payload}` within $TIMEOUT_SECONDS seconds.")
99+
100+
val localJedis = redis.resource
101+
try {
102+
localJedis.publish(channel.toByteArray(), payload.asJson().toByteArray())
103+
} finally {
104+
localJedis.close()
94105
}
95106
}
96107

0 commit comments

Comments
 (0)