From 64b6f6228ec55233013f908c3151433dec5a50d5 Mon Sep 17 00:00:00 2001 From: alex101xela Date: Thu, 30 Oct 2025 16:32:35 +0400 Subject: [PATCH] Simplify WS example --- examples/03_subscribe_to_stream.py | 56 ++++++++++++++++ examples/stream_example.py | 102 ----------------------------- 2 files changed, 56 insertions(+), 102 deletions(-) create mode 100644 examples/03_subscribe_to_stream.py delete mode 100644 examples/stream_example.py diff --git a/examples/03_subscribe_to_stream.py b/examples/03_subscribe_to_stream.py new file mode 100644 index 0000000..a60c69e --- /dev/null +++ b/examples/03_subscribe_to_stream.py @@ -0,0 +1,56 @@ +import asyncio +import logging +from asyncio import run +from signal import SIGINT, SIGTERM + +from examples.init_env import init_env +from x10.perpetual.configuration import MAINNET_CONFIG +from x10.perpetual.stream_client import PerpetualStreamClient + +LOGGER = logging.getLogger() +ENDPOINT_CONFIG = MAINNET_CONFIG + + +async def subscribe_to_streams(stop_event: asyncio.Event): + env_config = init_env() + stream_client = PerpetualStreamClient(api_url=ENDPOINT_CONFIG.stream_url) + + async def subscribe_to_orderbook(): + async with stream_client.subscribe_to_orderbooks("BTC-USD") as orderbook_stream: + while not stop_event.is_set(): + try: + msg = await asyncio.wait_for(orderbook_stream.recv(), timeout=1) + LOGGER.info("Orderbook: %s#%s", msg.type, msg.seq) + except asyncio.TimeoutError: + pass + + async def subscribe_to_account(): + async with stream_client.subscribe_to_account_updates(env_config.api_key) as account_stream: + while not stop_event.is_set(): + try: + msg = await asyncio.wait_for(account_stream.recv(), timeout=1) + LOGGER.info("Account: %s#%s", msg.type, msg.seq) + except asyncio.TimeoutError: + pass + + LOGGER.info("Press Ctrl+C to stop") + + await asyncio.gather(subscribe_to_orderbook(), subscribe_to_account()) + + +async def run_example(): + stop_event = asyncio.Event() + loop = asyncio.get_running_loop() + + def signal_handler(): + LOGGER.info("Signal received, stopping...") + stop_event.set() + + loop.add_signal_handler(SIGINT, signal_handler) + loop.add_signal_handler(SIGTERM, signal_handler) + + await subscribe_to_streams(stop_event) + + +if __name__ == "__main__": + run(main=run_example()) diff --git a/examples/stream_example.py b/examples/stream_example.py deleted file mode 100644 index 0fb0fd9..0000000 --- a/examples/stream_example.py +++ /dev/null @@ -1,102 +0,0 @@ -import asyncio -import logging -import signal -from typing import Any - -from x10.perpetual.configuration import TESTNET_CONFIG -from x10.perpetual.stream_client import PerpetualStreamClient - -API_KEY = "" - - -async def iterator_example(): - logger = logging.getLogger("stream_example[iterator_example]") - stream_client = PerpetualStreamClient(api_url=TESTNET_CONFIG.stream_url) - stream = await stream_client.subscribe_to_account_updates(API_KEY) - - async for event in stream: - logger.info(event) - - -async def manual_example(): - logger = logging.getLogger("stream_example[manual_example]") - stream_client = PerpetualStreamClient(api_url=TESTNET_CONFIG.stream_url) - stream = await stream_client.subscribe_to_account_updates(API_KEY) - - event1 = await stream.recv() - event2 = await stream.recv() - - logger.info("Event #1: %s", event1) - logger.info("Event #2: %s", event2) - - # etc - - await stream.close() - - -async def context_manager_example(): - logger = logging.getLogger("stream_example[context_manager_example]") - stream_client = PerpetualStreamClient(api_url=TESTNET_CONFIG.stream_url) - - async with stream_client.subscribe_to_orderbooks("BTC-USD") as stream: - msg1 = await stream.recv() - msg2 = await stream.recv() - - logger.info("Message #1: %s", msg1) - logger.info("Message #2: %s", msg2) - - # etc - - -async def merge_streams_example(): - logger = logging.getLogger("stream_example[merge_streams_example]") - stop_event = asyncio.Event() - - def sigint_handler(sig, frame): - logger.info("Interrupted by the user, stopping...") - stop_event.set() - - signal.signal(signal.SIGINT, sigint_handler) - - stream_client = PerpetualStreamClient(api_url=TESTNET_CONFIG.stream_url) - queue: asyncio.Queue[tuple[str, Any]] = asyncio.Queue() - - async def run_producer_stream1(): - async with stream_client.subscribe_to_orderbooks("BTC-USD") as stream1: - while not stop_event.is_set(): - msg = await asyncio.wait_for(stream1.recv(), timeout=5) - await queue.put(("stream1", msg)) - - if stream1.msgs_count == 5: - logger.info("Stream #1 produced 5 messages, stopping...") - break - - async def run_producer_stream2(): - async with stream_client.subscribe_to_account_updates(API_KEY) as stream2: - while not stop_event.is_set(): - msg = await asyncio.wait_for(stream2.recv(), timeout=5) - await queue.put(("stream2", msg)) - - if stream2.msgs_count == 3: - logger.info("Stream #2 produced 3 messages, stopping...") - break - - async def run_consumer(): - while not stop_event.is_set(): - try: - msg = await asyncio.wait_for(queue.get(), timeout=5) - logger.info("Message: %s", msg) - queue.task_done() - except asyncio.TimeoutError: - logger.info("No messages received in the last 5 seconds, stopping...") - break - - await asyncio.gather(run_producer_stream1(), run_producer_stream2(), run_consumer()) - - -async def main(): - await iterator_example() - - -if __name__ == "__main__": - asyncio.run(main=main())