@@ -13,7 +13,10 @@ import org.sayandev.stickynote.core.utils.CoroutineUtils.launch
1313import redis.clients.jedis.Jedis
1414import redis.clients.jedis.JedisPool
1515import redis.clients.jedis.JedisPubSub
16+ import redis.clients.jedis.exceptions.JedisException
1617import java.util.*
18+ import java.util.concurrent.atomic.AtomicBoolean
19+ import java.util.logging.Level
1720import java.util.logging.Logger
1821import kotlin.coroutines.CoroutineContext
1922
@@ -28,78 +31,129 @@ abstract class RedisSubscriber<P, S>(
2831) : Subscriber<P, S>(namespace, name) {
2932
3033 val channel = " $namespace :$name "
31- val subJedis: Jedis = redis.resource
32- // val pubJedis: Jedis = redis.resource
34+ private var subJedis: Jedis = redis.resource
35+ private var subscriberThread: Thread ? = null
36+ private val isSubscribed = AtomicBoolean (false )
37+ private val shouldReconnect = AtomicBoolean (true )
38+ private val pubSub = createPubSub()
3339
3440 init {
35- val pubSub = object : JedisPubSub () {
41+ startSubscriber()
42+ }
43+
44+ private fun createPubSub (): JedisPubSub {
45+ return object : JedisPubSub () {
3646 override fun onMessage (channel : String , message : String ) {
3747 if (channel != this @RedisSubscriber.channel) return
38- val payloadWrapper = message.asPayloadWrapper<P >()
39-
40- when (payloadWrapper.state) {
41- PayloadWrapper .State .PROXY -> {
42- val isVelocity = runCatching { Class .forName(" com.velocitypowered.api.proxy.ProxyServer" ) != null }.isSuccess
43- if (! isVelocity) return
44-
45- launch(dispatcher) {
46- val result = (HANDLER_LIST .find {it.namespace == this @RedisSubscriber.namespace && it.name == this @RedisSubscriber.name } as Subscriber <P , S >)
47- .onSubscribe(payloadWrapper.typedPayload(payloadClass))
48- result.await()
49- publish(
50- PayloadWrapper (
51- payloadWrapper.uniqueId,
52- result.getCompleted(),
53- PayloadWrapper .State .RESPOND ,
54- payloadWrapper.source
55- )
56- )
57- }
58- }
48+ try {
49+ val payloadWrapper = message.asPayloadWrapper<P >()
50+ handleMessage(payloadWrapper)
51+ } catch (e: Exception ) {
52+ logger.log(Level .WARNING , " Error processing message: ${e.message} " )
53+ }
54+ }
55+ }
56+ }
5957
60- PayloadWrapper .State .FORWARD -> {
61- if (payloadWrapper.excludeSource && isSource(payloadWrapper.uniqueId)) return
62- launch(dispatcher) {
63- val result =
64- (HANDLER_LIST .find { it.namespace == this @RedisSubscriber.namespace && it.name == this @RedisSubscriber.name } as ? Subscriber <P , S >)?.onSubscribe(
65- payloadWrapper.typedPayload(payloadClass)
66- )
67- if (payloadWrapper.target == " PROCESSED" ) return @launch;
68- publish(
69- PayloadWrapper (
70- payloadWrapper.uniqueId,
71- result?.getCompleted() ? : payloadWrapper.payload,
72- if (result != null ) PayloadWrapper .State .RESPOND else payloadWrapper.state,
73- payloadWrapper.source,
74- " PROCESSED"
75- )
76- )
77- }
78- }
79- PayloadWrapper .State .RESPOND -> {
80- /* launch(dispatcher) {
81- (HANDLER_LIST.find { publisher -> publisher.id() == channel } as? Subscriber<P, S>)
82- ?.onSubscribe(payloadWrapper.typedPayload(// TODO: Result class))
83- }*/
58+ private fun handleMessage (payloadWrapper : PayloadWrapper <P >) {
59+ when (payloadWrapper.state) {
60+ PayloadWrapper .State .PROXY -> handleProxyMessage(payloadWrapper)
61+ PayloadWrapper .State .FORWARD -> handleForwardMessage(payloadWrapper)
62+ PayloadWrapper .State .RESPOND -> {} // Handle response if needed
63+ }
64+ }
65+
66+ private fun handleProxyMessage (payloadWrapper : PayloadWrapper <P >) {
67+ val isVelocity = runCatching { Class .forName(" com.velocitypowered.api.proxy.ProxyServer" ) != null }.isSuccess
68+ if (! isVelocity) return
69+
70+ launch(dispatcher) {
71+ val result = (HANDLER_LIST .find { it.namespace == namespace && it.name == name } as Subscriber <P , S >)
72+ .onSubscribe(payloadWrapper.typedPayload(payloadClass))
73+ result.await()
74+ publish(
75+ PayloadWrapper (
76+ payloadWrapper.uniqueId,
77+ result.getCompleted(),
78+ PayloadWrapper .State .RESPOND ,
79+ payloadWrapper.source
80+ )
81+ )
82+ }
83+ }
84+
85+ private fun handleForwardMessage (payloadWrapper : PayloadWrapper <P >) {
86+ if (payloadWrapper.excludeSource && isSource(payloadWrapper.uniqueId)) return
87+ launch(dispatcher) {
88+ val result = (HANDLER_LIST .find { it.namespace == namespace && it.name == name } as ? Subscriber <P , S >)
89+ ?.onSubscribe(payloadWrapper.typedPayload(payloadClass))
90+ if (payloadWrapper.target == " PROCESSED" ) return @launch
91+ publish(
92+ PayloadWrapper (
93+ payloadWrapper.uniqueId,
94+ result?.getCompleted() ? : payloadWrapper.payload,
95+ if (result != null ) PayloadWrapper .State .RESPOND else payloadWrapper.state,
96+ payloadWrapper.source,
97+ " PROCESSED"
98+ )
99+ )
100+ }
101+ }
102+
103+ private fun startSubscriber () {
104+ if (! shouldReconnect.get() || isSubscribed.get()) return
105+
106+ synchronized(this ) {
107+ if (isSubscribed.get()) return
108+
109+ subscriberThread?.interrupt()
110+ subscriberThread = Thread ({
111+ while (shouldReconnect.get()) {
112+ try {
113+ subJedis = redis.resource
114+ isSubscribed.set(true )
115+ subJedis.subscribe(pubSub, channel)
116+ } catch (e: JedisException ) {
117+ logger.log(Level .WARNING , " Redis connection lost: ${e.message} " )
118+ isSubscribed.set(false )
119+ safeCloseJedis()
120+ Thread .sleep(5000 )
121+ } catch (e: Exception ) {
122+ logger.log(Level .SEVERE , " Unexpected error in subscriber: ${e.message} " )
123+ isSubscribed.set(false )
124+ safeCloseJedis()
125+ Thread .sleep(5000 )
84126 }
85127 }
86- }
87- };
88- Thread ({ subJedis.subscribe(pubSub, channel) }, " redis-sub-sub-thread-${channel} -${UUID .randomUUID().toString().split(" -" ).first()} " ).start()
128+ }, " redis-sub-thread-${channel} -${UUID .randomUUID().toString().split(" -" ).first()} " )
129+ subscriberThread?.start()
130+ }
131+ }
132+
133+ private fun safeCloseJedis () {
134+ try {
135+ subJedis.close()
136+ } catch (e: Exception ) {
137+ logger.log(Level .WARNING , " Error closing Jedis connection: ${e.message} " )
138+ }
89139 }
90140
91141 private suspend fun publish (payload : PayloadWrapper <* >) {
92142 val publication = CompletableDeferred <Unit >()
93143 launch(dispatcher) {
94144 delay(TIMEOUT_SECONDS * 1000 )
95145 if (publication.isActive) {
96- publication.completeExceptionally(IllegalStateException (" Failed to publish payload in subscriber after ${ RedisPublisher . TIMEOUT_SECONDS } seconds. Payload: $payload (channel: ${id()} )" ))
146+ publication.completeExceptionally(IllegalStateException (" Failed to publish payload in subscriber after $TIMEOUT_SECONDS seconds. Payload: $payload (channel: ${id()} )" ))
97147 }
98148 }
99149
100150 val localJedis = redis.resource
101151 try {
102152 localJedis.publish(channel.toByteArray(), payload.asJson().toByteArray())
153+ publication.complete(Unit )
154+ } catch (e: Exception ) {
155+ logger.log(Level .WARNING , " Error publishing message: ${e.message} " )
156+ publication.completeExceptionally(e)
103157 } finally {
104158 localJedis.close()
105159 }
@@ -109,7 +163,14 @@ abstract class RedisSubscriber<P, S>(
109163 return Publisher .HANDLER_LIST .flatMap { publisher -> publisher.payloads.keys }.contains(uniqueId)
110164 }
111165
166+ fun shutdown () {
167+ shouldReconnect.set(false )
168+ pubSub.unsubscribe()
169+ safeCloseJedis()
170+ subscriberThread?.interrupt()
171+ }
172+
112173 companion object {
113174 const val TIMEOUT_SECONDS = 5L
114175 }
115- }
176+ }
0 commit comments