22//
33// This source file is part of the RediStack open source project
44//
5- // Copyright (c) 2020 RediStack project authors
5+ // Copyright (c) 2020-2022 RediStack project authors
66// Licensed under Apache License v2.0
77//
88// See LICENSE.txt for license information
@@ -24,18 +24,44 @@ import NIO
2424/// - message: The message data that was received from the `publisher`.
2525public typealias RedisSubscriptionMessageReceiver = ( _ publisher: RedisChannelName , _ message: RESPValue ) -> Void
2626
27- /// A closure handler invoked for Pub/Sub subscription changes.
27+ /// The details of the subscription change.
28+ /// - Parameters:
29+ /// - subscriptionKey: The subscribed channel or pattern that had its subscription status changed.
30+ /// - currentSubscriptionCount: The current total number of subscriptions the connection has.
31+ public typealias RedisSubscriptionChangeDetails = ( subscriptionKey: String , currentSubscriptionCount: Int )
32+
33+ /// A closure handler invoked for Pub/Sub subscribe commands.
34+ ///
35+ /// This closure will be invoked only *once* for each individual channel or pattern that is having its subscription changed,
36+ /// even if it was done as a single PSUBSCRIBE or SUBSCRIBE command.
37+ /// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message.
38+ ///
39+ /// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work be dispatched to another thread
40+ /// so as to not block further messages from being processed.
41+ /// - Parameter details: The details of the subscription.
42+ public typealias RedisSubscribeHandler = ( _ details: RedisSubscriptionChangeDetails ) -> Void
43+
44+ /// An enumeration of possible sources of Pub/Sub unsubscribe events.
45+ public enum RedisUnsubscribeEventSource {
46+ /// The client sent an unsubscribe command either as UNSUBSCRIBE or PUNSUBSCRIBE.
47+ case userInitiated
48+ /// The client encountered an error and had to unsubscribe.
49+ /// - Parameter _: The error the client encountered.
50+ case clientError( _ error: Error )
51+ }
52+
53+ /// A closure handler invoked for Pub/Sub unsubscribe commands.
2854///
2955/// This closure will be invoked only *once* for each individual channel or pattern that is having its subscription changed,
30- /// even if it was done as a single PSUBSCRIBE, SUBSCRIBE, PUNSUBSCRIBE, or UNSUBSCRIBE command.
56+ /// even if it was done as a single PUNSUBSCRIBE or UNSUBSCRIBE command.
3157/// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message.
3258///
3359/// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work be dispatched to another thread
3460/// so as to not block further messages from being processed.
3561/// - Parameters:
36- /// - subscriptionKey : The subscribed channel or pattern that had its subscription status changed .
37- /// - currentSubscriptionCount : The current total number of subscriptions the connection has .
38- public typealias RedisSubscriptionChangeHandler = ( _ subscriptionKey : String , _ currentSubscriptionCount : Int ) -> Void
62+ /// - details : The details of the subscription.
63+ /// - source : The source of the unsubscribe event .
64+ public typealias RedisUnsubscribeHandler = ( _ details : RedisSubscriptionChangeDetails , _ source : RedisUnsubscribeEventSource ) -> Void
3965
4066/// A list of patterns or channels that a Pub/Sub subscription change is targetting.
4167///
@@ -146,7 +172,7 @@ extension RedisPubSubHandler {
146172
147173 guard let subscription = self . subscriptions [ prefixedKey] else { return }
148174
149- subscription. onSubscribe ? ( subscriptionKey, subscriptionCount)
175+ subscription. onSubscribe ? ( ( subscriptionKey, subscriptionCount) )
150176 subscription. onSubscribe = nil // nil to free memory
151177 self . subscriptions [ prefixedKey] = subscription
152178
@@ -161,8 +187,8 @@ extension RedisPubSubHandler {
161187 ) {
162188 let prefixedKey = self . prefixKey ( subscriptionKey, with: keyPrefix)
163189 guard let subscription = self . subscriptions. removeValue ( forKey: prefixedKey) else { return }
164-
165- subscription. onUnsubscribe ? ( subscriptionKey, subscriptionCount)
190+
191+ subscription. onUnsubscribe ? ( ( subscriptionKey, subscriptionCount) , . userInitiated )
166192 subscription. type. gauge. decrement ( )
167193
168194 switch self . pendingUnsubscribes. removeValue ( forKey: prefixedKey) {
@@ -208,8 +234,8 @@ extension RedisPubSubHandler {
208234 public func addSubscription(
209235 for target: RedisSubscriptionTarget ,
210236 messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver ,
211- onSubscribe subscribeHandler: RedisSubscriptionChangeHandler ? ,
212- onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler ?
237+ onSubscribe subscribeHandler: RedisSubscribeHandler ? ,
238+ onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler ?
213239 ) -> EventLoopFuture < Int > {
214240 guard self . eventLoop. inEventLoop else {
215241 return self . eventLoop. flatSubmit {
@@ -481,7 +507,8 @@ extension RedisPubSubHandler: ChannelInboundHandler {
481507 let receivers = self . subscriptions
482508 self . subscriptions. removeAll ( )
483509 receivers. forEach {
484- $0. value. onUnsubscribe ? ( $0. key, 0 )
510+ let source : RedisUnsubscribeEventSource = error. map { . clientError( $0) } ?? . userInitiated
511+ $0. value. onUnsubscribe ? ( ( $0. key, 0 ) , source)
485512 $0. value. type. gauge. decrement ( )
486513 }
487514 }
@@ -521,14 +548,14 @@ extension RedisPubSubHandler {
521548 fileprivate final class Subscription {
522549 let type : SubscriptionType
523550 let onMessage : RedisSubscriptionMessageReceiver
524- var onSubscribe : RedisSubscriptionChangeHandler ? // will be set to nil after first call
525- let onUnsubscribe : RedisSubscriptionChangeHandler ?
551+ var onSubscribe : RedisSubscribeHandler ? // will be set to nil after first call
552+ let onUnsubscribe : RedisUnsubscribeHandler ?
526553
527554 init (
528555 type: SubscriptionType ,
529556 messageReceiver: @escaping RedisSubscriptionMessageReceiver ,
530- subscribeHandler: RedisSubscriptionChangeHandler ? ,
531- unsubscribeHandler: RedisSubscriptionChangeHandler ?
557+ subscribeHandler: RedisSubscribeHandler ? ,
558+ unsubscribeHandler: RedisUnsubscribeHandler ?
532559 ) {
533560 self . type = type
534561 self . onMessage = messageReceiver
0 commit comments