Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 49 additions & 30 deletions kraken_ws/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async def connect(self):
await self.connect_v2()

def add_handler(self, event_type: str, handler: Callable):
"""Add a message handler for a specific private data type (e.g., 'ownTrades')."""
"""Add a message handler for a specific private data type (e.g., 'executions')."""
if event_type not in self._private_handlers:
self._private_handlers[event_type] = []
self._private_handlers[event_type].append(handler)
Expand Down Expand Up @@ -217,14 +217,22 @@ async def _handle_ws_messages_v2(self):
# Handle subscription confirmations
elif isinstance(data, dict) and data.get("method") == "subscribe":
if data.get("success"):
logger.info(f"Successfully subscribed: {data}")
logger.info(f"Successfully subscribed to private channel: {data}")
else:
logger.error(f"Subscription failed: {data.get('error', 'Unknown error')}")
logger.error(f"Private subscription failed: {data.get('error', 'Unknown error')}")

# Handle unsubscription confirmations
elif isinstance(data, dict) and data.get("method") == "unsubscribe":
if data.get("success"):
logger.info(f"Successfully unsubscribed from private channel: {data}")
else:
logger.error(f"Private unsubscription failed: {data.get('error', 'Unknown error')}")

# Handle ping messages
elif isinstance(data, dict) and data.get("method") == "ping":
pong_message = {"method": "pong", "req_id": data.get("req_id")}
await self._ws_connection.send(json.dumps(pong_message))
logger.debug("Sent pong response")

except json.JSONDecodeError:
logger.warning(f"Failed to decode WebSocket message: {message}")
Expand Down Expand Up @@ -286,43 +294,66 @@ async def _send_subscription_v2(self, subscription: Dict):
# --- Private Data Subscriptions (v2 format) ---

async def subscribe_own_trades(self,
snapshot: bool = True,
snap_trades: bool = True,
snap_orders: bool = False,
consolidate_taker: bool = True,
ratecounter: bool = False,
handler: Optional[Callable] = None):
"""
Subscribe to own trades data stream using v2 format.

Args:
snap_trades: If true, the last 50 order fills will be included in snapshot
snap_orders: If true, open orders will be included in snapshot
consolidate_taker: If true, all possible status transitions will be sent
ratecounter: If true, the rate-limit counter is included in the stream
handler: Optional callback function to handle execution messages
"""
if handler:
self.add_handler('executions', handler) # v2 uses 'executions' channel
self.add_handler('executions', handler)

subscription = {
"method": "subscribe",
"params": {
"channel": "executions",
"snapshot": snapshot,
"consolidate_taker": consolidate_taker
"snap_trades": snap_trades,
"snap_orders": snap_orders,
"consolidate_taker": consolidate_taker,
"ratecounter": ratecounter
}
}

await self._send_subscription_v2(subscription)
self._subscriptions['executions'] = subscription
logger.info(f"Subscribed to executions (snapshot={snapshot}, consolidate_taker={consolidate_taker})")

async def subscribe_open_orders(self, handler: Optional[Callable] = None):
"""Subscribe to open orders data stream using v2 format."""
logger.info(f"Subscribed to executions (snap_trades={snap_trades}, snap_orders={snap_orders}, consolidate_taker={consolidate_taker})")

async def subscribe_open_orders(self,
snap_trades: bool = False,
snap_orders: bool = True,
consolidate_taker: bool = True,
ratecounter: bool = False,
handler: Optional[Callable] = None):
"""
Subscribe to open orders data stream using v2 format.
Note: This uses the same 'executions' channel but focuses on orders.
"""
if handler:
self.add_handler('orders', handler) # v2 uses 'orders' channel
self.add_handler('executions', handler)

subscription = {
"method": "subscribe",
"params": {
"channel": "orders"
"channel": "executions",
"snap_trades": snap_trades,
"snap_orders": snap_orders,
"consolidate_taker": consolidate_taker,
"ratecounter": ratecounter
}
}

await self._send_subscription_v2(subscription)
self._subscriptions['orders'] = subscription
logger.info("Subscribed to orders")
self._subscriptions['executions_orders'] = subscription
logger.info(f"Subscribed to executions for orders (snap_trades={snap_trades}, snap_orders={snap_orders})")

async def unsubscribe_own_trades(self):
"""Unsubscribe from own trades data stream using v2 format."""
Expand All @@ -339,24 +370,13 @@ async def unsubscribe_own_trades(self):

await self._send_subscription_v2(unsubscription)
self._subscriptions.pop('executions', None)
self._subscriptions.pop('executions_orders', None) # Remove both if present
logger.info("Unsubscribed from executions")

async def unsubscribe_open_orders(self):
"""Unsubscribe from open orders data stream using v2 format."""
if 'orders' not in self._subscriptions:
logger.warning("Not subscribed to orders")
return

unsubscription = {
"method": "unsubscribe",
"params": {
"channel": "orders"
}
}

await self._send_subscription_v2(unsubscription)
self._subscriptions.pop('orders', None)
logger.info("Unsubscribed from orders")
# Same as unsubscribe_own_trades since they use the same channel
await self.unsubscribe_own_trades()

# --- Account Information Methods ---

Expand Down Expand Up @@ -552,7 +572,6 @@ async def edit_order(self, txid: str, volume: Optional[str] = None,
**kwargs
)

# In your KrakenPythonClient class
async def cancel_order(self, order_id):
"""
Async version of cancel_order that makes the HTTP request
Expand Down
Loading