diff --git a/kraken_ws/account.py b/kraken_ws/account.py index e855d25..da73be2 100644 --- a/kraken_ws/account.py +++ b/kraken_ws/account.py @@ -179,7 +179,7 @@ async def connect(self): await self.connect_v2() def add_handler(self, event_type: str, handler: Callable): - """Add a message handler for a specific private data type (e.g., 'ownTrades').""" + """Add a message handler for a specific private data type (e.g., 'executions').""" if event_type not in self._private_handlers: self._private_handlers[event_type] = [] self._private_handlers[event_type].append(handler) @@ -217,14 +217,22 @@ async def _handle_ws_messages_v2(self): # Handle subscription confirmations elif isinstance(data, dict) and data.get("method") == "subscribe": if data.get("success"): - logger.info(f"Successfully subscribed: {data}") + logger.info(f"Successfully subscribed to private channel: {data}") else: - logger.error(f"Subscription failed: {data.get('error', 'Unknown error')}") + logger.error(f"Private subscription failed: {data.get('error', 'Unknown error')}") + + # Handle unsubscription confirmations + elif isinstance(data, dict) and data.get("method") == "unsubscribe": + if data.get("success"): + logger.info(f"Successfully unsubscribed from private channel: {data}") + else: + logger.error(f"Private unsubscription failed: {data.get('error', 'Unknown error')}") # Handle ping messages elif isinstance(data, dict) and data.get("method") == "ping": pong_message = {"method": "pong", "req_id": data.get("req_id")} await self._ws_connection.send(json.dumps(pong_message)) + logger.debug("Sent pong response") except json.JSONDecodeError: logger.warning(f"Failed to decode WebSocket message: {message}") @@ -286,43 +294,66 @@ async def _send_subscription_v2(self, subscription: Dict): # --- Private Data Subscriptions (v2 format) --- async def subscribe_own_trades(self, - snapshot: bool = True, + snap_trades: bool = True, + snap_orders: bool = False, consolidate_taker: bool = True, + ratecounter: bool = False, handler: Optional[Callable] = None): """ Subscribe to own trades data stream using v2 format. + + Args: + snap_trades: If true, the last 50 order fills will be included in snapshot + snap_orders: If true, open orders will be included in snapshot + consolidate_taker: If true, all possible status transitions will be sent + ratecounter: If true, the rate-limit counter is included in the stream + handler: Optional callback function to handle execution messages """ if handler: - self.add_handler('executions', handler) # v2 uses 'executions' channel + self.add_handler('executions', handler) subscription = { "method": "subscribe", "params": { "channel": "executions", - "snapshot": snapshot, - "consolidate_taker": consolidate_taker + "snap_trades": snap_trades, + "snap_orders": snap_orders, + "consolidate_taker": consolidate_taker, + "ratecounter": ratecounter } } await self._send_subscription_v2(subscription) self._subscriptions['executions'] = subscription - logger.info(f"Subscribed to executions (snapshot={snapshot}, consolidate_taker={consolidate_taker})") - - async def subscribe_open_orders(self, handler: Optional[Callable] = None): - """Subscribe to open orders data stream using v2 format.""" + logger.info(f"Subscribed to executions (snap_trades={snap_trades}, snap_orders={snap_orders}, consolidate_taker={consolidate_taker})") + + async def subscribe_open_orders(self, + snap_trades: bool = False, + snap_orders: bool = True, + consolidate_taker: bool = True, + ratecounter: bool = False, + handler: Optional[Callable] = None): + """ + Subscribe to open orders data stream using v2 format. + Note: This uses the same 'executions' channel but focuses on orders. + """ if handler: - self.add_handler('orders', handler) # v2 uses 'orders' channel + self.add_handler('executions', handler) subscription = { "method": "subscribe", "params": { - "channel": "orders" + "channel": "executions", + "snap_trades": snap_trades, + "snap_orders": snap_orders, + "consolidate_taker": consolidate_taker, + "ratecounter": ratecounter } } await self._send_subscription_v2(subscription) - self._subscriptions['orders'] = subscription - logger.info("Subscribed to orders") + self._subscriptions['executions_orders'] = subscription + logger.info(f"Subscribed to executions for orders (snap_trades={snap_trades}, snap_orders={snap_orders})") async def unsubscribe_own_trades(self): """Unsubscribe from own trades data stream using v2 format.""" @@ -339,24 +370,13 @@ async def unsubscribe_own_trades(self): await self._send_subscription_v2(unsubscription) self._subscriptions.pop('executions', None) + self._subscriptions.pop('executions_orders', None) # Remove both if present logger.info("Unsubscribed from executions") async def unsubscribe_open_orders(self): """Unsubscribe from open orders data stream using v2 format.""" - if 'orders' not in self._subscriptions: - logger.warning("Not subscribed to orders") - return - - unsubscription = { - "method": "unsubscribe", - "params": { - "channel": "orders" - } - } - - await self._send_subscription_v2(unsubscription) - self._subscriptions.pop('orders', None) - logger.info("Unsubscribed from orders") + # Same as unsubscribe_own_trades since they use the same channel + await self.unsubscribe_own_trades() # --- Account Information Methods --- @@ -552,7 +572,6 @@ async def edit_order(self, txid: str, volume: Optional[str] = None, **kwargs ) - # In your KrakenPythonClient class async def cancel_order(self, order_id): """ Async version of cancel_order that makes the HTTP request