Skip to content

Commit ce3425a

Browse files
author
Sebastian Molenda
committed
Flaky subscribe
1 parent a6bac66 commit ce3425a

3 files changed

Lines changed: 221 additions & 158 deletions

File tree

pubnub/request_handlers/httpx.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,15 @@ def _build_envelope(self, p_options, e_options):
179179
if res.text is None:
180180
text = "N/A"
181181
else:
182-
text = res.text
182+
# Safely access response text - handle streaming responses
183+
try:
184+
text = res.text
185+
except httpx.ResponseNotRead:
186+
# For streaming responses, we need to read first
187+
text = res.content.decode('utf-8', errors='ignore')
188+
except Exception:
189+
# Fallback in case of any response reading issues
190+
text = f"Response content unavailable (status: {res.status_code})"
183191

184192
if res.status_code >= 500:
185193
err = PNERR_SERVER_ERROR
@@ -259,7 +267,15 @@ def _invoke_request(self, p_options, e_options, base_origin):
259267

260268
try:
261269
res = self.session.request(**args)
262-
logger.debug("GOT %s" % res.text)
270+
# Safely access response text - read content first for streaming responses
271+
try:
272+
logger.debug("GOT %s" % res.text)
273+
except httpx.ResponseNotRead:
274+
# For streaming responses, we need to read first
275+
logger.debug("GOT %s" % res.content.decode('utf-8', errors='ignore'))
276+
except Exception as e:
277+
# Fallback logging in case of any response reading issues
278+
logger.debug("GOT response (content access failed: %s)" % str(e))
263279

264280
except httpx.ConnectError as e:
265281
raise PubNubException(
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import logging
2+
import unittest
3+
import time
4+
import pubnub as pn
5+
6+
from unittest.mock import patch
7+
from pubnub.enums import PNReconnectionPolicy, PNStatusCategory
8+
from pubnub.exceptions import PubNubException
9+
from pubnub.managers import LinearDelay, ExponentialDelay
10+
from pubnub.pubnub import PubNub, SubscribeListener
11+
12+
from tests.helper import pnconf_env_copy
13+
14+
15+
pn.set_stream_logger('pubnub', logging.DEBUG)
16+
17+
18+
class DisconnectListener(SubscribeListener):
19+
status_result = None
20+
disconnected = False
21+
22+
def status(self, pubnub, status):
23+
if status.category == PNStatusCategory.PNDisconnectedCategory:
24+
print('Could not connect. Exiting...')
25+
self.disconnected = True
26+
27+
def message(self, pubnub, message):
28+
print(f'Message:\n{message.__dict__}')
29+
30+
def presence(self, pubnub, presence):
31+
print(f'Presence:\n{presence.__dict__}')
32+
33+
34+
class TestPubNubRetryPolicies(unittest.TestCase):
35+
def test_subscribe_retry_policy_none(self):
36+
ch = "test-subscribe-retry-policy-none"
37+
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
38+
reconnect_policy=PNReconnectionPolicy.NONE, enable_presence_heartbeat=True))
39+
listener = DisconnectListener()
40+
41+
try:
42+
pubnub.add_listener(listener)
43+
pubnub.subscribe().channels(ch).execute()
44+
45+
while not listener.disconnected:
46+
time.sleep(0.5)
47+
48+
except PubNubException as e:
49+
self.fail(e)
50+
51+
def test_subscribe_retry_policy_linear(self):
52+
# we don't test the actual delay calculation here, just everything around it
53+
def mock_calculate(*args, **kwargs):
54+
return 0.2
55+
56+
with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock:
57+
ch = "test-subscribe-retry-policy-linear"
58+
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
59+
reconnect_policy=PNReconnectionPolicy.LINEAR,
60+
enable_presence_heartbeat=True))
61+
listener = DisconnectListener()
62+
63+
try:
64+
pubnub.add_listener(listener)
65+
pubnub.subscribe().channels(ch).execute()
66+
67+
while not listener.disconnected:
68+
time.sleep(0.5)
69+
70+
except PubNubException as e:
71+
self.fail(e)
72+
73+
assert calculate_mock.call_count == LinearDelay.MAX_RETRIES + 1
74+
75+
def test_subscribe_retry_policy_exponential(self):
76+
# we don't test the actual delay calculation here, just everything around it
77+
def mock_calculate(*args, **kwargs):
78+
return 0.2
79+
80+
with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock:
81+
ch = "test-subscribe-retry-policy-exponential"
82+
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
83+
reconnect_policy=PNReconnectionPolicy.EXPONENTIAL,
84+
enable_presence_heartbeat=True))
85+
listener = DisconnectListener()
86+
87+
try:
88+
pubnub.add_listener(listener)
89+
pubnub.subscribe().channels(ch).execute()
90+
91+
while not listener.disconnected:
92+
time.sleep(0.5)
93+
94+
except PubNubException as e:
95+
self.fail(e)
96+
97+
assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + 1
98+
99+
def test_subscribe_retry_policy_linear_with_max_retries(self):
100+
# we don't test the actual delay calculation here, just everything around it
101+
def mock_calculate(*args, **kwargs):
102+
return 0.2
103+
104+
with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock:
105+
ch = "test-subscribe-retry-policy-linear"
106+
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
107+
maximum_reconnection_retries=3,
108+
reconnect_policy=PNReconnectionPolicy.LINEAR,
109+
enable_presence_heartbeat=True))
110+
listener = DisconnectListener()
111+
112+
try:
113+
pubnub.add_listener(listener)
114+
pubnub.subscribe().channels(ch).execute()
115+
116+
while not listener.disconnected:
117+
time.sleep(0.5)
118+
119+
except PubNubException as e:
120+
self.fail(e)
121+
122+
assert calculate_mock.call_count == 3
123+
124+
def test_subscribe_retry_policy_exponential_with_max_retries(self):
125+
# we don't test the actual delay calculation here, just everything around it
126+
def mock_calculate(*args, **kwargs):
127+
return 0.2
128+
129+
with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock:
130+
ch = "test-subscribe-retry-policy-exponential"
131+
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
132+
maximum_reconnection_retries=3,
133+
reconnect_policy=PNReconnectionPolicy.EXPONENTIAL,
134+
enable_presence_heartbeat=True))
135+
listener = DisconnectListener()
136+
137+
try:
138+
pubnub.add_listener(listener)
139+
pubnub.subscribe().channels(ch).execute()
140+
141+
while not listener.disconnected:
142+
time.sleep(0.5)
143+
144+
except PubNubException as e:
145+
self.fail(e)
146+
147+
assert calculate_mock.call_count == 3
148+
149+
def test_subscribe_retry_policy_linear_with_custom_interval(self):
150+
# we don't test the actual delay calculation here, just everything around it
151+
def mock_calculate(*args, **kwargs):
152+
return 0.2
153+
154+
with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock:
155+
ch = "test-subscribe-retry-policy-linear"
156+
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
157+
maximum_reconnection_retries=3, reconnection_interval=1,
158+
reconnect_policy=PNReconnectionPolicy.LINEAR,
159+
enable_presence_heartbeat=True))
160+
listener = DisconnectListener()
161+
162+
try:
163+
pubnub.add_listener(listener)
164+
pubnub.subscribe().channels(ch).execute()
165+
166+
while not listener.disconnected:
167+
time.sleep(0.5)
168+
169+
except PubNubException as e:
170+
self.fail(e)
171+
172+
assert calculate_mock.call_count == 0

0 commit comments

Comments
 (0)