Skip to content

Commit 0c19425

Browse files
committed
fix: make queueMessages client option True by default
- Add TO3g test verifying queueMessages defaults to true - Add RTL6c2 check to fail immediately when queueMessages is false and connection is CONNECTING/DISCONNECTED - Add test for publish failure on CONNECTING state with queueMessages=false
1 parent 78d8233 commit 0c19425

4 files changed

Lines changed: 65 additions & 4 deletions

File tree

ably/realtime/connectionmanager.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,19 +182,31 @@ async def send_protocol_message(self, protocol_message: dict) -> None:
182182
Returns:
183183
None
184184
"""
185-
if self.state not in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING, ConnectionState.CONNECTED):
186-
raise AblyException(f"ConnectionManager.send_protocol_message(): called in {self.state}", 500, 50000)
185+
state_should_queue = (self.state in
186+
(ConnectionState.INITIALIZED, ConnectionState.DISCONNECTED, ConnectionState.CONNECTING))
187+
188+
if self.state != ConnectionState.CONNECTED and not state_should_queue:
189+
raise AblyException(f"Cannot send message while connection is {self.state}", 400, 90000)
187190

188191
pending_message = PendingMessage(protocol_message)
189192

193+
# RTL6c2: If queueMessages is false, fail immediately when not CONNECTED
194+
if state_should_queue:
195+
if pending_message.ack_required and not self.options.queue_messages:
196+
raise AblyException(
197+
f"Cannot send message while connection is {self.state}, and queue_messages is false",
198+
400,
199+
90000,
200+
)
201+
190202
# Assign msgSerial to messages that need acknowledgment
191203
if pending_message.ack_required:
192204
# New message - assign fresh serial
193205
protocol_message['msgSerial'] = self.msg_serial
194206
self.pending_message_queue.push(pending_message)
195207
self.msg_serial += 1
196208

197-
if self.state in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING):
209+
if state_should_queue:
198210
self.queued_messages.appendleft(pending_message)
199211
if pending_message.ack_required:
200212
await pending_message.future

ably/types/options.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def decode(self, delta: bytes, base: bytes) -> bytes:
2626

2727
class Options(AuthOptions):
2828
def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realtime_host=None, port=0,
29-
tls_port=0, use_binary_protocol=True, queue_messages=False, recover=False, environment=None,
29+
tls_port=0, use_binary_protocol=True, queue_messages=True, recover=False, environment=None,
3030
http_open_timeout=None, http_request_timeout=None, realtime_request_timeout=None,
3131
http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None,
3232
fallback_retry_timeout=None, disconnected_retry_timeout=None, idempotent_rest_publishing=None,

test/ably/realtime/realtimechannel_publish_test.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,46 @@ async def check_disconnected():
285285

286286
await ably.close()
287287

288+
async def test_publish_fails_on_connecting_when_queue_messages_false(self):
289+
"""RTN7d: Verify publish fails immediately when connection is CONNECTING and queueMessages=false"""
290+
# Create client with queueMessages=False
291+
ably = await TestApp.get_ably_realtime(
292+
use_binary_protocol=self.use_binary_protocol,
293+
queue_messages=False,
294+
auto_connect=False
295+
)
296+
297+
# Intercept connection to prevent it from succeeding
298+
connection_manager = ably.connection.connection_manager
299+
300+
async def block_connection():
301+
# Do nothing - prevent connection from completing
302+
pass
303+
304+
connection_manager.connect_base = block_connection
305+
306+
# Start connecting
307+
ably.connect()
308+
309+
# Wait for CONNECTING state
310+
await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTING), timeout=5)
311+
assert ably.connection.state == ConnectionState.CONNECTING
312+
313+
channel = ably.channels.get('test_connecting_channel')
314+
# Don't attach - just try to publish directly
315+
316+
# Try to publish while in CONNECTING state with queueMessages=false
317+
# Should fail immediately with error code 80000
318+
with pytest.raises(AblyException) as exc_info:
319+
await channel.publish('test_event', 'test_data')
320+
321+
# Verify it failed with appropriate error
322+
assert exc_info.value.code == 90000
323+
assert exc_info.value.status_code == 400
324+
assert 'CONNECTING' in str(exc_info.value)
325+
326+
await ably.close()
327+
288328
# RTN19a2 - Reset msgSerial on new connectionId
289329
async def test_msgserial_resets_on_new_connection_id(self):
290330
"""RTN19a2: Verify msgSerial resets to 0 when connectionId changes"""

test/ably/realtime/realtimeconnection_test.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,3 +460,12 @@ def intercepted_websocket_frame(data):
460460
assert all(isinstance(frame, str) for frame in received_raw_websocket_frames)
461461

462462
await ably.close()
463+
464+
# TO3g
465+
async def test_queue_messages_defaults_to_true(self):
466+
"""TO3g: Verify that queueMessages client option defaults to true"""
467+
ably = await TestApp.get_ably_realtime(auto_connect=False)
468+
469+
# TO3g: queueMessages defaults to true
470+
assert ably.options.queue_messages is True
471+
assert ably.connection.connection_manager.options.queue_messages is True

0 commit comments

Comments
 (0)