Skip to content

Commit 3faa68c

Browse files
author
Sebastian Molenda
committed
Aligned status emmiting
1 parent 0656e6a commit 3faa68c

8 files changed

Lines changed: 126 additions & 30 deletions

File tree

pubnub/enums.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class PNStatusCategory(Enum):
3737
PNTLSConnectionFailedCategory = 15
3838
PNTLSUntrustedCertificateCategory = 16
3939
PNInternalExceptionCategory = 17
40+
PNSubscriptionChangedCategory = 18
41+
PNConnectionErrorCategory = 19
4042

4143

4244
class PNOperationType(object):

pubnub/event_engine/effects.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ async def receive_messages_async(self, channels, groups, timetoken, region):
128128
recieve_failure = events.ReceiveFailureEvent('Empty response', 1, timetoken=timetoken)
129129
self.event_engine.trigger(recieve_failure)
130130
elif response.status.error:
131+
if self.stop_event.is_set():
132+
self.logger.debug(f'Recieve messages cancelled: {response.status.error_data.__dict__}')
133+
return
131134
self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}')
132135
recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken)
133136
self.event_engine.trigger(recieve_failure)
@@ -437,6 +440,9 @@ def set_pn(self, pubnub: PubNub):
437440
self.message_worker = BaseMessageWorker(pubnub)
438441

439442
def emit(self, invocation: invocations.PNEmittableInvocation):
443+
if isinstance(invocation, list):
444+
for inv in invocation:
445+
self.emit(inv)
440446
if isinstance(invocation, invocations.EmitMessagesInvocation):
441447
self.emit_message(invocation)
442448
if isinstance(invocation, invocations.EmitStatusInvocation):
@@ -452,5 +458,9 @@ def emit_status(self, invocation: invocations.EmitStatusInvocation):
452458
pn_status = PNStatus()
453459
pn_status.category = invocation.status
454460
pn_status.operation = invocation.operation
461+
if invocation.context and invocation.context.channels:
462+
pn_status.affected_channels = invocation.context.channels
463+
if invocation.context and invocation.context.groups:
464+
pn_status.affected_groups = invocation.context.groups
455465
pn_status.error = False
456466
self.pubnub._subscription_manager._listener_manager.announce_status(pn_status)

pubnub/event_engine/models/invocations.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import List, Union
1+
from typing import List, Optional, Union
22
from pubnub.exceptions import PubNubException
33
from pubnub.enums import PNOperationType, PNStatusCategory
44

@@ -90,10 +90,16 @@ def __init__(self, messages: Union[None, List[str]]) -> None:
9090

9191

9292
class EmitStatusInvocation(PNEmittableInvocation):
93-
def __init__(self, status: Union[None, PNStatusCategory], operation: Union[None, PNOperationType] = None) -> None:
93+
def __init__(
94+
self,
95+
status: Optional[PNStatusCategory],
96+
operation: Optional[PNOperationType] = None,
97+
context=None,
98+
) -> None:
9499
super().__init__()
95100
self.status = status
96101
self.operation = operation
102+
self.context = context
97103

98104

99105
"""

pubnub/event_engine/models/states.py

Lines changed: 92 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,15 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
122122

123123
return PNTransition(
124124
state=HandshakingState,
125-
context=self._context
125+
context=self._context,
126+
invocation=[
127+
invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
128+
operation=PNOperationType.PNSubscribeOperation,
129+
context=self._context),
130+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
131+
operation=PNOperationType.PNSubscribeOperation,
132+
context=self._context),
133+
]
126134
)
127135

128136
def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
@@ -148,7 +156,7 @@ def reconnecting(self, event: events.HandshakeFailureEvent, context: PNContext)
148156

149157
return PNTransition(
150158
state=HandshakeReconnectingState,
151-
context=self._context
159+
context=self._context,
152160
)
153161

154162
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
@@ -183,8 +191,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
183191
return PNTransition(
184192
state=UnsubscribedState,
185193
context=self._context,
186-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
187-
operation=PNOperationType.PNUnsubscribeOperation)
194+
invocation=[
195+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
196+
operation=PNOperationType.PNSubscribeOperation,
197+
context=self._context),
198+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
199+
operation=PNOperationType.PNSubscribeOperation,
200+
context=self._context),
201+
]
188202
)
189203

190204

@@ -218,7 +232,10 @@ def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTra
218232

219233
return PNTransition(
220234
state=HandshakeStoppedState,
221-
context=self._context
235+
context=self._context,
236+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
237+
operation=PNOperationType.PNSubscribeOperation,
238+
context=self._context)
222239
)
223240

224241
def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition:
@@ -230,7 +247,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
230247

231248
return PNTransition(
232249
state=HandshakeReconnectingState,
233-
context=self._context
250+
context=self._context,
251+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
252+
operation=PNOperationType.PNSubscribeOperation,
253+
context=self._context)
234254
)
235255

236256
def handshake_reconnect(self, event: events.HandshakeReconnectFailureEvent, context: PNContext) -> PNTransition:
@@ -240,7 +260,7 @@ def handshake_reconnect(self, event: events.HandshakeReconnectFailureEvent, cont
240260

241261
return PNTransition(
242262
state=HandshakeReconnectingState,
243-
context=self._context
263+
context=self._context,
244264
)
245265

246266
def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContext) -> PNTransition:
@@ -253,7 +273,7 @@ def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContex
253273
status_invocation = invocations.EmitStatusInvocation(status=event.reason.status.category,
254274
operation=PNOperationType.PNUnsubscribeOperation)
255275
else:
256-
status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)
276+
status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNConnectionErrorCategory)
257277

258278
return PNTransition(
259279
state=HandshakeFailedState,
@@ -305,7 +325,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
305325

306326
return PNTransition(
307327
state=HandshakingState,
308-
context=self._context
328+
context=self._context,
329+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
330+
operation=PNOperationType.PNSubscribeOperation,
331+
context=self._context)
309332
)
310333

311334
def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
@@ -340,8 +363,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
340363
return PNTransition(
341364
state=UnsubscribedState,
342365
context=self._context,
343-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
344-
operation=PNOperationType.PNUnsubscribeOperation)
366+
invocation=[
367+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
368+
operation=PNOperationType.PNSubscribeOperation,
369+
context=self._context),
370+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
371+
operation=PNOperationType.PNSubscribeOperation,
372+
context=self._context),
373+
]
345374
)
346375

347376

@@ -374,8 +403,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
374403
return PNTransition(
375404
state=UnsubscribedState,
376405
context=self._context,
377-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
378-
operation=PNOperationType.PNUnsubscribeOperation)
406+
invocation=[
407+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
408+
operation=PNOperationType.PNSubscribeOperation,
409+
context=self._context),
410+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
411+
operation=PNOperationType.PNSubscribeOperation,
412+
context=self._context),
413+
]
379414
)
380415

381416

@@ -412,7 +447,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
412447

413448
return PNTransition(
414449
state=self.__class__,
415-
context=self._context
450+
context=self._context,
451+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
452+
operation=PNOperationType.PNSubscribeOperation,
453+
context=self._context)
416454
)
417455

418456
def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
@@ -446,7 +484,7 @@ def receiving_failure(self, event: events.ReceiveFailureEvent, context: PNContex
446484
self._context.timetoken = event.timetoken
447485
return PNTransition(
448486
state=ReceiveReconnectingState,
449-
context=self._context
487+
context=self._context,
450488
)
451489

452490
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
@@ -477,8 +515,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
477515
return PNTransition(
478516
state=UnsubscribedState,
479517
context=self._context,
480-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
481-
operation=PNOperationType.PNUnsubscribeOperation)
518+
invocation=[
519+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
520+
operation=PNOperationType.PNSubscribeOperation,
521+
context=self._context),
522+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
523+
operation=PNOperationType.PNSubscribeOperation,
524+
context=self._context),
525+
]
482526
)
483527

484528

@@ -515,7 +559,10 @@ def reconnect_failure(self, event: events.ReceiveReconnectFailureEvent, context:
515559

516560
return PNTransition(
517561
state=ReceiveReconnectingState,
518-
context=self._context
562+
context=self._context,
563+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.UnexpectedDisconnectCategory,
564+
operation=PNOperationType.PNSubscribeOperation,
565+
context=self._context)
519566
)
520567

521568
def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition:
@@ -527,7 +574,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
527574

528575
return PNTransition(
529576
state=ReceiveReconnectingState,
530-
context=self._context
577+
context=self._context,
578+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
579+
operation=PNOperationType.PNSubscribeOperation,
580+
context=self._context)
531581
)
532582

533583
def disconnect(self, event: events.DisconnectEvent, context: PNContext) -> PNTransition:
@@ -546,7 +596,9 @@ def give_up(self, event: events.ReceiveReconnectGiveupEvent, context: PNContext)
546596
return PNTransition(
547597
state=ReceiveFailedState,
548598
context=self._context,
549-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)
599+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNUnexpectedDisconnectCategory,
600+
operation=PNOperationType.PNSubscribeOperation,
601+
context=self._context)
550602
)
551603

552604
def reconnect_success(self, event: events.ReceiveReconnectSuccessEvent, context: PNContext) -> PNTransition:
@@ -602,7 +654,10 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
602654

603655
return PNTransition(
604656
state=ReceivingState,
605-
context=self._context
657+
context=self._context,
658+
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNSubscriptionChangedCategory,
659+
operation=PNOperationType.PNSubscribeOperation,
660+
context=self._context)
606661
)
607662

608663
def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
@@ -637,8 +692,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
637692
return PNTransition(
638693
state=UnsubscribedState,
639694
context=self._context,
640-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
641-
operation=PNOperationType.PNUnsubscribeOperation)
695+
invocation=[
696+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
697+
operation=PNOperationType.PNSubscribeOperation,
698+
context=self._context),
699+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
700+
operation=PNOperationType.PNSubscribeOperation,
701+
context=self._context),
702+
]
642703
)
643704

644705

@@ -671,8 +732,14 @@ def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext)
671732
return PNTransition(
672733
state=UnsubscribedState,
673734
context=self._context,
674-
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
675-
operation=PNOperationType.PNUnsubscribeOperation)
735+
invocation=[
736+
invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory,
737+
operation=PNOperationType.PNSubscribeOperation,
738+
context=self._context),
739+
invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
740+
operation=PNOperationType.PNSubscribeOperation,
741+
context=self._context),
742+
]
676743
)
677744

678745

pubnub/pubnub_asyncio.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ def __init__(self):
559559
self.error_queue = Queue()
560560

561561
def status(self, pubnub, status):
562+
super().status(pubnub, status)
562563
if utils.is_subscribed_event(status) and not self.connected_event.is_set():
563564
self.connected_event.set()
564565
elif utils.is_unsubscribed_event(status) and not self.disconnected_event.is_set():

tests/acceptance/subscribe/steps/then_steps.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ async def step_impl(ctx: PNContext):
5858

5959
status = ctx.callback.status_result
6060
assert isinstance(status, PNStatus)
61-
assert status.category == PNStatusCategory.PNDisconnectedCategory
61+
assert status.category in [PNStatusCategory.PNConnectionErrorCategory,
62+
PNStatusCategory.PNUnexpectedDisconnectCategory]
6263
await ctx.pubnub.stop()
6364

6465

tests/functional/event_engine/test_state_machine.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,23 @@
22
from pubnub.event_engine.statemachine import StateMachine
33

44

5+
class FakePN:
6+
def __init__(self) -> None:
7+
self._subscription_manager = self
8+
self._listener_manager = self
9+
10+
def announce_status(self, pn_status):
11+
...
12+
13+
514
def test_initialize_with_state():
615
machine = StateMachine(states.UnsubscribedState)
716
assert states.UnsubscribedState.__name__ == machine.get_state_name()
817

918

1019
def test_unsubscribe_state_trigger_sub_changed():
1120
machine = StateMachine(states.UnsubscribedState)
21+
machine.get_dispatcher().set_pn(FakePN())
1222
machine.trigger(events.SubscriptionChangedEvent(
1323
channels=['test'], groups=[]
1424
))
@@ -17,6 +27,7 @@ def test_unsubscribe_state_trigger_sub_changed():
1727

1828
def test_unsubscribe_state_trigger_sub_restored():
1929
machine = StateMachine(states.UnsubscribedState)
30+
machine.get_dispatcher().set_pn(FakePN())
2031
machine.trigger(events.SubscriptionChangedEvent(
2132
channels=['test'], groups=[]
2233
))

tests/integrational/asyncio/test_subscribe.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ async def test_subscribe_publish_unsubscribe():
129129
# )
130130
@pytest.mark.asyncio
131131
async def test_encrypted_subscribe_publish_unsubscribe():
132-
133132
pubnub = PubNubAsyncio(pnconf_enc_env_copy(enable_subscribe=True))
134133
pubnub.config.uuid = 'test-subscribe-asyncio-uuid'
135134

@@ -341,7 +340,6 @@ async def test_cg_join_leave():
341340

342341
pubnub.add_listener(callback_messages)
343342
pubnub.subscribe().channel_groups(gr).execute()
344-
345343
callback_messages_future = asyncio.ensure_future(callback_messages.wait_for_connect())
346344
presence_messages_future = asyncio.ensure_future(callback_presence.wait_for_presence_on(ch))
347345
await asyncio.wait([callback_messages_future, presence_messages_future])

0 commit comments

Comments
 (0)