Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ably/realtime/connectionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ async def __get_transport_params(self) -> dict:
params["v"] = protocol_version
if self.connection_details:
params["resume"] = self.connection_details.connection_key
# RTN2a: Set format to msgpack if use_binary_protocol is enabled
if self.options.use_binary_protocol:
params["format"] = "msgpack"
return params

async def close_impl(self) -> None:
Expand Down
3 changes: 2 additions & 1 deletion ably/realtime/realtime_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,8 @@ def _on_message(self, proto_msg: dict) -> None:
elif action == ProtocolMessageAction.MESSAGE:
messages = []
try:
messages = Message.from_encoded_array(proto_msg.get('messages'), context=self.__decoding_context)
messages = Message.from_encoded_array(proto_msg.get('messages'),
cipher=self.cipher, context=self.__decoding_context)
self.__decoding_context.last_message_id = messages[-1].id
self.__channel_serial = channel_serial
except AblyException as e:
Expand Down
29 changes: 24 additions & 5 deletions ably/transport/websockettransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from enum import IntEnum
from typing import TYPE_CHECKING

import msgpack

from ably.http.httputils import HttpUtils
from ably.types.connectiondetails import ConnectionDetails
from ably.util.eventemitter import EventEmitter
Expand Down Expand Up @@ -71,6 +73,7 @@ def __init__(self, connection_manager: ConnectionManager, host: str, params: dic
self.is_disposed = False
self.host = host
self.params = params
self.format = params.get('format', 'json')
super().__init__()

def connect(self):
Expand Down Expand Up @@ -189,12 +192,23 @@ async def ws_read_loop(self):
raise AblyException('ws_read_loop started with no websocket', 500, 50000)
try:
async for raw in self.websocket:
msg = json.loads(raw)
task = asyncio.create_task(self.on_protocol_message(msg))
task.add_done_callback(self.on_protcol_message_handled)
# Decode based on format
try:
msg = self.decode_raw_websocket_frame(raw)
task = asyncio.create_task(self.on_protocol_message(msg))
task.add_done_callback(self.on_protcol_message_handled)
except Exception as e:
log.exception(
f"WebSocketTransport.decode(): Unexpected exception handling channel message: {e}"
)
except ConnectionClosedOK:
return

def decode_raw_websocket_frame(self, raw: str | bytes) -> dict:
if self.format == 'msgpack':
return msgpack.unpackb(raw, raw=False)
return json.loads(raw)

def on_protcol_message_handled(self, task):
try:
exception = task.exception()
Expand Down Expand Up @@ -231,8 +245,13 @@ async def close(self):
async def send(self, message: dict):
if self.websocket is None:
raise Exception()
raw_msg = json.dumps(message)
log.info(f'WebSocketTransport.send(): sending {raw_msg}')
# Encode based on format
if self.format == 'msgpack':
raw_msg = msgpack.packb(message, use_bin_type=True)
log.info(f'WebSocketTransport.send(): sending msgpack message (length: {len(raw_msg)} bytes)')
else:
raw_msg = json.dumps(message)
log.info(f'WebSocketTransport.send(): sending {raw_msg}')
await self.websocket.send(raw_msg)

def set_idle_timer(self, timeout: float):
Expand Down
Loading
Loading