From 4fe6a927d9e9728c218f903dca5d9155ea6f5dd2 Mon Sep 17 00:00:00 2001 From: Ryan Watts Date: Thu, 31 Jul 2025 12:23:39 -0600 Subject: [PATCH 1/2] fix: get_my_recent trades error --- data/cached_prices.json | 13 + data/trades/1_trades.json | 17 ++ kraken_ws/account.py | 213 ++++++++------ src/apps/strategies/example.py | 35 ++- src/clients/kraken_python_client.py | 415 ++++++++++++++++------------ 5 files changed, 414 insertions(+), 279 deletions(-) create mode 100644 data/cached_prices.json create mode 100644 data/trades/1_trades.json diff --git a/data/cached_prices.json b/data/cached_prices.json new file mode 100644 index 0000000..03afe1f --- /dev/null +++ b/data/cached_prices.json @@ -0,0 +1,13 @@ +{ + "EURQ": 1.1425, + "ZUSD": 1.0, + "USDD": 0.9988999999999999, + "EURR": 1.14245, + "DOGE": 0.2217431, + "USDQ": 0.99925, + "ZEUR": 1.144015, + "USDG": 1.0000499999999999, + "TUSD": 1.0437, + "PYUSD": 0.99985, + "LSETH": 4134.049999999999 +} \ No newline at end of file diff --git a/data/trades/1_trades.json b/data/trades/1_trades.json new file mode 100644 index 0000000..af86c28 --- /dev/null +++ b/data/trades/1_trades.json @@ -0,0 +1,17 @@ +{"timestamp": "2025-07-30 12:19:50.445404", "side": "BUY", "pair": "EURQ/USD", "quantity": 1.9924, "price": 1.1482, "total_value": 2.28767368, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 26.9924, "ZUSD": 22.71232632}} +{"timestamp": "2025-07-30 15:24:45.048539", "side": "SELL", "pair": "EURQ/USD", "quantity": 93.3642, "price": 1.14, "total_value": 106.43518799999998, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": -66.3718, "ZUSD": 129.14751431999997}} +{"timestamp": "2025-07-30 16:00:36.933012", "side": "SELL", "pair": "EURQ/USD", "quantity": 133.7279, "price": 1.14, "total_value": 152.449806, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": -118.7279, "ZUSD": 172.449806}} +{"timestamp": "2025-07-31 11:44:57.389310", "side": "SELL", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1428, "total_value": 5.714, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 15.0, "ZUSD": 25.714}} +{"timestamp": "2025-07-31 11:54:36.893116", "side": "BUY", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1401, "total_value": 5.7005, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 20.0, "ZUSD": 20.0135}} +{"timestamp": "2025-07-31 12:01:24.956864", "side": "SELL", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.143, "total_value": 5.715, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 15.0, "ZUSD": 25.7285}} +{"timestamp": "2025-07-31 12:02:49.630297", "side": "BUY", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1401, "total_value": 5.7005, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 20.0, "ZUSD": 20.028}} +{"timestamp": "2025-07-31 12:04:13.981845", "side": "SELL", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.143, "total_value": 5.715, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 15.0, "ZUSD": 25.743}} +{"timestamp": "2025-07-31 12:07:15.863322", "side": "BUY", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1402, "total_value": 5.7010000000000005, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 20.0, "ZUSD": 20.041999999999998}} +{"timestamp": "2025-07-31 12:07:16.809399", "side": "SELL", "pair": "EURQ/USD", "quantity": 0.99, "price": 1.143, "total_value": 1.13157, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 19.01, "ZUSD": 21.173569999999998}} +{"timestamp": "2025-07-31 12:09:36.827902", "side": "SELL", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1431, "total_value": 5.7155000000000005, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 14.010000000000002, "ZUSD": 26.889069999999997}} +{"timestamp": "2025-07-31 12:13:05.226875", "side": "BUY", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1402, "total_value": 5.7010000000000005, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 19.01, "ZUSD": 21.188069999999996}} +{"timestamp": "2025-07-31 12:15:54.174019", "side": "BUY", "pair": "EURQ/USD", "quantity": 1.4642, "price": 1.1402, "total_value": 1.66948084, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 20.474200000000003, "ZUSD": 19.518589159999998}} +{"timestamp": "2025-07-31 12:18:02.068369", "side": "SELL", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1431, "total_value": 5.7155000000000005, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 15.474200000000003, "ZUSD": 25.234089159999996}} +{"timestamp": "2025-07-31 12:19:12.499042", "side": "BUY", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1401, "total_value": 5.7005, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 20.474200000000003, "ZUSD": 19.53358916}} +{"timestamp": "2025-07-31 12:20:58.785230", "side": "SELL", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1436, "total_value": 5.718, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 15.474200000000003, "ZUSD": 25.251589159999998}} +{"timestamp": "2025-07-31 12:22:08.372194", "side": "BUY", "pair": "EURQ/USD", "quantity": 5.0, "price": 1.1402, "total_value": 5.7010000000000005, "account_id": 1, "base_currency": "EURQ", "quote_currency": "ZUSD", "balances_after": {"EURQ": 20.474200000000003, "ZUSD": 19.550589159999998}} diff --git a/kraken_ws/account.py b/kraken_ws/account.py index 41f9761..5322f4d 100644 --- a/kraken_ws/account.py +++ b/kraken_ws/account.py @@ -8,7 +8,7 @@ import json import logging import decimal -from typing import Dict, List, Optional, Any, Union, Callable +from typing import Dict, List, Optional, Any, Union, Callable, Awaitable from pathlib import Path import yaml import websockets @@ -31,31 +31,32 @@ def __init__(self, api_key: Optional[str] = None, api_secret: Optional[str] = No self.api_secret = None self._load_credentials(api_key, api_secret) - self._session = None - self._ws_connection = None + self._session: Optional[aiohttp.ClientSession] = None + self._ws_connection: Optional[websockets.WebSocketClientProtocol] = None self._ws_authenticated = False - self._auth_token = None + self._auth_token: Optional[str] = None self._pending_requests: Dict[int, asyncio.Future] = {} self._message_handler_task: Optional[asyncio.Task] = None self._connection_lock = asyncio.Lock() + self._cancel_timer_task: Optional[asyncio.Task] = None # Add handlers for private data streams - self._private_handlers: Dict[str, List[Callable]] = {} + self._private_handlers: Dict[str, List[Callable[[Dict], Awaitable[None]]]] = {} self._subscriptions: Dict[str, Dict] = {} # Cancel on disconnect settings self._cancel_on_disconnect_enabled = False - self._cancel_on_disconnect_timeout = None + self._cancel_on_disconnect_timeout: Optional[int] = None @classmethod - async def create(cls, api_key: Optional[str] = None, api_secret: Optional[str] = None): + async def create(cls, api_key: Optional[str] = None, api_secret: Optional[str] = None) -> 'KrakenAccount': """Factory method to create and connect a KrakenAccount instance using v2.""" account = cls(api_key, api_secret) await account.connect_v2() return account - def _load_credentials(self, api_key: Optional[str], api_secret: Optional[str]): + def _load_credentials(self, api_key: Optional[str], api_secret: Optional[str]) -> None: """Loads API credentials from a config file or from provided parameters.""" current_file = Path(__file__) config_path = current_file.parent.parent / "config" / "config.yaml" @@ -146,53 +147,72 @@ async def _make_rest_request(self, endpoint: str, data: Optional[Dict] = None) - return result - async def connect_v2(self): + async def connect_v2(self) -> None: """Establishes and authenticates the WebSocket v2 connection.""" async with self._connection_lock: - if self._ws_connection: + if self._ws_connection:# and not self._ws_connection.closed: logger.info("WebSocket v2 connection already established.") return try: + # Close any existing connection + if self._ws_connection:# and not self._ws_connection.closed: + await self._ws_connection.close() + + # Get new auth token self._auth_token = await self._get_ws_auth_token() + # Establish new connection self._ws_connection = await websockets.connect( self.WS_URL_V2, ping_interval=20, - ping_timeout=10 + ping_timeout=10, + close_timeout=1 ) self._ws_authenticated = True - if self._message_handler_task: + # Cancel any existing message handler + if self._message_handler_task and not self._message_handler_task.done(): self._message_handler_task.cancel() + try: + await self._message_handler_task + except asyncio.CancelledError: + pass + # Start new message handler self._message_handler_task = asyncio.create_task(self._handle_ws_messages_v2()) logger.info("WebSocket v2 connection established and authenticated.") + # Restore subscriptions if any + if self._subscriptions: + logger.info("Restoring active subscriptions...") + for sub in self._subscriptions.values(): + await self._send_subscription_v2(sub) + except Exception as e: logger.error(f"Failed to establish WebSocket v2 connection: {e}") self._ws_authenticated = False - if self._ws_connection: + if self._ws_connection:# and not self._ws_connection.closed: await self._ws_connection.close() raise # Legacy method for backward compatibility - async def connect(self): + async def connect(self) -> None: """Legacy connect method - redirects to v2.""" await self.connect_v2() - def add_handler(self, event_type: str, handler: Callable): + def add_handler(self, event_type: str, handler: Callable[[Dict], Awaitable[None]]) -> None: """Add a message handler for a specific private data type (e.g., 'executions').""" if event_type not in self._private_handlers: self._private_handlers[event_type] = [] self._private_handlers[event_type].append(handler) logger.info(f"Added handler for private {event_type}") - async def _handle_ws_messages_v2(self): + async def _handle_ws_messages_v2(self) -> None: """Continuously listens for and processes incoming WebSocket v2 messages.""" try: - async for message in self._ws_connection: + async for message in self._ws_connection: # type: ignore try: data = json.loads(message) logger.debug(f"WS v2 Recv: {data}") @@ -235,17 +255,27 @@ async def _handle_ws_messages_v2(self): # Handle ping messages elif isinstance(data, dict) and data.get("method") == "ping": pong_message = {"method": "pong", "req_id": data.get("req_id")} - await self._ws_connection.send(json.dumps(pong_message)) + await self._ws_connection.send(json.dumps(pong_message)) # type: ignore logger.debug("Sent pong response") except json.JSONDecodeError: logger.warning(f"Failed to decode WebSocket message: {message}") + except Exception as e: + logger.error(f"Error processing WebSocket message: {e}") - except websockets.exceptions.ConnectionClosed: - logger.warning("WebSocket v2 connection closed.") + except websockets.exceptions.ConnectionClosed as e: + logger.warning(f"WebSocket v2 connection closed: {e}") # Trigger cancel on disconnect if enabled if self._cancel_on_disconnect_enabled: logger.info("Connection lost - cancel_on_disconnect is enabled, orders should be cancelled automatically") + + # Attempt to reconnect + logger.info("Attempting to reconnect...") + try: + await self.connect_v2() + except Exception as reconnect_error: + logger.error(f"Failed to reconnect: {reconnect_error}") + except Exception as e: logger.error(f"Error in WebSocket v2 message handler: {e}", exc_info=True) finally: @@ -256,14 +286,15 @@ async def _handle_ws_messages_v2(self): future.set_exception(Exception("WebSocket connection lost.")) self._pending_requests.clear() - def connected(self): - if not self._ws_authenticated or not self._ws_connection: + def connected(self) -> bool: + """Check if the WebSocket connection is active and authenticated.""" + if not self._ws_authenticated or not self._ws_connection:# or self._ws_connection.closed: return False return True async def _send_request_v2(self, payload: Dict, timeout: float = 10.0) -> Dict: """Sends a request over WebSocket v2 and waits for a response.""" - if not self._ws_authenticated or not self._ws_connection: + if not self.connected(): raise ConnectionError("WebSocket is not connected. Call connect_v2() first.") req_id = int(time.time() * 1000) @@ -277,14 +308,19 @@ async def _send_request_v2(self, payload: Dict, timeout: float = 10.0) -> Dict: future = asyncio.get_running_loop().create_future() self._pending_requests[req_id] = future - await self._ws_connection.send(json.dumps(payload)) - logger.debug(f"WS v2 Sent: {payload}") - - return await asyncio.wait_for(future, timeout=timeout) + try: + await self._ws_connection.send(json.dumps(payload)) # type: ignore + logger.debug(f"WS v2 Sent: {payload}") + + return await asyncio.wait_for(future, timeout=timeout) + except Exception as e: + if req_id in self._pending_requests: + del self._pending_requests[req_id] + raise - async def _send_subscription_v2(self, subscription: Dict): + async def _send_subscription_v2(self, subscription: Dict) -> None: """Sends a subscription message to the private WebSocket v2.""" - if not self._ws_authenticated or not self._ws_connection: + if not self.connected(): raise ConnectionError("WebSocket is not connected. Call connect_v2() first.") # Add req_id and token @@ -295,7 +331,7 @@ async def _send_subscription_v2(self, subscription: Dict): subscription["params"] = {} subscription["params"]["token"] = self._auth_token - await self._ws_connection.send(json.dumps(subscription)) + await self._ws_connection.send(json.dumps(subscription)) # type: ignore logger.debug(f"WS v2 Subscription Sent: {subscription}") # --- Cancel All Orders After (Dead Man's Switch) Methods --- @@ -426,7 +462,7 @@ async def reset_timer_loop(): while True: await asyncio.sleep(reset_interval) - if self._ws_authenticated and self._ws_connection: + if self.connected(): try: result = await self.set_cancel_all_orders_after(timeout) logger.debug(f"Timer reset successful: {result.get('result', {}).get('triggerTime', 'N/A')}") @@ -439,25 +475,34 @@ async def reset_timer_loop(): logger.info("Cancel timer auto-reset task cancelled") # Try to disable the timer on cancellation try: - if self._ws_authenticated and self._ws_connection: + if self.connected(): await self.set_cancel_all_orders_after(0) logger.info("Disabled cancel timer on task cancellation") - except: + except Exception: pass raise except Exception as e: logger.error(f"Error in cancel timer task: {e}") - return asyncio.create_task(reset_timer_loop()) + # Cancel any existing task + if self._cancel_timer_task and not self._cancel_timer_task.done(): + self._cancel_timer_task.cancel() + try: + await self._cancel_timer_task + except asyncio.CancelledError: + pass + + self._cancel_timer_task = asyncio.create_task(reset_timer_loop()) + return self._cancel_timer_task # --- Private Data Subscriptions (v2 format) --- async def subscribe_own_trades(self, - snap_trades: bool = True, - snap_orders: bool = False, - consolidate_taker: bool = True, - ratecounter: bool = False, - handler: Optional[Callable] = None): + snap_trades: bool = True, + snap_orders: bool = False, + consolidate_taker: bool = True, + ratecounter: bool = False, + handler: Optional[Callable[[Dict], Awaitable[None]]] = None) -> None: """ Subscribe to own trades data stream using v2 format. @@ -487,11 +532,11 @@ async def subscribe_own_trades(self, logger.info(f"Subscribed to executions (snap_trades={snap_trades}, snap_orders={snap_orders}, consolidate_taker={consolidate_taker})") async def subscribe_open_orders(self, - snap_trades: bool = False, - snap_orders: bool = True, - consolidate_taker: bool = True, - ratecounter: bool = False, - handler: Optional[Callable] = None): + snap_trades: bool = False, + snap_orders: bool = True, + consolidate_taker: bool = True, + ratecounter: bool = False, + handler: Optional[Callable[[Dict], Awaitable[None]]] = None) -> None: """ Subscribe to open orders data stream using v2 format. Note: This uses the same 'executions' channel but focuses on orders. @@ -514,7 +559,7 @@ async def subscribe_open_orders(self, self._subscriptions['executions_orders'] = subscription logger.info(f"Subscribed to executions for orders (snap_trades={snap_trades}, snap_orders={snap_orders})") - async def unsubscribe_own_trades(self): + async def unsubscribe_own_trades(self) -> None: """Unsubscribe from own trades data stream using v2 format.""" if 'executions' not in self._subscriptions: logger.warning("Not subscribed to executions") @@ -532,7 +577,7 @@ async def unsubscribe_own_trades(self): self._subscriptions.pop('executions_orders', None) # Remove both if present logger.info("Unsubscribed from executions") - async def unsubscribe_open_orders(self): + async def unsubscribe_open_orders(self) -> None: """Unsubscribe from open orders data stream using v2 format.""" # Same as unsubscribe_own_trades since they use the same channel await self.unsubscribe_own_trades() @@ -731,21 +776,9 @@ async def edit_order(self, txid: str, volume: Optional[str] = None, **kwargs ) - async def cancel_order(self, order_id): - """ - Async version of cancel_order that makes the HTTP request - """ - try: - # Use aiohttp or similar async HTTP client here - async with self.session.post( - f"{self.base_url}/cancel_order", - json={"txid": order_id}, - headers=self._get_auth_headers() - ) as response: - return await response.json() - except Exception as e: - print(f"Error canceling order {order_id}: {e}") - return {"status": "error", "errorMessage": str(e)} + async def cancel_order(self, order_id: str) -> Dict: + """Cancel an order using v2 API format (legacy method).""" + return await self.cancel_order_v2(order_id=order_id) async def cancel_all_orders(self) -> Dict: """Legacy method - cancels all open orders.""" @@ -819,30 +852,46 @@ async def query_trades_info(self, txid: Union[str, List[str]]) -> Dict: result = await self._make_rest_request(f"/{self.API_VERSION}/private/QueryTrades", data) return result['result'] - async def close(self): + async def close(self) -> None: """Closes the WebSocket connection and cleans up resources.""" - try: - self.cancel_all_orders() - except: - pass - - if self._message_handler_task: - self._message_handler_task.cancel() - try: - await self._message_handler_task - except asyncio.CancelledError: - pass - - if self._ws_connection: - await self._ws_connection.close() - logger.info("WebSocket v2 connection closed.") - - if self._session and not self._session.closed: - await self._session.close() + try: + # Cancel the timer task if running + if self._cancel_timer_task and not self._cancel_timer_task.done(): + self._cancel_timer_task.cancel() + try: + await self._cancel_timer_task + except asyncio.CancelledError: + pass + + # Cancel the message handler task + if self._message_handler_task and not self._message_handler_task.done(): + self._message_handler_task.cancel() + try: + await self._message_handler_task + except asyncio.CancelledError: + pass + + # Close WebSocket connection + if self._ws_connection:# and not self._ws_connection.closed: + await self._ws_connection.close() + logger.info("WebSocket v2 connection closed.") + + # Close HTTP session + if self._session and not self._session.closed: + await self._session.close() + logger.info("HTTP session closed.") + + except Exception as e: + logger.error(f"Error during cleanup: {e}") + finally: + self._ws_authenticated = False + self._auth_token = None - async def __aenter__(self): + async def __aenter__(self) -> 'KrakenAccount': await self.connect_v2() return self - async def __aexit__(self, exc_type, exc_val, exc_tb): + async def __aexit__(self, exc_type: Optional[type], + exc_val: Optional[Exception], + exc_tb: Optional[Any]) -> None: await self.close() \ No newline at end of file diff --git a/src/apps/strategies/example.py b/src/apps/strategies/example.py index 2a20c30..96a0399 100644 --- a/src/apps/strategies/example.py +++ b/src/apps/strategies/example.py @@ -1,21 +1,30 @@ import sys +import os -def main(): - if len(sys.argv) < 2: - print("Usage: example.py [args...]") - sys.exit(1) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..", "clients"))) +from kraken_python_client import KrakenPythonClient - exchange = sys.argv[1] - args = sys.argv[2:] +client = KrakenPythonClient( + api_key = "vWofAmTITP1fEEM8TxWaOmeXCIrHi3yfXF5mTzPKTe+OzPiRAQBjrca7", + api_secret = "i8Sbg7N8WsnOYVvF61Vdr38eimYE0OKZxuz9x3DOyLQjov9KhCmZGyn1hitgiKIFw2JS01B4OA2x9nS4C+bXKA==") - print(f"Running strategy on exchange: {exchange}") - if args: - print("Received arguments:") - for i, arg in enumerate(args, start=1): - print(f" Arg {i}: {arg}") - else: - print("No additional arguments provided.") +def main(): + + print('___1___') + my_recent_trade_data = client.get_my_recent_trades('EURQ/USD'.replace('/', '')) + most_recent = my_recent_trade_data.iloc[-1] + my_recent_trades = { + "side": most_recent.get('side'), + "price": float(most_recent.get('price')), + "qty": float(most_recent.get('volume')), + "timestamp": most_recent.get('time') + } + + print(my_recent_trades) + # print('___2___') + # print(client.get_my_recent_trades('EURQUSD')) + if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/clients/kraken_python_client.py b/src/clients/kraken_python_client.py index b5bfa21..8f20d95 100644 --- a/src/clients/kraken_python_client.py +++ b/src/clients/kraken_python_client.py @@ -10,10 +10,12 @@ For lowest latency, use rust_kraken_client directly """ class KrakenPythonClient: - def __init__(self,asset='XBTUSD',error_message=False): + def __init__(self, asset='XBTUSD', error_message=False, api_key=None, api_secret=None): self.asset = asset self.error_message = error_message self.base_url = "https://api.kraken.com/0/public" + self.api_key = api_key + self.api_secret = api_secret def test_connection(self): try: @@ -34,10 +36,10 @@ def get_bid(self,asset='XBTUSD',index=0): if self.error_message: print(f"KrakenPythonClient.get_bid: {e}") return False - + def get_ask(self,asset='XBTUSD',index=0): """ - Get the ask price of an asset. + Get the bid price of an asset. """ try: return kraken.get_ask(asset)[index] @@ -45,244 +47,289 @@ def get_ask(self,asset='XBTUSD',index=0): return kraken.get_ask(asset) except Exception as e: if self.error_message: - print(f"KrakenPythonClient.get_ask: {e}") + print(f"KrakenPythonClient.get_bid: {e}") return False - - def get_balance(self,asset=None): + + def get_order_id(self): + pass + + def cancel_order(self,order_id): """ - Get the balance of an asset. - If asset is None, returns all balances. + Cancel an order from the Kraken API. """ try: - if asset == None: - # Returns all balances - return kraken.get_balance() - else: - # Returns specific balance - return kraken.get_balance()[asset] + return kraken.cancel_order(order_id) except Exception as e: if self.error_message: - print(f"KrakenPythonClient.get_balance: {e}") + print(f"KrakenPythonClient.cancel_order: {e}") return False - - def get_spread(self,asset='XBTUSD'): - """ - Get the spread of an asset. - """ + + def get_orderbook(self,pair): try: - return kraken.get_spread(asset) + return kraken.get_orderbook(pair) except Exception as e: if self.error_message: - print(f"KrakenPythonClient.get_spread: {e}") + print(f"KrakenPythonClient.get_orderbook: {e}") return False - - def add_order(self, asset, side, price, volume): + + def edit_order(self, txid, pair, side, price, volume, new_userref=None): """ - Add an order to the Kraken API and return a dictionary with parsed fields. + Edit an existing order on the Kraken API. + + Args: + txid (str): The transaction ID of the order to edit + pair (str): The trading pair (e.g., 'XBTUSD') + side (str): The order side ('buy' or 'sell') + price (float): The new price for the order + volume (float): The new volume for the order + new_userref (str, optional): New user reference ID for the order + + Returns: + dict: Order response containing txid and description if successful + bool: False if the operation fails """ try: - order_response = kraken.add_order(asset, side, price, volume) - return { - "txid": order_response.txid[0] if isinstance(order_response.txid, list) else order_response.txid, - "description": order_response.description - } + order_response = kraken.edit_order(txid, pair, side, price, volume, new_userref) + return order_response except Exception as e: if self.error_message: - print(f"KrakenPythonClient.add_order: {e}") + print(f"KrakenPythonClient.edit_order: {e}") return False - def get_open_orders(self, asset=None, order_type='open', headers=None): + def get_my_recent_orders(self, pair=None, since=None, count=None, userref=None): """ - Retrieve open orders with optional asset filtering and column selection. + Get recent orders for the authenticated user. Args: - asset (str, optional): Filter orders by asset pair (e.g., 'XBTUSD'). Defaults to None. - order_type (str): Type of orders to retrieve ('open'|'closed'|'both'). Defaults to 'open'. - headers (list|str, optional): Columns to return. Use '*' for all columns. Defaults to None for basic columns. - + pair (str, optional): Trading pair to filter orders (e.g., 'XBTUSD'). If None, returns all pairs. + since (str, optional): Return orders since this order ID + count (int, optional): Maximum number of orders to return + userref (str, optional): User reference ID to filter orders + Returns: - pd.DataFrame: DataFrame containing the requested orders + pd.DataFrame: DataFrame containing order history with columns: + - order_id: Order ID (txid) + - pair: Trading pair + - side: 'buy' or 'sell' + - type: Order type (market, limit, etc.) + - price: Order price + - volume: Order volume + - time: Order timestamp (datetime) + - status: Order status + - cost: Order cost + - fee: Order fee + - misc: Miscellaneous info + bool: False if the operation fails """ try: - # Get and parse orders - orders_response = kraken.get_open_orders_raw() - orders_data = json.loads(orders_response) - orders_dict = orders_data.get('result', {}).get(order_type, {}) + # Get orders from the rust client + orders_response = kraken.get_open_orders(pair, since, count, userref) + + # Parse the response if it's JSON string + if isinstance(orders_response, str): + orders_data = json.loads(orders_response) + else: + orders_data = orders_response + + # Extract orders from the result + if hasattr(orders_data, 'result'): + result = orders_data.result + elif isinstance(orders_data, dict) and 'result' in orders_data: + result = orders_data['result'] + else: + result = orders_data + + # Get the orders dictionary + if hasattr(result, 'open'): + orders_dict = result.open + elif isinstance(result, dict) and 'open' in result: + orders_dict = result['open'] + else: + # Assume the result itself is the orders dictionary + orders_dict = result if not orders_dict: - no_orders = pd.DataFrame([{ - 'order_id': None, - 'descr_pair': asset, - 'descr_type': 'buy', - 'descr_price': 0.0, - 'vol': 0.0, - 'vol_exec': 0.0 - }]) - return no_orders - + # Return empty DataFrame with expected columns + return pd.DataFrame(columns=[ + 'order_id', 'pair', 'side', 'type', 'price', 'volume', 'time', + 'status', 'cost', 'fee', 'misc' + ]) + # Convert to DataFrame - df = ( - pd.DataFrame.from_dict(orders_dict, orient='index') - .reset_index() - .rename(columns={'index': 'order_id'}) - ) - - # Expand description fields + df = pd.DataFrame.from_dict(orders_dict, orient='index').reset_index() + df = df.rename(columns={'index': 'order_id'}) + + # Extract nested order description if it exists if 'descr' in df.columns: descr_df = pd.json_normalize(df['descr']) - descr_df.columns = [f"descr_{col}" for col in descr_df.columns] df = pd.concat([df.drop('descr', axis=1), descr_df], axis=1) - - # Filter by asset if specified - if asset is not None and 'descr_pair' in df.columns: - df = df[df['descr_pair'] == asset.upper()] - - # Convert data types - numeric_cols = ['vol', 'vol_exec', 'cost', 'fee', 'price', 'stopprice', 'limitprice'] + + # Standardize column names and data types + if 'type' in df.columns and 'side' not in df.columns: + df['side'] = df['type'] + + # Convert numeric columns + numeric_cols = ['price', 'vol', 'cost', 'fee'] for col in numeric_cols: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') - - time_cols = ['opentm', 'starttm', 'expiretm'] - for col in time_cols: - if col in df.columns: - df[col] = pd.to_datetime(df[col], unit='s', errors='coerce') - - # Select columns to return - if headers is None: - return df[['order_id', 'descr_pair', 'descr_type', 'descr_price', 'vol', 'vol_exec']] - elif headers == '*' or headers == []: - return df - else: - # Validate requested headers exist - valid_headers = [h for h in headers if h in df.columns] - return df[valid_headers] + + # Rename volume column if it exists + if 'vol' in df.columns: + df['volume'] = df['vol'] + + # Convert time to datetime + if 'opentm' in df.columns: + df['time'] = pd.to_datetime(df['opentm'], unit='s', errors='coerce') + elif 'time' in df.columns: + df['time'] = pd.to_datetime(df['time'], unit='s', errors='coerce') + + # Add status column + if 'status' not in df.columns: + df['status'] = 'open' + + # Filter by pair if specified + if pair is not None and 'pair' in df.columns: + df = df[df['pair'] == pair.upper()] + + # Select and order columns to match expected format + expected_cols = ['order_id', 'pair', 'side', 'type', 'price', 'volume', 'time', 'status', 'cost', 'fee', 'misc'] + available_cols = [col for col in expected_cols if col in df.columns] + + # Add missing columns with default values + for col in expected_cols: + if col not in df.columns: + df[col] = None + + return df[expected_cols] + except Exception as e: if self.error_message: - print(f"KrakenPythonClient.get_open_orders: {e}") + print(f"KrakenPythonClient.get_my_recent_orders: {e}") return False - - def get_recent_trades(self, pair, since=None, count=None): + + def get_my_recent_trades(self, pair=None, since=None, count=None): """ - Get recent trades for a trading pair. + Get recent trades history for the authenticated user using Kraken REST API. Args: - pair (str): Trading pair (e.g., 'XBTUSD') + pair (str, optional): Trading pair to filter trades (e.g., 'XBTUSD'). If None, returns all pairs. since (str, optional): Return trades since this trade ID - count (int, optional): Maximum number of trades to return (default: 1000) + count (int, optional): Maximum number of trades to return Returns: - dict: Dictionary containing: - - 'trades': List of trades, each containing [price, volume, time, buy/sell, market/limit, miscellaneous] - - 'last': Trade ID to use for subsequent calls to get newer trades + pd.DataFrame: DataFrame containing trade history with columns: + - trade_id: Trade ID + - pair: Trading pair + - side: 'buy' or 'sell' + - price: Trade price + - volume: Trade volume + - time: Trade timestamp (datetime) + - cost: Trade cost + - fee: Trade fee + - margin: Margin (if applicable) + - misc: Miscellaneous info bool: False if the operation fails """ try: - # Prepare parameters - params = {'pair': pair} - if since is not None: - params['since'] = since - if count is not None: - params['count'] = count - - # Make request to Kraken API - response = requests.get(f"{self.base_url}/Trades", params=params) + # This requires API credentials - you'll need to add them to your class + if not hasattr(self, 'api_key') or not hasattr(self, 'api_secret'): + if self.error_message: + print("KrakenPythonClient.get_my_recent_trades: API credentials required") + return False + + import urllib.parse + import hashlib + import hmac + import base64 + + # Kraken private API endpoint + url = "https://api.kraken.com/0/private/TradesHistory" + + # Build parameters + data = { + 'nonce': str(int(time.time() * 1000)) + } + + if pair: + data['pair'] = pair + if since: + data['start'] = since + if count: + data['count'] = count + + # Create signature for authentication + postdata = urllib.parse.urlencode(data) + encoded = (str(data['nonce']) + postdata).encode() + message = '/0/private/TradesHistory'.encode() + hashlib.sha256(encoded).digest() + + mac = hmac.new(base64.b64decode(self.api_secret), message, hashlib.sha512) + sigdigest = base64.b64encode(mac.digest()) + + headers = { + 'API-Key': self.api_key, + 'API-Sign': sigdigest.decode() + } + + # Make the request + response = requests.post(url, headers=headers, data=data) response.raise_for_status() - data = response.json() + trades_data = response.json() - # Check for API errors - if data.get('error'): + if trades_data.get('error'): if self.error_message: - print(f"KrakenPythonClient.get_recent_trades: API Error - {data['error']}") + print(f"KrakenPythonClient.get_my_recent_trades: API Error: {trades_data['error']}") return False - result = data.get('result', {}) + # Extract trades from the result + result = trades_data.get('result', {}) + trades_dict = result.get('trades', {}) - # Extract trades data - trades_data = None - last_id = None + if not trades_dict: + # Return empty DataFrame with expected columns + return pd.DataFrame(columns=[ + 'trade_id', 'pair', 'side', 'price', 'volume', 'time', + 'cost', 'fee', 'margin', 'misc' + ]) - for key, value in result.items(): - if key == 'last': - last_id = value - else: - # The trades are stored under the pair name key - trades_data = value + # Convert to DataFrame + df = pd.DataFrame.from_dict(trades_dict, orient='index').reset_index() + df = df.rename(columns={'index': 'trade_id'}) - if trades_data is None: - if self.error_message: - print(f"KrakenPythonClient.get_recent_trades: No trades data found") - return False + # Standardize column names and data types + if 'type' in df.columns: + df['side'] = df['type'] - # Convert trades to more readable format - formatted_trades = [] - for trade in trades_data: - formatted_trades.append({ - 'price': float(trade[0]), - 'volume': float(trade[1]), - 'time': pd.to_datetime(float(trade[2]), unit='s'), - 'side': 'buy' if trade[3] == 'b' else 'sell', - 'type': 'market' if trade[4] == 'm' else 'limit', - 'miscellaneous': trade[5] if len(trade) > 5 else '' - }) - - return { - 'trades': formatted_trades, - 'last': last_id - } + # Convert numeric columns + numeric_cols = ['price', 'vol', 'cost', 'fee', 'margin'] + for col in numeric_cols: + if col in df.columns: + df[col] = pd.to_numeric(df[col], errors='coerce') - except requests.exceptions.RequestException as e: - if self.error_message: - print(f"KrakenPythonClient.get_recent_trades: Request Error - {e}") - return False - except Exception as e: - if self.error_message: - print(f"KrakenPythonClient.get_recent_trades: {e}") - return False - - def get_order_id(self): - pass - - def cancel_order(self,order_id): - """ - Cancel an order from the Kraken API. - """ - try: - return kraken.cancel_order(order_id) - except Exception as e: - if self.error_message: - print(f"KrakenPythonClient.cancel_order: {e}") - return False - - def get_orderbook(self,pair): - try: - return kraken.get_orderbook(pair) - except Exception as e: - if self.error_message: - print(f"KrakenPythonClient.get_orderbook: {e}") - return False - - def edit_order(self, txid, pair, side, price, volume, new_userref=None): - """ - Edit an existing order on the Kraken API. - - Args: - txid (str): The transaction ID of the order to edit - pair (str): The trading pair (e.g., 'XBTUSD') - side (str): The order side ('buy' or 'sell') - price (float): The new price for the order - volume (float): The new volume for the order - new_userref (str, optional): New user reference ID for the order + # Rename volume column if it exists + if 'vol' in df.columns: + df['volume'] = df['vol'] + + # Convert time to datetime + if 'time' in df.columns: + df['time'] = pd.to_datetime(df['time'], unit='s', errors='coerce') + + # Filter by pair if specified (additional filtering after API call) + if pair is not None and 'pair' in df.columns: + df = df[df['pair'] == pair.upper()] + + # Select and order columns to match expected format + expected_cols = ['trade_id', 'pair', 'side', 'price', 'volume', 'time', 'cost', 'fee', 'margin', 'misc'] + + # Add missing columns with default values + for col in expected_cols: + if col not in df.columns: + df[col] = None + + return df[expected_cols] - Returns: - dict: Order response containing txid and description if successful - bool: False if the operation fails - """ - try: - order_response = kraken.edit_order(txid, pair, side, price, volume, new_userref) - return order_response except Exception as e: if self.error_message: - print(f"KrakenPythonClient.edit_order: {e}") + print(f"KrakenPythonClient.get_my_recent_trades: {e}") return False \ No newline at end of file From 416883fa5db9fa80f6e8725639bb89645305f841 Mon Sep 17 00:00:00 2001 From: Ryan Watts Date: Thu, 31 Jul 2025 12:23:58 -0600 Subject: [PATCH 2/2] Bump version to 2.1.6 in settings.yaml Updated the program version from 2.1.5 to 2.1.6 in the settings configuration file. --- resources/data/settings/settings.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resources/data/settings/settings.yaml b/resources/data/settings/settings.yaml index c98f773..fb995db 100644 --- a/resources/data/settings/settings.yaml +++ b/resources/data/settings/settings.yaml @@ -1,4 +1,4 @@ program: name: "TradeByte" - version: "2.1.5" + version: "2.1.6" debug: false