From 27b22bd786f6b6534d698481c30548ea0cd4b2da Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Mon, 16 Jun 2025 13:10:23 -0500 Subject: [PATCH 1/4] prep for import --- README.md | 46 ++++++++++++++++++------------- example_app.py | 12 ++++---- intriniorealtime/client.py | 42 ++++++++++++++++++---------- intriniorealtime/replay_client.py | 16 +++++++---- setup.py | 4 +-- 5 files changed, 74 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index 1f0cc64..350d441 100644 --- a/README.md +++ b/README.md @@ -30,31 +30,37 @@ pip install intriniorealtime ``` ## Example Usage + ```python import threading import time -from threading import Timer,Thread,Event -from intriniorealtime.client import IntrinioRealtimeClient +from threading import Timer, Thread, Event +from intriniorealtime.client import IntrinioRealtimeEquitiesClient trade_count = 0 ask_count = 0 bid_count = 0 backlog_count = 0 + def on_quote(quote, backlog): - global ask_count - global bid_count - global backlog_count - backlog_count = backlog - if 'type' in quote.__dict__: - if quote.type == "ask": ask_count += 1 - else: bid_count += 1 + global ask_count + global bid_count + global backlog_count + backlog_count = backlog + if 'type' in quote.__dict__: + if quote.type == "ask": + ask_count += 1 + else: + bid_count += 1 + + +def on_trade(trade, backlog): + global trade_count + global backlog_count + backlog_count = backlog + trade_count += 1 -def on_trade(trade, backlog): - global trade_count - global backlog_count - backlog_count = backlog - trade_count += 1 class Summarize(threading.Thread): def __init__(self, event): @@ -68,16 +74,18 @@ class Summarize(threading.Thread): global ask_count global backlog_count while not self.stopped.wait(5): - print("trades: " + str(trade_count) + "; asks: " + str(ask_count) + "; bids: " + str(bid_count) + "; backlog: " + str(backlog_count)) + print("trades: " + str(trade_count) + "; asks: " + str(ask_count) + "; bids: " + str( + bid_count) + "; backlog: " + str(backlog_count)) + options = { 'api_key': 'API_KEY_HERE', - 'provider': 'REALTIME' # REALTIME or DELAYED_SIP or NASDAQ_BASIC + 'provider': 'REALTIME' # REALTIME or DELAYED_SIP or NASDAQ_BASIC } -client = IntrinioRealtimeClient(options, on_trade, on_quote) -client.join(['AAPL','GE','MSFT']) -#client.join(['lobby']) +client = IntrinioRealtimeEquitiesClient(options, on_trade, on_quote) +client.join(['AAPL', 'GE', 'MSFT']) +# client.join(['lobby']) client.connect() stopFlag = Event() summarize_thread = Summarize(stopFlag) diff --git a/example_app.py b/example_app.py index b07d4b3..7832be3 100644 --- a/example_app.py +++ b/example_app.py @@ -4,10 +4,10 @@ import sys import datetime from threading import Timer,Thread,Event -from intriniorealtime.client import IntrinioRealtimeClient -from intriniorealtime.replay_client import IntrinioReplayClient -from intriniorealtime.client import Quote -from intriniorealtime.client import Trade +from intriniorealtime.client import IntrinioRealtimeEquitiesClient +from intriniorealtime.replay_client import IntrinioReplayEquitiesClient +from intriniorealtime.client import EquitiesQuote +from intriniorealtime.client import EquitiesTrade trade_count = 0 ask_count = 0 @@ -19,7 +19,7 @@ def on_quote(quote, backlog): global bid_count global backlog_count backlog_count = backlog - if isinstance(quote, Quote) and 'type' in quote.__dict__: + if isinstance(quote, EquitiesQuote) and 'type' in quote.__dict__: if quote.type == "ask": ask_count += 1 else: bid_count += 1 @@ -58,7 +58,7 @@ def run(self): } -client = IntrinioRealtimeClient(options, on_trade, on_quote) +client = IntrinioRealtimeEquitiesClient(options, on_trade, on_quote) # client = IntrinioReplayClient(options, on_trade, on_quote) stop_event = Event() diff --git a/intriniorealtime/client.py b/intriniorealtime/client.py index 1873a0d..6fcc88f 100644 --- a/intriniorealtime/client.py +++ b/intriniorealtime/client.py @@ -14,7 +14,7 @@ DELAYED_SIP = "DELAYED_SIP" NASDAQ_BASIC = "NASDAQ_BASIC" MANUAL = "MANUAL" -PROVIDERS = [REALTIME, MANUAL, DELAYED_SIP, NASDAQ_BASIC] +NO_PROVIDER = "NO_PROVIDER" NO_SUBPROVIDER = "NO_SUBPROVIDER" CTA_A = "CTA_A" CTA_B = "CTA_B" @@ -22,16 +22,18 @@ OTC = "OTC" NASDAQ_BASIC = "NASDAQ_BASIC" IEX = "IEX" -SUB_PROVIDERS = [NO_SUBPROVIDER, CTA_A, CTA_B, UTP, OTC, NASDAQ_BASIC, IEX] +CBOE_ONE = "CBOE_ONE" +PROVIDERS = [REALTIME, MANUAL, DELAYED_SIP, NASDAQ_BASIC, IEX, CBOE_ONE] +SUB_PROVIDERS = [NO_SUBPROVIDER, CTA_A, CTA_B, UTP, OTC, NASDAQ_BASIC, IEX, CBOE_ONE] MAX_QUEUE_SIZE = 250000 DEBUGGING = not (sys.gettrace() is None) HEADER_MESSAGE_FORMAT_KEY = "UseNewEquitiesFormat" HEADER_MESSAGE_FORMAT_VALUE = "v2" HEADER_CLIENT_INFORMATION_KEY = "Client-Information" -HEADER_CLIENT_INFORMATION_VALUE = "IntrinioPythonSDKv5.4.3" +HEADER_CLIENT_INFORMATION_VALUE = "IntrinioPythonSDKv6.0.0" -class Quote: +class EquitiesQuote: def __init__(self, symbol, type, price, size, timestamp, subprovider, market_center, condition): self.symbol = symbol self.type = type @@ -59,7 +61,7 @@ def __str__(self): return self.symbol + ", " + self.type + ", price: " + str(self.price) + ", size: " + str(self.size) + ", timestamp: " + str(self.timestamp) + ", subprovider: " + str(self.subprovider) + ", market_center: " + str(self.market_center) + ", condition: " + str(self.condition) -class Trade: +class EquitiesTrade: def __init__(self, symbol, price, size, total_volume, timestamp, subprovider, market_center, condition): self.symbol = symbol self.price = price @@ -91,7 +93,7 @@ def is_darkpool(self): return False -class IntrinioRealtimeClient: +class IntrinioRealtimeEquitiesClient: def __init__(self, options: Dict[str, Any], on_trade: Optional[callable], on_quote: Optional[callable]): if options is None: raise ValueError("Options parameter is required") @@ -160,7 +162,7 @@ def __init__(self, options: Dict[str, Any], on_trade: Optional[callable], on_quo self.token = None self.ws = None self.quote_receiver = None - self.quote_handler = QuoteHandler(self, self.bypass_parsing) + self.quote_handler = EquitiesQuoteHandler(self, self.bypass_parsing) self.joined_channels = set() self.last_queue_warning_time = 0 self.last_self_heal_backoff = -1 @@ -171,12 +173,18 @@ def auth_url(self) -> str: if self.provider == REALTIME: auth_url = "https://realtime-mx.intrinio.com/auth" + elif self.provider == IEX: + auth_url = "https://realtime-mx.intrinio.com/auth" elif self.provider == DELAYED_SIP: auth_url = "https://realtime-delayed-sip.intrinio.com/auth" elif self.provider == NASDAQ_BASIC: auth_url = "https://realtime-nasdaq-basic.intrinio.com/auth" + elif self.provider == CBOE_ONE: + auth_url = "https://cboe-one.intrinio.com/auth" elif self.provider == MANUAL: auth_url = "http://" + self.ipaddress + "/auth" + else: + auth_url = "https://realtime-mx.intrinio.com/auth" if self.api_key: auth_url = self.api_auth_url(auth_url) @@ -194,12 +202,18 @@ def api_auth_url(self, auth_url: str) -> str: def websocket_url(self) -> str: if self.provider == REALTIME: return "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + elif self.provider == IEX: + return "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token elif self.provider == DELAYED_SIP: return "wss://realtime-delayed-sip.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token elif self.provider == NASDAQ_BASIC: return "wss://realtime-nasdaq-basic.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + elif self.provider == CBOE_ONE: + return "wss://cboe-one.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token elif self.provider == MANUAL: return "ws://" + self.ipaddress + "/socket/websocket?vsn=1.0.0&token=" + self.token + else: + return "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token def do_backoff(self): self.last_self_heal_backoff += 1 @@ -248,7 +262,7 @@ def refresh_token(self): self.logger.info("Authentication successful!") def refresh_websocket(self): - self.quote_receiver = QuoteReceiver(self) + self.quote_receiver = EquitiesQuoteReceiver(self) self.quote_receiver.start() def on_connect(self): @@ -336,7 +350,7 @@ def valid_api_key(self, api_key: str): return True -class QuoteReceiver(threading.Thread): +class EquitiesQuoteReceiver(threading.Thread): def __init__(self, client): threading.Thread.__init__(self, args=(), kwargs=None) self.daemon = True @@ -394,7 +408,7 @@ def on_message(self, ws, message): raise e -class QuoteHandler(threading.Thread): +class EquitiesQuoteHandler(threading.Thread): def __init__(self, client, bypass_parsing: bool): threading.Thread.__init__(self, args=(), kwargs=None) self.daemon = True @@ -410,7 +424,7 @@ def __init__(self, client, bypass_parsing: bool): 6: IEX, } - def parse_quote(self, quote_bytes: bytes, start_index: int = 0) -> Quote: + def parse_quote(self, quote_bytes: bytes, start_index: int = 0) -> EquitiesQuote: buffer = memoryview(quote_bytes) symbol_length = buffer[start_index + 2] symbol = buffer[(start_index + 3):(start_index + 3 + symbol_length)].tobytes().decode("ascii") @@ -425,10 +439,10 @@ def parse_quote(self, quote_bytes: bytes, start_index: int = 0) -> Quote: subprovider = self.subprovider_codes.get(buffer[3 + symbol_length + start_index], IEX) # default IEX for backward behavior consistency. market_center = buffer[(start_index + 4 + symbol_length):(start_index + 6 + symbol_length)].tobytes().decode("utf-16") - return Quote(symbol, quote_type, price, size, timestamp, subprovider, market_center, condition) + return EquitiesQuote(symbol, quote_type, price, size, timestamp, subprovider, market_center, condition) - def parse_trade(self, trade_bytes: bytes, start_index: int = 0) -> Trade: + def parse_trade(self, trade_bytes: bytes, start_index: int = 0) -> EquitiesTrade: buffer = memoryview(trade_bytes) symbol_length = buffer[start_index + 2] symbol = buffer[(start_index + 3):(start_index + 3 + symbol_length)].tobytes().decode("ascii") @@ -442,7 +456,7 @@ def parse_trade(self, trade_bytes: bytes, start_index: int = 0) -> Trade: subprovider = self.subprovider_codes.get(buffer[3 + symbol_length + start_index], IEX) # default IEX for backward behavior consistency. market_center = buffer[(start_index + 4 + symbol_length):(start_index + 6 + symbol_length)].tobytes().decode("utf-16") - return Trade(symbol, price, size, total_volume, timestamp, subprovider, market_center, condition) + return EquitiesTrade(symbol, price, size, total_volume, timestamp, subprovider, market_center, condition) def parse_message(self, message_bytes: bytes, start_index: int, backlog_len: int) -> int: diff --git a/intriniorealtime/replay_client.py b/intriniorealtime/replay_client.py index 623b2a8..e76c1f4 100644 --- a/intriniorealtime/replay_client.py +++ b/intriniorealtime/replay_client.py @@ -20,7 +20,6 @@ class IntrinioRealtimeConstants: DELAYED_SIP = "DELAYED_SIP" NASDAQ_BASIC = "NASDAQ_BASIC" MANUAL = "MANUAL" - PROVIDERS = [REALTIME, MANUAL, DELAYED_SIP, NASDAQ_BASIC] NO_PROVIDER = "NO_PROVIDER" NO_SUBPROVIDER = "NO_SUBPROVIDER" CTA_A = "CTA_A" @@ -29,7 +28,9 @@ class IntrinioRealtimeConstants: OTC = "OTC" NASDAQ_BASIC = "NASDAQ_BASIC" IEX = "IEX" - SUB_PROVIDERS = [NO_SUBPROVIDER, CTA_A, CTA_B, UTP, OTC, NASDAQ_BASIC, IEX] + CBOE_ONE = "CBOE_ONE" + PROVIDERS = [REALTIME, MANUAL, DELAYED_SIP, NASDAQ_BASIC, IEX, CBOE_ONE] + SUB_PROVIDERS = [NO_SUBPROVIDER, CTA_A, CTA_B, UTP, OTC, NASDAQ_BASIC, IEX, CBOE_ONE] MAX_QUEUE_SIZE = 1000000 EVENT_BUFFER_SIZE = 100 @@ -70,7 +71,7 @@ def __init__(self, time_received, data): self.data = data -class IntrinioReplayClient: +class IntrinioReplayEquitiesClient: def __init__(self, options: Dict[str, Any], on_trade: callable, on_quote: callable): if options is None: raise ValueError("Options parameter is required") @@ -275,6 +276,8 @@ def map_subprovider_to_api_value(sub_provider): return "otc_delayed" case IntrinioRealtimeConstants.NASDAQ_BASIC: return "nasdaq_basic" + case IntrinioRealtimeConstants.CBOE_ONE: + return "cboe_one" case _: return "iex" @@ -287,11 +290,14 @@ def map_provider_to_subproviders(provider): return [] case IntrinioRealtimeConstants.REALTIME: return [IntrinioRealtimeConstants.IEX] + case IntrinioRealtimeConstants.IEX: + return [IntrinioRealtimeConstants.IEX] case IntrinioRealtimeConstants.DELAYED_SIP: - return [IntrinioRealtimeConstants.UTP, IntrinioRealtimeConstants.CTA_A, IntrinioRealtimeConstants.CTA_B, - IntrinioRealtimeConstants.OTC] + return [IntrinioRealtimeConstants.UTP, IntrinioRealtimeConstants.CTA_A, IntrinioRealtimeConstants.CTA_B, IntrinioRealtimeConstants.OTC] case IntrinioRealtimeConstants.NASDAQ_BASIC: return [IntrinioRealtimeConstants.NASDAQ_BASIC] + case IntrinioRealtimeConstants.CBOE_ONE: + return [IntrinioRealtimeConstants.CBOE_ONE] case _: return [] diff --git a/setup.py b/setup.py index b177566..3ee02a0 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ def readme(): setup( name = 'intriniorealtime', packages = ['intriniorealtime'], - version = '5.4.3', + version = '6.0.0', author = 'Intrinio Python SDK for Real-Time Stock Prices', author_email = 'success@intrinio.com', url = 'https://intrinio.com', @@ -16,7 +16,7 @@ def readme(): long_description_content_type = 'text/markdown', install_requires = ['requests>=2.26.0','websocket-client>=1.2.1','wsaccel>=0.6.3', 'intrinio-sdk>=6.26.0'], python_requires = '~=3.10', - download_url = 'https://github.com/intrinio/intrinio-realtime-python-sdk/archive/v5.1.0.tar.gz', + download_url = 'https://github.com/intrinio/intrinio-realtime-python-sdk/archive/v6.0.0.tar.gz', keywords = ['realtime','stock prices','intrinio','stock market','stock data','financial'], classifiers = [ 'Intended Audience :: Financial and Insurance Industry', From 10dfb592a6088292df481ed7a7baf733d455b0db Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Mon, 16 Jun 2025 13:21:38 -0500 Subject: [PATCH 2/4] delayed mode --- intriniorealtime/client.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/intriniorealtime/client.py b/intriniorealtime/client.py index 6fcc88f..8f4f876 100644 --- a/intriniorealtime/client.py +++ b/intriniorealtime/client.py @@ -106,6 +106,7 @@ def __init__(self, options: Dict[str, Any], on_trade: Optional[callable], on_quo self.ipaddress = options.get('ipaddress') self.tradesonly = options.get('tradesonly') self.bypass_parsing = options.get('bypass_parsing', False) + self.delayed = options.get('delayed', False) if 'channels' in options: self.channels = set(options['channels']) @@ -183,8 +184,6 @@ def auth_url(self) -> str: auth_url = "https://cboe-one.intrinio.com/auth" elif self.provider == MANUAL: auth_url = "http://" + self.ipaddress + "/auth" - else: - auth_url = "https://realtime-mx.intrinio.com/auth" if self.api_key: auth_url = self.api_auth_url(auth_url) @@ -200,20 +199,22 @@ def api_auth_url(self, auth_url: str) -> str: return auth_url + "api_key=" + self.api_key def websocket_url(self) -> str: + delayed_part = "&delayed=true" if self.delayed else "" + if self.provider == REALTIME: - return "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + return "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + delayed_part elif self.provider == IEX: - return "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + return "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + delayed_part elif self.provider == DELAYED_SIP: - return "wss://realtime-delayed-sip.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + return "wss://realtime-delayed-sip.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + delayed_part elif self.provider == NASDAQ_BASIC: - return "wss://realtime-nasdaq-basic.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + return "wss://realtime-nasdaq-basic.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + delayed_part elif self.provider == CBOE_ONE: - return "wss://cboe-one.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + return "wss://cboe-one.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + delayed_part elif self.provider == MANUAL: - return "ws://" + self.ipaddress + "/socket/websocket?vsn=1.0.0&token=" + self.token + return "ws://" + self.ipaddress + "/socket/websocket?vsn=1.0.0&token=" + self.token + delayed_part else: - return "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + return "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=" + self.token + delayed_part def do_backoff(self): self.last_self_heal_backoff += 1 From 1eb26d5d08bf5bfc45b6303665033f5e6aad8a10 Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Mon, 16 Jun 2025 13:27:08 -0500 Subject: [PATCH 3/4] add cboe and delayed to options examples --- README.md | 8 +++++--- example_app.py | 3 ++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 350d441..fc7ecad 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,8 @@ class Summarize(threading.Thread): options = { 'api_key': 'API_KEY_HERE', - 'provider': 'REALTIME' # REALTIME or DELAYED_SIP or NASDAQ_BASIC + 'provider': 'IEX', # REALTIME (IEX) or IEX or CBOE_ONE or DELAYED_SIP or NASDAQ_BASIC + #'delayed': True, # Add this if you have realtime (nondelayed) access and want to force delayed mode. If you only have delayed mode access, this is redundant. } client = IntrinioRealtimeEquitiesClient(options, on_trade, on_quote) @@ -186,7 +187,8 @@ def on_trade(trade, backlog): options = { 'api_key': '', - 'provider': 'REALTIME', # REALTIME or DELAYED_SIP or NASDAQ_BASIC + 'provider': 'IEX', # REALTIME (IEX) or IEX or CBOE_ONE or DELAYED_SIP or NASDAQ_BASIC + #'delayed': True, # Add this if you have realtime (nondelayed) access and want to force delayed mode. If you only have delayed mode access, this is redundant. 'on_quote': on_quote, 'on_trade': on_trade } @@ -248,7 +250,7 @@ def on_trade(trade, backlog): options = { 'api_key': '', - 'provider': 'REALTIME', # REALTIME or DELAYED_SIP or NASDAQ_BASIC + 'provider': 'IEX', # REALTIME (IEX) or IEX or CBOE_ONE or DELAYED_SIP or NASDAQ_BASIC 'replay_date': datetime.date.today(), 'with_simulated_delay': False, # This plays back the events at the same rate they happened in market. 'delete_file_when_done': True, diff --git a/example_app.py b/example_app.py index 7832be3..0f33d9c 100644 --- a/example_app.py +++ b/example_app.py @@ -46,7 +46,8 @@ def run(self): options = { 'api_key': 'API_KEY_HERE', - 'provider': 'REALTIME' # 'REALTIME' or DELAYED_SIP or NASDAQ_BASIC + 'provider': 'IEX' # 'REALTIME' (IEX), or 'IEX', or 'DELAYED_SIP', or 'NASDAQ_BASIC', or 'CBOE_ONE' + # ,'delayed': True # Add this if you have realtime (nondelayed) access and want to force delayed mode. If you only have delayed mode access, this is redundant. # ,'replay_date': datetime.date.today() - datetime.timedelta(days=1) # needed for ReplayClient. The date to replay. # ,'with_simulated_delay': False # needed for ReplayClient. This plays back the events at the same rate they happened in market. # ,'delete_file_when_done': True # needed for ReplayClient From 9c758f4a86065930f3c105f3a8b18e6469fb4620 Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Mon, 16 Jun 2025 14:52:30 -0500 Subject: [PATCH 4/4] import options client and update readme --- Dockerfile | 3 +- ExampleApp.cmd | 3 +- README.md | 539 ++++++++++-- example_app.py => example_app_equities.py | 15 +- example_app_options.py | 152 ++++ .../{client.py => equities_client.py} | 41 +- ...ay_client.py => equities_replay_client.py} | 0 intriniorealtime/options_client.py | 825 ++++++++++++++++++ 8 files changed, 1483 insertions(+), 95 deletions(-) rename example_app.py => example_app_equities.py (86%) create mode 100644 example_app_options.py rename intriniorealtime/{client.py => equities_client.py} (94%) rename intriniorealtime/{replay_client.py => equities_replay_client.py} (100%) create mode 100644 intriniorealtime/options_client.py diff --git a/Dockerfile b/Dockerfile index 3b0ad59..39e5947 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,4 +13,5 @@ RUN pip install 'wsaccel' RUN pip install 'intrinio_sdk' -CMD python example_app.py \ No newline at end of file +CMD python example_app_equities.py +#CMD python example_app_options.py \ No newline at end of file diff --git a/ExampleApp.cmd b/ExampleApp.cmd index 8f7934f..84efd03 100644 --- a/ExampleApp.cmd +++ b/ExampleApp.cmd @@ -1,3 +1,4 @@ pip uninstall intriniorealtime -y pip install -e intriniorealtime -python example_app.py \ No newline at end of file +python example_app_equities.py +# python example_app_options.py \ No newline at end of file diff --git a/README.md b/README.md index fc7ecad..9b047a1 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,15 @@ # intrinio realtime python sdk -SDK for working with Intrinio's realtime Multi-Exchange prices feed. Intrinio’s Multi-Exchange feed bridges the gap by merging real-time equity pricing from the IEX and MEMX exchanges. Get a comprehensive view with increased market volume and enjoy no exchange fees, no per-user requirements, no permissions or authorizations, and little to no paperwork. +SDK for working with Intrinio's realtime OPRA, IEX, delayed SIP, CBOE One, or NASDAQ Basic prices feeds. Get a comprehensive view with increased market volume and enjoy minimized exchange and per user fees. -[Intrinio](https://intrinio.com/) provides real-time stock prices via a two-way WebSocket connection. To get started, [subscribe to a real-time data feed](https://intrinio.com/real-time-multi-exchange) and follow the instructions below. - -[Documentation for our legacy realtime client](https://github.com/intrinio/intrinio-realtime-python-sdk/tree/2.2.0) +[Intrinio](https://intrinio.com/) provides real-time stock and option prices via a two-way WebSocket connection. To get started, [subscribe to a real-time equity feed](https://intrinio.com/real-time-multi-exchange), or [subscribe to a real-time options feed](https://intrinio.com/financial-market-data/options-data) and follow the instructions below. ## Requirements - Python 3.10 -- You need https://pypi.org/project/websocket-client/, not https://pypi.org/project/websocket/. +- NOTE: You need https://pypi.org/project/websocket-client/, not https://pypi.org/project/websocket/. ## Docker -Add your API key to the example_app.py file, then +Add your API key to the example_app_equities.py or example_app_options.py file, comment the correct line (16 or 17) in Dockerfile, then ``` docker compose build docker compose run client @@ -19,89 +17,130 @@ docker compose run client ## Features -* Receive streaming, real-time price quotes (last trade, bid, ask) -* Subscribe to updates from individual securities -* Subscribe to updates for all securities -* Multiple sources of data - REALTIME or DELAYED_SIP or NASDAQ_BASIC +### Equities + +* Receive streaming, real-time pricing (trades, NBBO bid, ask) +* Subscribe to updates from individual securities, individual contracts, or +* Subscribe to updates for all securities (Lobby/Firehose mode) +* Replay a specific day (at actual pace or as fast as it loads) while the servers are down, either for testing or fetching missed data. + +### Options + +* Receive streaming, real-time option price updates: + * every trade + * conflated bid and ask + * open interest, open, close, high, low + * unusual activity(block trades, sweeps, whale trades, unusual sweeps) +* Subscribe to updates from individual options contracts (or option chains) +* Subscribe to updates for the entire universe of option contracts (~1.5M option contracts) ## Installation ``` pip install intriniorealtime ``` -## Example Usage +## Handling Quotes and the Queue + +There are thousands of securities (and millions of options contracts), each with their own feed of activity. We highly encourage you to make your on-event handlers as short as possible and follow a queue pattern so your app can handle the volume of activity. +Note that quotes (ask and bid updates) comprise 99% of the volume of the entire feed. Be cautious when deciding to receive quote updates. + +## Example Equities Usage ```python import threading +import signal import time -from threading import Timer, Thread, Event -from intriniorealtime.client import IntrinioRealtimeEquitiesClient +import sys +import datetime +from threading import Timer,Thread,Event,Lock + +from intriniorealtime.equities_client import IntrinioRealtimeEquitiesClient +from intriniorealtime.equities_replay_client import IntrinioReplayEquitiesClient +from intriniorealtime.equities_client import EquitiesQuote +from intriniorealtime.equities_client import EquitiesTrade trade_count = 0 ask_count = 0 bid_count = 0 backlog_count = 0 - def on_quote(quote, backlog): - global ask_count - global bid_count - global backlog_count - backlog_count = backlog - if 'type' in quote.__dict__: - if quote.type == "ask": - ask_count += 1 - else: - bid_count += 1 - - -def on_trade(trade, backlog): - global trade_count - global backlog_count - backlog_count = backlog - trade_count += 1 + global ask_count + global bid_count + global backlog_count + backlog_count = backlog + if isinstance(quote, EquitiesQuote) and 'type' in quote.__dict__: + if quote.type == "ask": ask_count += 1 + else: bid_count += 1 +def on_trade(trade, backlog): + global trade_count + global backlog_count + backlog_count = backlog + trade_count += 1 class Summarize(threading.Thread): - def __init__(self, event): + def __init__(self, stop_flag): threading.Thread.__init__(self, args=(), kwargs=None) self.daemon = True - self.stopped = event + self.stop_flag = stop_flag def run(self): global trade_count global bid_count global ask_count global backlog_count - while not self.stopped.wait(5): - print("trades: " + str(trade_count) + "; asks: " + str(ask_count) + "; bids: " + str( - bid_count) + "; backlog: " + str(backlog_count)) + while not self.stop_flag.wait(5): + print("trades: " + str(trade_count) + "; asks: " + str(ask_count) + "; bids: " + str(bid_count) + "; backlog: " + str(backlog_count)) -options = { +configuration = { 'api_key': 'API_KEY_HERE', - 'provider': 'IEX', # REALTIME (IEX) or IEX or CBOE_ONE or DELAYED_SIP or NASDAQ_BASIC - #'delayed': True, # Add this if you have realtime (nondelayed) access and want to force delayed mode. If you only have delayed mode access, this is redundant. + 'provider': 'IEX' # 'REALTIME' (IEX), or 'IEX', or 'DELAYED_SIP', or 'NASDAQ_BASIC', or 'CBOE_ONE' + # ,'delayed': True # Add this if you have realtime (nondelayed) access and want to force delayed mode. If you only have delayed mode access, this is redundant. + # ,'replay_date': datetime.date.today() - datetime.timedelta(days=1) # needed for ReplayClient. The date to replay. + # ,'with_simulated_delay': False # needed for ReplayClient. This plays back the events at the same rate they happened in market. + # ,'delete_file_when_done': True # needed for ReplayClient + # ,'write_to_csv': False # needed for ReplayClient + # ,'csv_file_path': 'data.csv' # needed for ReplayClient + # ,'bypass_parsing': True # if you want to handle parsing yourself, set this to True. Otherwise, leave it alone. + # ,'debug': True + # ,'max_queue_size': 250000 } -client = IntrinioRealtimeEquitiesClient(options, on_trade, on_quote) -client.join(['AAPL', 'GE', 'MSFT']) + +client = IntrinioRealtimeEquitiesClient(configuration, on_trade, on_quote) +# client = IntrinioReplayClient(options, on_trade, on_quote) +stop_event = Event() + + +def on_kill_process(sig, frame): + print("Stopping") + stop_event.set() + client.disconnect() + sys.exit(0) + + +signal.signal(signal.SIGINT, on_kill_process) + + +client.join(['AAPL','GE','MSFT']) # client.join(['lobby']) client.connect() -stopFlag = Event() -summarize_thread = Summarize(stopFlag) + +summarize_thread = Summarize(stop_event) summarize_thread.start() -time.sleep(10) -client.disconnect() -# this will stop the summarize thread -stopFlag.set() -``` -## Handling Quotes and the Queue +time.sleep(120) -There are thousands of securities, each with their own feed of activity. We highly encourage you to make your trade and quote handlers has short as possible and follow a queue pattern so your app can handle the volume of activity. +# sigint, or ctrl+c, during the thread wait will also perform the same below code. +print("Stopping") +stop_event.set() +client.disconnect() +sys.exit(0) +``` -## Quote Data Format +## Equities Data Format ### Quote Message @@ -134,7 +173,6 @@ There are thousands of securities, each with their own feed of activity. We hig * **market_center** - Provides the market center * **condition** - Provides the condition - ### Trade Message ```python @@ -165,6 +203,329 @@ There are thousands of securities, each with their own feed of activity. We hig * **condition** - Provides the condition +## Example Options Usage +```python +import threading +import signal +import time +import sys +import logging +from threading import Event, Lock + +from intriniorealtime.options_client import IntrinioRealtimeOptionsClient +from intriniorealtime.options_client import OptionsQuote +from intriniorealtime.options_client import OptionsTrade +from intriniorealtime.options_client import OptionsRefresh +from intriniorealtime.options_client import OptionsUnusualActivity +from intriniorealtime.options_client import OptionsUnusualActivityType +from intriniorealtime.options_client import OptionsUnusualActivitySentiment +from intriniorealtime.options_client import log +from intriniorealtime.options_client import Config +from intriniorealtime.options_client import Providers +from intriniorealtime.options_client import LogLevel + +options_trade_count = 0 +options_trade_count_lock = Lock() +options_quote_count = 0 +options_quote_count_lock = Lock() +options_refresh_count = 0 +options_refresh_count_lock = Lock() +options_ua_block_count = 0 +options_ua_block_count_lock = Lock() +options_ua_sweep_count = 0 +options_ua_sweep_count_lock = Lock() +options_ua_large_trade_count = 0 +options_ua_large_trade_count_lock = Lock() +options_ua_unusual_sweep_count = 0 +options_ua_unusual_sweep_count_lock = Lock() + + +def on_quote(quote: OptionsQuote): + global options_quote_count + global options_quote_count_lock + with options_quote_count_lock: + options_quote_count += 1 + + +def on_trade(trade: OptionsTrade): + global options_trade_count + global options_trade_count_lock + with options_trade_count_lock: + options_trade_count += 1 + + +def on_refresh(refresh: OptionsRefresh): + global options_refresh_count + global options_refresh_count_lock + with options_refresh_count_lock: + options_refresh_count += 1 + + +def on_unusual_activity(ua: OptionsUnusualActivity): + global options_ua_block_count + global options_ua_block_count_lock + global options_ua_sweep_count + global options_ua_sweep_count_lock + global options_ua_large_trade_count + global options_ua_large_trade_count_lock + global options_ua_unusual_sweep_count + global options_ua_unusual_sweep_count_lock + if ua.activity_type == OptionsUnusualActivityType.BLOCK: + with options_ua_block_count_lock: + options_ua_block_count += 1 + elif ua.activity_type == OptionsUnusualActivityType.SWEEP: + with options_ua_sweep_count_lock: + options_ua_sweep_count += 1 + elif ua.activity_type == OptionsUnusualActivityType.LARGE: + with options_ua_large_trade_count_lock: + options_ua_large_trade_count += 1 + elif ua.activity_type == OptionsUnusualActivityType.UNUSUAL_SWEEP: + with options_ua_unusual_sweep_count_lock: + options_ua_unusual_sweep_count += 1 + else: + log("on_unusual_activity - Unknown activity_type {0}", ua.activity_type) + + +class Summarize(threading.Thread): + def __init__(self, stop_flag: threading.Event, intrinio_client: IntrinioRealtimeOptionsClient): + threading.Thread.__init__(self, args=(), kwargs=None, daemon=True) + self.__stop_flag: threading.Event = stop_flag + self.__client = intrinio_client + + def run(self): + while not self.__stop_flag.is_set(): + time.sleep(30.0) + (dataMsgs, txtMsgs, queueDepth) = self.__client.get_stats() + log("Client Stats - Data Messages: {0}, Text Messages: {1}, Queue Depth: {2}".format(dataMsgs, txtMsgs, queueDepth)) + log( + "App Stats - Trades: {0}, Quotes: {1}, Refreshes: {2}, Blocks: {3}, Sweeps: {4}, Large Trades: {5}, Unusual Sweeps: {6}" + .format( + options_trade_count, + options_quote_count, + options_refresh_count, + options_ua_block_count, + options_ua_sweep_count, + options_ua_large_trade_count, + options_ua_unusual_sweep_count)) + + +# Your config object MUST include the 'api_key' and 'provider', at a minimum +config: Config = Config( + api_key="API_KEY_HERE", + provider=Providers.OPRA, + num_threads=8, + symbols=["AAPL", "BRKB__230217C00300000"], # this is a static list of symbols (options contracts or option chains) that will automatically be subscribed to when the client starts + log_level=LogLevel.INFO, + delayed=False) #set delayed parameter to true if you have realtime access but want the data delayed 15 minutes anyway + +# Register only the callbacks that you want. +# Take special care when registering the 'on_quote' handler as it will increase throughput by ~10x +intrinioRealtimeOptionsClient: IntrinioRealtimeOptionsClient = IntrinioRealtimeOptionsClient(config, on_trade=on_trade, on_quote=on_quote, on_refresh=on_refresh, on_unusual_activity=on_unusual_activity) + +stop_event = Event() + + +def on_kill_process(sig, frame): + log("Sample Application - Stopping") + stop_event.set() + intrinioRealtimeOptionsClient.stop() + sys.exit(0) + + +signal.signal(signal.SIGINT, on_kill_process) + +summarize_thread = Summarize(stop_event, intrinioRealtimeOptionsClient) +summarize_thread.start() + +intrinioRealtimeOptionsClient.start() + +#use this to join the channels already declared in your config +intrinioRealtimeOptionsClient.join() + +# Use this to subscribe to the entire universe of symbols (option contracts). This requires special permission. +# intrinioRealtimeOptionsClient.join_firehose() + +# Use this to subscribe, dynamically, to an option chain (all option contracts for a given underlying contract). +# intrinioRealtimeOptionsClient.join("AAPL") + +# Use this to subscribe, dynamically, to a specific option contract. +# intrinioRealtimeOptionsClient.join("AAP___230616P00250000") + +# Use this to subscribe, dynamically, a list of specific option contracts or option chains. +# intrinioRealtimeOptionsClient.join("GOOG__220408C02870000", "MSFT__220408C00315000", "AAPL__220414C00180000", "TSLA", "GE") + +time.sleep(60 * 60) +# sigint, or ctrl+c, during the thread wait will also perform the same below code. +on_kill_process(None, None) + +``` + +## Options Data Format +### Trade Message + +```python +class Trade: + def __init__(self, contract: str, exchange: Exchange, price: float, size: int, timestamp: float, total_volume: int, qualifiers: tuple, ask_price_at_execution: float, bid_price_at_execution: float, underlying_price_at_execution: float): + self.contract: str = contract + self.exchange: Exchange = exchange + self.price: float = price + self.size: int = size + self.timestamp: float = timestamp + self.total_volume: int = total_volume + self.qualifiers: tuple = qualifiers + self.ask_price_at_execution = ask_price_at_execution + self.bid_price_at_execution = bid_price_at_execution + self.underlying_price_at_execution = underlying_price_at_execution +``` + +* **contract** - Identifier for the options contract. This includes the ticker symbol, put/call, expiry, and strike price. +* **exchange** - Exchange(IntEnum): the specific exchange through which the trade occurred +* **price** - the price in USD +* **size** - the size of the last trade in hundreds (each contract is for 100 shares). +* **total_volume** - The number of contracts traded so far today. +* **timestamp** - a Unix timestamp (with microsecond precision) +* **qualifiers** - a tuple containing 4 ints: each item represents one trade qualifier. see list of possible [Trade Qualifiers](#trade-qualifiers), below. +* **ask_price_at_execution** - the contract ask price in USD at the time of execution. +* **bid_price_at_execution** - the contract bid price in USD at the time of execution. +* **underlying_price_at_execution** - the contract's underlying security price in USD at the time of execution. + +### Trade Qualifiers + +### Option Trade Qualifiers + +| Value | Description | +|-------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0 | Transaction is a regular trade | +| 1 | Out-of-sequence cancellation | +| 2 | Transaction is being reported late and is out-of-sequence | +| 3 | In-sequence cancellation | +| 4 | Transaction is being reported late, but is in correct sequence. | +| 5 | Cancel the first trade of the day | +| 6 | Late report of the opening trade and is out -of-sequence. Send an open price. | +| 7 | Transaction was the only one reported this day for the particular option contract and is now to be cancelled. | +| 8 | Late report of an opening trade and is in correct sequence. Process as regular trade. | +| 9 | Transaction was executed electronically. Process as regular trade. | +| 10 | Re-opening of a contract which was halted earlier. Process as regular trade. | +| 11 | Transaction is a contract for which the terms have been adjusted to reflect stock dividend, stock split or similar event. Process as regular trade. | +| 12 | Transaction represents a trade in two options of same class (a buy and a sell in the same class). Process as regular trade. | +| 13 | Transaction represents a trade in two options of same class (a buy and a sell in a put and a call.). Process as regular trade. | +| 14 | Transaction is the execution of a sale at a price agreed upon by the floor personnel involved, where a condition of the trade is that it reported following a non -stopped trade of the same series at the same price. | +| 15 | Cancel stopped transaction. | +| 16 | Transaction represents the option portion of buy/write (buy stock, sell call options). Process as regular trade. | +| 17 | Transaction represents the buying of a call and selling of a put for same underlying stock or index. Process as regular trade. | +| 18 | Transaction was the execution of an order which was “stopped” at a price that did not constitute a Trade-Through on another market at the time of the stop. Process like a normal transaction. | +| 19 | Transaction was the execution of an order identified as an Intermarket Sweep Order. Updates open, high, low, and last. | +| 20 | Transaction reflects the execution of a “benchmark trade”. A “benchmark trade” is a trade resulting from the matching of “benchmark orders”. A “benchmark order” is an order for which the price is not based, directly or indirectly, on the quoted price of th e option at the time of the order’s execution and for which the material terms were not reasonably determinable at the time a commitment to trade the order was made. Updates open, high, and low, but not last unless the trade is the first of the day. | +| 24 | Transaction is trade through exempt, treat like a regular trade. | +| 27 | “a” (Single leg auction non ISO) | +| 28 | “b” (Single leg auction ISO) | +| 29 | “c” (Single leg cross Non ISO) | +| 30 | “d” (Single leg cross ISO) | +| 31 | “e” (Single leg floor trade) | +| 32 | “f” (Multi leg auto electronic trade) | +| 33 | “g” (Multi leg auction trade) | +| 34 | “h” (Multi leg Cross trade) | +| 35 | “i” (Multi leg floor trade) | +| 36 | “j” (Multi leg auto electronic trade against single leg) | +| 37 | “k” (Stock options Auction) | +| 38 | “l” (Multi leg auction trade against single leg) | +| 39 | “m” (Multi leg floor trade against single leg) | +| 40 | “n” (Stock options auto electronic trade) | +| 41 | “o” (Stock options cross trade) | +| 42 | “p” (Stock options floor trade) | +| 43 | “q” (Stock options auto electronic trade against single leg) | +| 44 | “r” (Stock options auction against single leg) | +| 45 | “s” (Stock options floor trade against single leg) | +| 46 | “t” (Multi leg floor trade of proprietary products) | +| 47 | “u” (Multilateral Compression Trade of Proprietary Data Products)Transaction represents an execution in a proprietary product done as part of a multilateral compression. Trades are executed outside of regular trading hours at prices derived from end of day markets. Trades do not update Open, High, Low, and Closing Prices, but will update total volume. | +| 48 | “v” (Extended Hours Trade )Transaction represents a trade that was executed outside of regular market hours. Trades do not update Open, High, Low, and Closing Prices but will update total volume. | + + +### Quote Message + +```python +class Quote: + def __init__(self, contract: str, ask_price: float, ask_size: int, bid_price: float, bid_size: int, timestamp: float): + self.contract: str = contract + self.ask_price: float = ask_price + self.bid_price: float = bid_price + self.ask_size: int = ask_size + self.bid_size: int = bid_size + self.timestamp: float = timestamp +``` + +* **contract** - Identifier for the options contract. This includes the ticker symbol, put/call, expiry, and strike price. +* **ask_price** - the ask price in USD +* **ask_size** - the size of the last ask in hundreds (each contract is for 100 shares). +* **bid_price** - the bid price in USD +* **bid_size** - the size of the last bid in hundreds (each contract is for 100 shares). +* **timestamp** - a Unix timestamp (with microsecond precision) + + +### Refresh Message + +```python +class Refresh: + def __init__(self, contract: str, open_interest: int, open_price: float, close_price: float, high_price: float, low_price: float): + self.contract: str = contract + self.open_interest: int = open_interest + self.open_price: float = open_price + self.close_price: float = close_price + self.high_price: float = high_price + self.low_price: float = low_price +``` + +* **contract** - Identifier for the options contract. This includes the ticker symbol, put/call, expiry, and strike price. +* **openInterest** - the total quantity of opened contracts as reported at the start of the trading day +* **open_price** - the open price in USD +* **close_price** - the close price in USD +* **high_price** - the daily high price in USD +* **low_price** - the daily low price in USD + +### Unusual Activity Message +```python +class UnusualActivity: + def __init__(self, + contract: str, + activity_type: UnusualActivityType, + sentiment: UnusualActivitySentiment, + total_value: float, + total_size: int, + average_price: float, + ask_price_at_execution: float, + bid_price_at_execution: float, + underlying_price_at_execution: float, + timestamp: float): + self.contract: str = contract + self.activity_type: UnusualActivityType = activity_type + self.sentiment: UnusualActivitySentiment = sentiment + self.total_value: float = total_value + self.total_size: int = total_size + self.average_price: float = average_price + self.ask_price_at_execution: float = ask_price_at_execution + self.bid_price_at_execution: float = bid_price_at_execution + self.underlying_price_at_execution: float = underlying_price_at_execution + self.timestamp: float = timestamp +``` + +* **contract** - Identifier for the options contract. This includes the ticker symbol, put/call, expiry, and strike price. +* **activity_type** - The type of unusual activity that was detected + * **`Block`** - represents an 'block' trade + * **`Sweep`** - represents an intermarket sweep + * **`Large`** - represents a trade of at least $100,000 + * **`UnusualSweep`** - represents an unusually large sweep near market open +* **sentiment** - The sentiment of the unusual activity event + * **`Neutral`** - Reflects a minimal expected price change + * **`Bullish`** - Reflects an expected positive (upward) change in price + * **`Bearish`** - Reflects an expected negative (downward) change in price +* **total_value** - The total value of the trade in USD. 'Sweeps' and 'blocks' can be comprised of multiple trades. This is the value of the entire event. +* **total_size** - The total size of the trade in number of contracts. 'Sweeps' and 'blocks' can be comprised of multiple trades. This is the total number of contracts exchanged during the event. +* **average_price** - The average price at which the trade was executed. 'Sweeps' and 'blocks' can be comprised of multiple trades. This is the average trade price for the entire event. +* **ask_price_at_execution** - The 'ask' price of the underlying at execution of the trade event. +* **bid_price_at_execution** - The 'bid' price of the underlying at execution of the trade event. +* **underlying_price_at_execution** - The last trade price of the underlying at execution of the trade event. +* **Timestamp** - a Unix timestamp (with microsecond precision). + ## API Keys You will receive your Intrinio API Key after [creating an account](https://intrinio.com/signup). You will need a subscription to a [realtime data feed](https://intrinio.com/real-time-multi-exchange) as well. @@ -172,20 +533,28 @@ You will receive your Intrinio API Key after [creating an account](https://intri ### Methods -`client = IntrinioRealtimeClient(options)` - Creates an Intrinio Realtime client -* **Parameter** `options.api_key`: Your Intrinio API Key -* **Parameter** `options.provider`: The real-time data provider to use ("REALTIME" or "DELAYED_SIP" or "NASDAQ_BASIC") -* **Parameter** `options.on_quote(quote, backlog)`: A function that handles received quotes. `backlog` is an integer representing the approximate size of the queue of unhandled quote/trade events. -* **Parameter** `options.on_trade(quote, backlog)`: A function that handles received trades. `backlog` is an integer representing the approximate size of the queue of unhandled quote/trade events. -* **Parameter** `options.logger`: (optional) A Python Logger instance to use for logging - +`client = IntrinioRealtimeEquitiesClient(configuration)` - Creates an Intrinio Realtime client +* **Parameter** `configuration.api_key`: Your Intrinio API Key +* **Parameter** `configuration.provider`: The real-time data provider to use ("REALTIME" or "DELAYED_SIP" or "NASDAQ_BASIC") +* **Parameter** `configuration.on_quote(quote, backlog)`: A function that handles received quotes. `backlog` is an integer representing the approximate size of the queue of unhandled quote/trade events. +* **Parameter** `configuration.on_trade(quote, backlog)`: A function that handles received trades. `backlog` is an integer representing the approximate size of the queue of unhandled quote/trade events. +* **Parameter** `configuration.logger`: (optional) A Python Logger instance to use for logging + +`client : IntrinioRealtimeOptionsClient = IntrinioRealtimeOptionsClient(config : Config, on_trade : Callable[[Trade], None], on_quote : Callable[[Quote], None] = None, on_refresh : Callable[[Refresh], None] = None, on_unusual_activity : Callable[[UnusualActivity],None] = None)` - Creates an Intrinio Real-Time client. +* **Parameter** `config`: The configuration to be used by the client. +* **Parameter** `on_trade`: The Callable accepting trades. If no `on_trade` callback is provided, you will not receive trade updates from the server. +* **Parameter** `on_quote`: The Callable accepting quotes. If no `on_quote` callback is provided, you will not receive quote (ask, bid) updates from the server. +* **Parameter** `on_refresh`: The Callable accepting refresh messages. If no `on_refresh` callback is provided, you will not receive open interest, high, low, open, or close data from the server. Note: open interest data is only updated at the beginning of every trading day. If this callback is provided you will recieve an update immediately, as well as every 15 minutes (approx). +* **Parameter** `on_unusual_activity`: The Callable accepting unusual activity events. If no `on_unusual_activity` callback is provided, you will not receive unusual activity updates from the server. + +#### Equities: ```python def on_quote(quote, backlog): print("QUOTE: " , quote, "BACKLOG LENGTH: ", backlog) def on_trade(trade, backlog): print("TRADE: " , trade, "BACKLOG LENGTH: ", backlog) -options = { +configuration = { 'api_key': '', 'provider': 'IEX', # REALTIME (IEX) or IEX or CBOE_ONE or DELAYED_SIP or NASDAQ_BASIC #'delayed': True, # Add this if you have realtime (nondelayed) access and want to force delayed mode. If you only have delayed mode access, this is redundant. @@ -193,7 +562,19 @@ options = { 'on_trade': on_trade } -client = IntrinioRealtimeClient(options) +client = IntrinioRealtimeEquitiesClient(configuration) +``` + +#### Options: +```python +class Config: + def __init__(self, apiKey : str, provider : Providers, numThreads : int = 4, logLevel : LogLevel = LogLevel.INFO, manualIpAddress : str = None, symbols : set[str] = None): + self.apiKey : str = apiKey + self.provider : Providers = provider # Providers.OPRA or Providers.MANUAL + self.numThreads : int = numThreads # At least 4 threads are recommended for 'FIREHOSE' connections + self.manualIpAddress : str = manualIpAddress + self.symbols : list[str] = symbols # Static list of symbols to use + self.logLevel : LogLevel = logLevel ``` --------- @@ -207,15 +588,13 @@ client.join("lobby") ``` --------- -`client.connect()` - Retrieves an auth token, opens the WebSocket connection, starts the self-healing and heartbeat intervals, joins requested channels. +Equities - `client.connect()` - Retrieves an auth token, opens the WebSocket connection, starts the self-healing and heartbeat intervals, joins requested channels. +Options - `client.start()` --------- -`client.keep_alive()` - Runs an infinite loop to keep the thread alive, so that the client continues to receive prices. You may call this function after `connect()` or use your own timing logic (for example: connect, listen for quotes for x minutes, disconnect). - ---------- - -`client.disconnect()` - Closes the WebSocket, stops the self-healing and heartbeat intervals. You _must_ call this to dispose of the client. +Equities - `client.disconnect()` - Closes the WebSocket, stops the self-healing and heartbeat intervals. You _must_ call this to dispose of the client. +Options - `client.stop()` --------- @@ -241,7 +620,7 @@ client.leave("GOOG") `client.leave_all()` - Leaves all channels. --------- -## Example Replay Client Usage +## Example Equities Replay Client Usage ```python def on_quote(quote, backlog): print("QUOTE: " , quote, "BACKLOG LENGTH: ", backlog) @@ -260,3 +639,29 @@ options = { client = IntrinioReplayClient(options, on_trade, on_quote) ``` + +### Minimum Hardware Requirements - Trades only +Equities Client: +* Non-lobby mode: 1 hardware core and 1 thread in your configuration for roughly every 100 symbols, up to the lobby mode settings. Absolute minimum 2 cores and threads. +* Lobby mode: 4 hardware cores and 4 threads in your configuration +* 5 Mbps connection +* 0.5 ms latency + +Options Client: +* Non-lobby mode: 1 hardware core and 1 thread in your configuration for roughly every 250 contracts, up to the lobby mode settings. 3 cores and 3 configured threads for each chain, up to the lobby mode settings. Absolute minimum 3 cores and threads. +* Lobby mode: 6 hardware cores and 6 threads in your configuration +* 25 Mbps connection +* 0.5 ms latency + +### Minimum Hardware Requirements - Trades and Quotes +Equities Client: +* Non-lobby mode: 1 hardware core and 1 thread in your configuration for roughly every 25 symbols, up to the lobby mode settings. Absolute minimum 4 cores and threads. +* Lobby mode: 8 hardware cores and 8 threads in your configuration +* 25 Mbps connection +* 0.5 ms latency + +Options Client: +* Non-lobby mode: 1 hardware core and 1 thread in your configuration for roughly every 100 contracts, up to the lobby mode settings. 4 cores and 4 configured threads for each chain, up to the lobby mode settings. Absolute minimum 4 cores and threads. +* Lobby mode: 12 hardware cores and 12 threads in your configuration +* 100 Mbps connection +* 0.5 ms latency \ No newline at end of file diff --git a/example_app.py b/example_app_equities.py similarity index 86% rename from example_app.py rename to example_app_equities.py index 0f33d9c..344e520 100644 --- a/example_app.py +++ b/example_app_equities.py @@ -3,11 +3,12 @@ import time import sys import datetime -from threading import Timer,Thread,Event -from intriniorealtime.client import IntrinioRealtimeEquitiesClient -from intriniorealtime.replay_client import IntrinioReplayEquitiesClient -from intriniorealtime.client import EquitiesQuote -from intriniorealtime.client import EquitiesTrade +from threading import Timer,Thread,Event,Lock + +from intriniorealtime.equities_client import IntrinioRealtimeEquitiesClient +from intriniorealtime.equities_replay_client import IntrinioReplayEquitiesClient +from intriniorealtime.equities_client import EquitiesQuote +from intriniorealtime.equities_client import EquitiesTrade trade_count = 0 ask_count = 0 @@ -44,7 +45,7 @@ def run(self): print("trades: " + str(trade_count) + "; asks: " + str(ask_count) + "; bids: " + str(bid_count) + "; backlog: " + str(backlog_count)) -options = { +configuration = { 'api_key': 'API_KEY_HERE', 'provider': 'IEX' # 'REALTIME' (IEX), or 'IEX', or 'DELAYED_SIP', or 'NASDAQ_BASIC', or 'CBOE_ONE' # ,'delayed': True # Add this if you have realtime (nondelayed) access and want to force delayed mode. If you only have delayed mode access, this is redundant. @@ -59,7 +60,7 @@ def run(self): } -client = IntrinioRealtimeEquitiesClient(options, on_trade, on_quote) +client = IntrinioRealtimeEquitiesClient(configuration, on_trade, on_quote) # client = IntrinioReplayClient(options, on_trade, on_quote) stop_event = Event() diff --git a/example_app_options.py b/example_app_options.py new file mode 100644 index 0000000..3f8e56d --- /dev/null +++ b/example_app_options.py @@ -0,0 +1,152 @@ +import threading +import signal +import time +import sys +import logging +from threading import Event, Lock + +from intriniorealtime.options_client import IntrinioRealtimeOptionsClient +from intriniorealtime.options_client import OptionsQuote +from intriniorealtime.options_client import OptionsTrade +from intriniorealtime.options_client import OptionsRefresh +from intriniorealtime.options_client import OptionsUnusualActivity +from intriniorealtime.options_client import OptionsUnusualActivityType +from intriniorealtime.options_client import OptionsUnusualActivitySentiment +from intriniorealtime.options_client import log +from intriniorealtime.options_client import Config +from intriniorealtime.options_client import Providers +from intriniorealtime.options_client import LogLevel + +options_trade_count = 0 +options_trade_count_lock = Lock() +options_quote_count = 0 +options_quote_count_lock = Lock() +options_refresh_count = 0 +options_refresh_count_lock = Lock() +options_ua_block_count = 0 +options_ua_block_count_lock = Lock() +options_ua_sweep_count = 0 +options_ua_sweep_count_lock = Lock() +options_ua_large_trade_count = 0 +options_ua_large_trade_count_lock = Lock() +options_ua_unusual_sweep_count = 0 +options_ua_unusual_sweep_count_lock = Lock() + + +def on_quote(quote: OptionsQuote): + global options_quote_count + global options_quote_count_lock + with options_quote_count_lock: + options_quote_count += 1 + + +def on_trade(trade: OptionsTrade): + global options_trade_count + global options_trade_count_lock + with options_trade_count_lock: + options_trade_count += 1 + + +def on_refresh(refresh: OptionsRefresh): + global options_refresh_count + global options_refresh_count_lock + with options_refresh_count_lock: + options_refresh_count += 1 + + +def on_unusual_activity(ua: OptionsUnusualActivity): + global options_ua_block_count + global options_ua_block_count_lock + global options_ua_sweep_count + global options_ua_sweep_count_lock + global options_ua_large_trade_count + global options_ua_large_trade_count_lock + global options_ua_unusual_sweep_count + global options_ua_unusual_sweep_count_lock + if ua.activity_type == OptionsUnusualActivityType.BLOCK: + with options_ua_block_count_lock: + options_ua_block_count += 1 + elif ua.activity_type == OptionsUnusualActivityType.SWEEP: + with options_ua_sweep_count_lock: + options_ua_sweep_count += 1 + elif ua.activity_type == OptionsUnusualActivityType.LARGE: + with options_ua_large_trade_count_lock: + options_ua_large_trade_count += 1 + elif ua.activity_type == OptionsUnusualActivityType.UNUSUAL_SWEEP: + with options_ua_unusual_sweep_count_lock: + options_ua_unusual_sweep_count += 1 + else: + log("on_unusual_activity - Unknown activity_type {0}", ua.activity_type) + + +class Summarize(threading.Thread): + def __init__(self, stop_flag: threading.Event, intrinio_client: IntrinioRealtimeOptionsClient): + threading.Thread.__init__(self, args=(), kwargs=None, daemon=True) + self.__stop_flag: threading.Event = stop_flag + self.__client = intrinio_client + + def run(self): + while not self.__stop_flag.is_set(): + time.sleep(30.0) + (dataMsgs, txtMsgs, queueDepth) = self.__client.get_stats() + log("Client Stats - Data Messages: {0}, Text Messages: {1}, Queue Depth: {2}".format(dataMsgs, txtMsgs, queueDepth)) + log( + "App Stats - Trades: {0}, Quotes: {1}, Refreshes: {2}, Blocks: {3}, Sweeps: {4}, Large Trades: {5}, Unusual Sweeps: {6}" + .format( + options_trade_count, + options_quote_count, + options_refresh_count, + options_ua_block_count, + options_ua_sweep_count, + options_ua_large_trade_count, + options_ua_unusual_sweep_count)) + + +# Your config object MUST include the 'api_key' and 'provider', at a minimum +config: Config = Config( + api_key="API_KEY_HERE", + provider=Providers.OPRA, + num_threads=8, + symbols=["AAPL", "BRKB__230217C00300000"], # this is a static list of symbols (options contracts or option chains) that will automatically be subscribed to when the client starts + log_level=LogLevel.INFO, + delayed=False) #set delayed parameter to true if you have realtime access but want the data delayed 15 minutes anyway + +# Register only the callbacks that you want. +# Take special care when registering the 'on_quote' handler as it will increase throughput by ~10x +intrinioRealtimeOptionsClient: IntrinioRealtimeOptionsClient = IntrinioRealtimeOptionsClient(config, on_trade=on_trade, on_quote=on_quote, on_refresh=on_refresh, on_unusual_activity=on_unusual_activity) + +stop_event = Event() + + +def on_kill_process(sig, frame): + log("Sample Application - Stopping") + stop_event.set() + intrinioRealtimeOptionsClient.stop() + sys.exit(0) + + +signal.signal(signal.SIGINT, on_kill_process) + +summarize_thread = Summarize(stop_event, intrinioRealtimeOptionsClient) +summarize_thread.start() + +intrinioRealtimeOptionsClient.start() + +#use this to join the channels already declared in your config +intrinioRealtimeOptionsClient.join() + +# Use this to subscribe to the entire universe of symbols (option contracts). This requires special permission. +# intrinioRealtimeOptionsClient.join_firehose() + +# Use this to subscribe, dynamically, to an option chain (all option contracts for a given underlying contract). +# intrinioRealtimeOptionsClient.join("AAPL") + +# Use this to subscribe, dynamically, to a specific option contract. +# intrinioRealtimeOptionsClient.join("AAP___230616P00250000") + +# Use this to subscribe, dynamically, a list of specific option contracts or option chains. +# intrinioRealtimeOptionsClient.join("GOOG__220408C02870000", "MSFT__220408C00315000", "AAPL__220414C00180000", "TSLA", "GE") + +time.sleep(60 * 60) +# sigint, or ctrl+c, during the thread wait will also perform the same below code. +on_kill_process(None, None) diff --git a/intriniorealtime/client.py b/intriniorealtime/equities_client.py similarity index 94% rename from intriniorealtime/client.py rename to intriniorealtime/equities_client.py index 8f4f876..e49c0be 100644 --- a/intriniorealtime/client.py +++ b/intriniorealtime/equities_client.py @@ -7,9 +7,12 @@ import struct import sys import wsaccel +from enum import IntEnum, unique from typing import Optional, Dict, Any SELF_HEAL_BACKOFFS = [10, 30, 60, 300, 600] +_EMPTY_STRING = "" +_NAN = float("NAN") REALTIME = "REALTIME" DELAYED_SIP = "DELAYED_SIP" NASDAQ_BASIC = "NASDAQ_BASIC" @@ -94,40 +97,40 @@ def is_darkpool(self): class IntrinioRealtimeEquitiesClient: - def __init__(self, options: Dict[str, Any], on_trade: Optional[callable], on_quote: Optional[callable]): - if options is None: + def __init__(self, configuration: Dict[str, Any], on_trade: Optional[callable], on_quote: Optional[callable]): + if configuration is None: raise ValueError("Options parameter is required") - self.options = options - self.api_key = options.get('api_key') - self.username = options.get('username') - self.password = options.get('password') - self.provider = options.get('provider') - self.ipaddress = options.get('ipaddress') - self.tradesonly = options.get('tradesonly') - self.bypass_parsing = options.get('bypass_parsing', False) - self.delayed = options.get('delayed', False) - - if 'channels' in options: - self.channels = set(options['channels']) + self.options = configuration + self.api_key = configuration.get('api_key') + self.username = configuration.get('username') + self.password = configuration.get('password') + self.provider = configuration.get('provider') + self.ipaddress = configuration.get('ipaddress') + self.tradesonly = configuration.get('tradesonly') + self.bypass_parsing = configuration.get('bypass_parsing', False) + self.delayed = configuration.get('delayed', False) + + if 'channels' in configuration: + self.channels = set(configuration['channels']) else: self.channels = set() - if 'logger' in options: - self.logger = options['logger'] + if 'logger' in configuration: + self.logger = configuration['logger'] else: log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') log_handler = logging.StreamHandler() log_handler.setFormatter(log_formatter) self.logger = logging.getLogger('intrinio_realtime') - if 'debug' in options and options['debug'] == True: + if 'debug' in configuration and configuration['debug'] == True: self.logger.setLevel(logging.DEBUG) else: self.logger.setLevel(logging.INFO) self.logger.addHandler(log_handler) - if 'max_queue_size' in options: - self.quotes = queue.Queue(maxsize=options['max_queue_size']) + if 'max_queue_size' in configuration: + self.quotes = queue.Queue(maxsize=configuration['max_queue_size']) else: self.quotes = queue.Queue(maxsize=MAX_QUEUE_SIZE) diff --git a/intriniorealtime/replay_client.py b/intriniorealtime/equities_replay_client.py similarity index 100% rename from intriniorealtime/replay_client.py rename to intriniorealtime/equities_replay_client.py diff --git a/intriniorealtime/options_client.py b/intriniorealtime/options_client.py new file mode 100644 index 0000000..a9eb3b7 --- /dev/null +++ b/intriniorealtime/options_client.py @@ -0,0 +1,825 @@ +from distutils.command.config import config +import queue +import time +import threading +import requests +import websocket +import logging +import struct +from collections.abc import Callable +from enum import IntEnum, unique + +_SELF_HEAL_BACKOFFS = [10, 30, 60, 300, 600] +_EMPTY_STRING = "" +_OPTIONS_TRADE_MESSAGE_SIZE = 72 # 61 used + 11 pad +_OPTIONS_QUOTE_MESSAGE_SIZE = 52 # 48 used + 4 pad +_OPTIONS_REFRESH_MESSAGE_SIZE = 52 # 44 used + 8 pad +_OPTIONS_UNUSUAL_ACTIVITY_MESSAGE_SIZE = 74 # 62 used + 12 pad +_NAN = float("NAN") + +_stopFlag: threading.Event = threading.Event() +_dataMsgLock: threading.Lock = threading.Lock() +_dataMsgCount: int = 0 +_txtMsgLock: threading.Lock = threading.Lock() +_txtMsgCount: int = 0 +_logHandler: logging.Logger = logging.StreamHandler() +_logHandler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) +_log: logging.Logger = logging.getLogger('intrinio_realtime_options') +_log.setLevel(logging.INFO) +_log.addHandler(_logHandler) + +def log(message: str): + _log.info(message) + +def do_backoff(fn: Callable[[None], bool]): + i: int = 0 + backoff: int = _SELF_HEAL_BACKOFFS[i] + success: bool = fn() + while (not success): + time.sleep(backoff) + i = min(i + 1, len(_SELF_HEAL_BACKOFFS) - 1) + backoff = _SELF_HEAL_BACKOFFS[i] + success = fn() + +@unique +class Providers(IntEnum): + OPRA = 1 + MANUAL = 2 + +@unique +class LogLevel(IntEnum): + DEBUG = logging.DEBUG + INFO = logging.INFO + +class OptionsQuote: + def __init__(self, contract: str, ask_price: float, ask_size: int, bid_price: float, bid_size: int, timestamp: float): + self.contract: str = contract + self.ask_price: float = ask_price + self.bid_price: float = bid_price + self.ask_size: int = ask_size + self.bid_size: int = bid_size + self.timestamp: float = timestamp + + def __str__(self) -> str: + return "Quote (Contract: {0}, AskPrice: {1:.2f}, AskSize: {2}, BidPrice: {3:.2f}, BidSize: {4}, Timestamp: {5})"\ + .format(self.contract, + self.ask_price, + self.ask_size, + self.bid_price, + self.bid_size, + self.timestamp) + + def get_strike_price(self) -> float: + whole: int = (ord(self.contract[13]) - ord('0')) * 10000 + (ord(self.contract[14]) - ord('0')) * 1000 + (ord(self.contract[15]) - ord('0')) * 100 + (ord(self.contract[16]) - ord('0')) * 10 + (ord(self.contract[17]) - ord('0')) + part: float = float(ord(self.contract[18]) - ord('0')) * 0.1 + float(ord(self.contract[19]) - ord('0')) * 0.01 + float(ord(self.contract[20]) - ord('0')) * 0.001 + return float(whole) + part + + def is_put(self) -> bool: + return self.contract[12] == 'P' + + def is_call(self) -> bool: + return self.contract[12] == 'C' + + def get_expiration_date(self) -> time.struct_time: + return time.strptime(self.contract[6:12], "%y%m%d") + + def get_underlying_symbol(self) -> str: + return self.contract[0:6].rstrip('_') + +@unique +class Exchange(IntEnum): + NYSE_AMERICAN = ord('A') + BOSTON = ord('B') + CBOE = ord('C') + MIAMI_EMERALD = ord('D') + BATS_EDGX = ord('E') + ISE_GEMINI = ord('H') + ISE = ord('I') + MERCURY = ord('J') + MIAMI = ord('M') + NYSE_ARCA = ord('N') + MIAMI_PEARL = ord('O') + NYSE_ARCA_DEPRECIATED = ord('P') + NASDAQ = ord('Q') + MIAX_SAPPHIRE = ord('S') + NASDAQ_BX = ord('T') + MEMX = ord('U') + CBOE_C2 = ord('W') + PHLX = ord('X') + BATS_BZX = ord('Z') + UNKNOWN = ord('?') + + @classmethod + def _missing_(cls, value): + return cls.UNKNOWN + +class OptionsTrade: + def __init__(self, contract: str, exchange: Exchange, price: float, size: int, timestamp: float, total_volume: int, qualifiers: tuple, ask_price_at_execution: float, bid_price_at_execution: float, underlying_price_at_execution: float): + self.contract: str = contract + self.exchange: Exchange = exchange + self.price: float = price + self.size: int = size + self.timestamp: float = timestamp + self.total_volume: int = total_volume + self.qualifiers: tuple = qualifiers + self.ask_price_at_execution = ask_price_at_execution + self.bid_price_at_execution = bid_price_at_execution + self.underlying_price_at_execution = underlying_price_at_execution + + def __str__(self) -> str: + return "Trade (Contract: {0}, Exchange: {1}, Price: {2:.2f}, Size: {3}, Timestamp: {4}, TotalVolume: {5}, Qualifiers: {6}, AskPriceAtExecution: {7:.2f}, BidPriceAtExecution: {8:.2f}, UnderlyingPriceAtExecution: {9:.2f})"\ + .format(self.contract, + self.exchange.name, + self.price, + self.size, + self.timestamp, + self.total_volume, + self.qualifiers, + self.ask_price_at_execution, + self.bid_price_at_execution, + self.underlying_price_at_execution) + + def get_strike_price(self) -> float: + whole: int = (ord(self.contract[13]) - ord('0')) * 10000 + (ord(self.contract[14]) - ord('0')) * 1000 + (ord(self.contract[15]) - ord('0')) * 100 + (ord(self.contract[16]) - ord('0')) * 10 + (ord(self.contract[17]) - ord('0')) + part: float = float(ord(self.contract[18]) - ord('0')) * 0.1 + float(ord(self.contract[19]) - ord('0')) * 0.01 + float(ord(self.contract[20]) - ord('0')) * 0.001 + return float(whole) + part + + def is_put(self) -> bool: + return self.contract[12] == 'P' + + def is_call(self) -> bool: + return self.contract[12] == 'C' + + def get_expiration_date(self) -> time.struct_time: + return time.strptime(self.contract[6:12], "%y%m%d") + + def get_underlying_symbol(self) -> str: + return self.contract[0:6].rstrip('_') + +@unique +class OptionsUnusualActivitySentiment(IntEnum): + NEUTRAL = 0 + BULLISH = 1 + BEARISH = 2 + +@unique +class OptionsUnusualActivityType(IntEnum): + BLOCK = 3 + SWEEP = 4 + LARGE = 5 + UNUSUAL_SWEEP = 6 + +class OptionsRefresh: + def __init__(self, contract: str, open_interest: int, open_price: float, close_price: float, high_price: float, low_price: float): + self.contract: str = contract + self.open_interest: int = open_interest + self.open_price: float = open_price + self.close_price: float = close_price + self.high_price: float = high_price + self.low_price: float = low_price + + def __str__(self) -> str: + return "Refresh (Contract: {0}, OpenInterest: {1}, OpenPrice: {2:.2f}, ClosePrice: {3:.2f}, HighPrice: {4:.2f}, LowPrice: {5:.2f})"\ + .format(self.contract, + self.open_interest, + self.open_price, + self.close_price, + self.high_price, + self.low_price) + + def get_strike_price(self) -> float: + whole: int = (ord(self.contract[13]) - ord('0')) * 10000 + (ord(self.contract[14]) - ord('0')) * 1000 + (ord(self.contract[15]) - ord('0')) * 100 + (ord(self.contract[16]) - ord('0')) * 10 + (ord(self.contract[17]) - ord('0')) + part: float = float(ord(self.contract[18]) - ord('0')) * 0.1 + float(ord(self.contract[19]) - ord('0')) * 0.01 + float(ord(self.contract[20]) - ord('0')) * 0.001 + return float(whole) + part + + def is_put(self) -> bool: + return self.contract[12] == 'P' + + def is_call(self) -> bool: + return self.contract[12] == 'C' + + def get_expiration_date(self) -> time.struct_time: + return time.strptime(self.contract[6:12], "%y%m%d") + + def get_underlying_symbol(self) -> str: + return self.contract[0:6].rstrip('_') + +class OptionsUnusualActivity: + def __init__(self, + contract: str, + activity_type: OptionsUnusualActivityType, + sentiment: OptionsUnusualActivitySentiment, + total_value: float, + total_size: int, + average_price: float, + ask_price_at_execution: float, + bid_price_at_execution: float, + underlying_price_at_execution: float, + timestamp: float): + self.contract: str = contract + self.activity_type: OptionsUnusualActivityType = activity_type + self.sentiment: OptionsUnusualActivitySentiment = sentiment + self.total_value: float = total_value + self.total_size: int = total_size + self.average_price: float = average_price + self.ask_price_at_execution: float = ask_price_at_execution + self.bid_price_at_execution: float = bid_price_at_execution + self.underlying_price_at_execution: float = underlying_price_at_execution + self.timestamp: float = timestamp + + def __str__(self) -> str: + return "Unusual Activity (Contract: {0}, Type: {1}, Sentiment: {2}, Total Value: {3:.2f}, Total Size: {4}, Average Price: {5:.2f}, Ask at Execution: {6:.2f}, Bid at Execution: {7:.2f}, Underlying Price at Execution: {8:.2f}, Timestamp: {9})"\ + .format(self.contract, + self.activity_type, + self.sentiment, + self.total_value, + self.total_size, + self.average_price, + self.ask_price_at_execution, + self.bid_price_at_execution, + self.underlying_price_at_execution, + self.timestamp) + + def get_strike_price(self) -> float: + whole: int = (ord(self.contract[13]) - ord('0')) * 10000 + (ord(self.contract[14]) - ord('0')) * 1000 + (ord(self.contract[15]) - ord('0')) * 100 + (ord(self.contract[16]) - ord('0')) * 10 + (ord(self.contract[17]) - ord('0')) + part: float = float(ord(self.contract[18]) - ord('0')) * 0.1 + float(ord(self.contract[19]) - ord('0')) * 0.01 + float(ord(self.contract[20]) - ord('0')) * 0.001 + return float(whole) + part + + def is_put(self) -> bool: + return self.contract[12] == 'P' + + def is_call(self) -> bool: + return self.contract[12] == 'C' + + def get_expiration_date(self) -> time.struct_time: + return time.strptime(self.contract[6:12], "%y%m%d") + + def get_underlying_symbol(self) -> str: + return self.contract[0:6].rstrip('_') + +def _get_option_mask(use_on_trade: bool, use_on_quote: bool, use_on_refresh: bool, use_on_unusual_activity: bool) -> int: + mask: int = 0 + if use_on_trade: + mask |= 0b0001 + if use_on_quote: + mask |= 0b0010 + if use_on_refresh: + mask |= 0b0100 + if use_on_unusual_activity: + mask |= 0b1000 + return mask + +class _WebSocket(websocket.WebSocketApp): + def __init__(self, + ws_url: str, + ws_lock: threading.Lock, + worker_threads: list[threading.Thread], + get_channels: Callable[[None], set[tuple[str, bool]]], + get_token: Callable[[None], str], + get_url: Callable[[str], str], + use_on_trade: bool, + use_on_quote: bool, + use_on_refresh: bool, + use_on_ua: bool, + data_queue: queue.Queue): + super().__init__(ws_url, on_open=self.__on_open, on_close=self.__on_close, on_data=self.__on_data, on_error=self.__on_error) + self.__wsLock: threading.Lock = ws_lock + self.__worker_threads: list[threading.Thread] = worker_threads + self.__get_channels: Callable[[None], set[tuple[str, bool]]] = get_channels + self.__get_token: Callable[[None], str] = get_token + self.__get_url: Callable[[str], str] = get_url + self.__use_on_trade: bool = use_on_trade + self.__use_on_quote: bool = use_on_quote + self.__use_on_refresh: bool = use_on_refresh + self.__use_on_ua: bool = use_on_ua + self.__data_queue: queue.Queue = data_queue + self.__is_reconnecting: bool = False + self.__last_reset: float = time.time() + self.isReady: bool = False + + def __on_open(self, ws): + _log.info("Websocket - Connected") + self.__wsLock.acquire() + try: + self.isReady = True + self.__is_reconnecting = False + for worker in self.__worker_threads: + if not worker.is_alive(): + worker.start() + finally: + self.__wsLock.release() + if self.__get_channels and callable(self.__get_channels): + channels: set[str] = self.__get_channels() + if channels and (len(channels) > 0): + for symbol in channels: + symbol_bytes = bytes(symbol, 'utf-8') + message: bytes = bytearray(len(symbol_bytes) + 2) + message[0] = 74 # join code + message[1] = _get_option_mask(self.__use_on_trade, self.__use_on_quote, self.__use_on_refresh, self.__use_on_ua) + message[2:] = symbol_bytes + if self.isReady: + _log.info("Websocket - Joining channel: {0}".format(symbol)) + self.send_binary(message) + + def __try_reconnect(self) -> bool: + _log.info("Websocket - Reconnecting...") + if self.isReady: + return True + else: + with self.__wsLock: + self.__is_reconnecting = True + token: str = self.__get_token(None) + super().url = self.__get_url(token) + self.start() + return False + + def __on_close(self, ws, closeStatusCode, closeMsg): + self.__wsLock.acquire() + try: + if (not self.__is_reconnecting): + _log.info("Websocket - Closed - {0}: {1}".format(closeStatusCode, closeMsg)) + self.isReady = False + if (not _stopFlag.is_set()): + do_backoff(self.__try_reconnect) + finally: + self.__wsLock.release() + + def __on_error(self, ws, error): + _log.error("Websocket - Error - {0}".format(error)) + + def __on_data(self, ws, data, code, continueFlag): + if code == websocket.ABNF.OPCODE_BINARY: + with _dataMsgLock: + global _dataMsgCount + _dataMsgCount += 1 + self.__data_queue.put_nowait(data) + else: + _log.debug("Websocket - Message received") + with _txtMsgLock: + global _txtMsgCount + _txtMsgCount += 1 + _log.error("Error received: {0}".format(data)) + + def start(self): + super().run_forever(skip_utf8_validation=True) + # super().run_forever(ping_interval = 5, ping_timeout = 2, skip_utf8_validation = True) + + def stop(self): + super().close() + + def send(self, message: str): + super().send(message, websocket.ABNF.OPCODE_TEXT) + + def send_binary(self, message: bytes): + super().send(message, websocket.ABNF.OPCODE_BINARY) + + def reset(self): + self.__last_reset = time.time() + +class Config: + def __init__(self, api_key: str, provider: Providers, num_threads: int = 4, log_level: LogLevel = LogLevel.INFO, + manual_ip_address: str = None, symbols: set[str] = None, delayed: bool = False): + self.api_key: str = api_key + self.provider: Providers = provider + self.num_threads: int = num_threads + self.manual_ip_address: str = manual_ip_address + self.symbols: list[str] = symbols + self.log_level: LogLevel = log_level + self.delayed: bool = delayed + +def _transform_contract_to_new(contract: str) -> str: + if (len(contract) <= 9) or (contract.find('.') >= 9): + return contract + else: # this is of the old format and we need to translate it. ex: AAPL__220101C00140000, TSLA__221111P00195000 + symbol: str = contract[0:6].rstrip('_') + date: str = contract[6:12] + call_put: str = contract[12] + whole_price: str = contract[13:18].lstrip('0') + if whole_price == '': + whole_price = '0' + decimal_price: str = contract[18:] + if decimal_price[2] == '0': + decimal_price = decimal_price[0:2] + return "{symbol}_{date}{call_put}{whole_price}.{decimal_price}".format( + symbol=symbol, + date=date, + call_put=call_put, + whole_price=whole_price, + decimal_price=decimal_price) + +def _copy_to(src: list, dest: list, dest_index: int): + for i in range(0, len(src)): + dest[i + dest_index] = src[i] + +def _transform_contract_to_old(alternate_formatted_contract: bytes) -> str: + # Transform from server format to normal format + # From this: AAPL_201016C100.00 or ABC_201016C100.003 + # To this: AAPL__201016C00100000 or ABC___201016C00100003 + contract_chars: list = [ord('_'), ord('_'), ord('_'), ord('_'), ord('_'), ord('_'), ord('2'), ord('2'), ord('0'), ord('1'), ord('0'), ord('1'), ord('C'), ord('0'), ord('0'), ord('0'), ord('0'), ord('0'), ord('0'), ord('0'), ord('0')] + underscore_index: int = alternate_formatted_contract.find(ord('_')) + decimal_index: int = alternate_formatted_contract[9:].find(ord('.')) + 9 # ignore decimals in tickersymbol + _copy_to(alternate_formatted_contract[0:underscore_index], contract_chars, 0) # copy symbol + _copy_to(alternate_formatted_contract[underscore_index+1:underscore_index+7], contract_chars, 6) # copy date + _copy_to(alternate_formatted_contract[underscore_index+7:underscore_index+8], contract_chars, 12) # copy put / call + _copy_to(alternate_formatted_contract[underscore_index+8:decimal_index], contract_chars, 18 - (decimal_index - underscore_index - 8)) # whole number copy + _copy_to(alternate_formatted_contract[decimal_index+1:], contract_chars, 18) # decimal number copy + return bytes(contract_chars).decode('ascii') + +def _get_seconds_from_epoch_from_ticks(ticks: int) -> float: + return float(ticks) / 1_000_000_000.0 + +def _scale_value(value: int, scale_type: int) -> float: + match scale_type: + case 0x00: + return float(value) # divided by 1 + case 0x01: + return float(value) / 10.0 + case 0x02: + return float(value) / 100.0 + case 0x03: + return float(value) / 1_000.0 + case 0x04: + return float(value) / 10_000.0 + case 0x05: + return float(value) / 100_000.0 + case 0x06: + return float(value) / 1_000_000.0 + case 0x07: + return float(value) / 10_000_000.0 + case 0x08: + return float(value) / 100_000_000.0 + case 0x09: + return float(value) / 1_000_000_000.0 + case 0x0A: + return float(value) / 512.0 + case 0x0F: + return 0.0 + case _: + return float(value) # divided by 1 + +def _scale_uint64(value: int, scale_type: int) -> float: + if value == 18446744073709551615: + return _NAN + else: + return _scale_value(value, scale_type) + +def _scale_int32(value: int, scale_type: int) -> float: + if value == 2147483647 or value == -2147483648: + return _NAN + else: + return _scale_value(value, scale_type) + +def _thread_fn(index: int, data: queue.Queue, + on_trade: Callable[[OptionsTrade], None], + on_quote: Callable[[OptionsQuote], None] = None, + on_refresh: Callable[[OptionsRefresh], None] = None, + on_unusual_activity: Callable[[OptionsUnusualActivity], None] = None): + _log.debug("Starting worker thread {0}".format(index)) + while not _stopFlag.is_set(): + try: + datum: bytes = data.get(True, 1.0) + count: int = datum[0] + start_index: int = 1 + for _ in range(count): + msg_type: int = datum[start_index + 22] + if msg_type == 1: # Quote + message: bytes = datum[start_index:(start_index + _OPTIONS_QUOTE_MESSAGE_SIZE)] + # byte structure: + # contract length [0] + # contract [1-21] utf-8 string + # event type [22] uint8 + # price type [23] uint8 + # ask price [24-27] int32 + # ask size [28-31] uint32 + # bid price [32-35] int32 + # bid size [36-39] uint32 + # timestamp [40-47] uint64 + contract: str = _transform_contract_to_old(message[1:message[0]+1]) + ask_price: float = _scale_int32(struct.unpack_from(' 2: # Unusual Activity + message: bytes = datum[start_index:(start_index + _OPTIONS_UNUSUAL_ACTIVITY_MESSAGE_SIZE)] + # byte structure: + # contract length [0] uint8 + # contract [1-21] utf-8 string + # event type [22] uint8 + # sentiment type [23] uint8 + # price type [24] uint8 + # underlying price type [25] uint8 + # total value [26-33] uint64 + # total size [34-37] uint32 + # average price [38-41] int32 + # ask price at execution [42-45] int32 + # bid price at execution [46-49] int32 + # underlying price at execution [50-53] int32 + # timestamp [54-61] uint64 + contract: str = _transform_contract_to_old(message[1:message[0]+1]) + activity_type: OptionsUnusualActivityType = message[22] + sentiment: OptionsUnusualActivitySentiment = message[23] + total_value: float = _scale_uint64(struct.unpack_from(' 0): + self.__channels: set[str] = set((_transform_contract_to_new(symbol)) for symbol in config.symbols) + else: + self.__channels: set[str] = set() + self.__data: queue.Queue = queue.Queue() + self.__t_lock: threading.Lock = threading.Lock() + self.__ws_lock: threading.Lock = threading.Lock() + self.__worker_threads: list[threading.Thread] = [threading.Thread(None, + _thread_fn, + args=[i, self.__data, on_trade, on_quote, on_refresh, on_unusual_activity], + daemon=True) for i in range(config.num_threads)] + self.__socket_thread: threading.Thread = None + self.__is_started: bool = False + _log.setLevel(config.log_level) + + def __all_ready(self) -> bool: + self.__ws_lock.acquire() + ready: bool = True + try: + ready = (self.__webSocket is not None) and (self.__webSocket.isReady) + finally: + self.__ws_lock.release() + return ready + + def __get_websocket(self) -> _WebSocket: + return self.__webSocket + + def __get_auth_url(self) -> str: + if self.__provider == Providers.OPRA: + return "https://realtime-options.intrinio.com/auth?api_key=" + self.__apiKey + elif self.__provider == Providers.MANUAL: + return "http://" + self.__manualIP + "/auth?api_key=" + self.__apiKey + else: + raise ValueError("Provider not specified") + + def __get_web_socket_url(self, token: str) -> str: + delay: str = "&delayed=true" if self.__delayed else "" + if self.__provider == Providers.OPRA: + return "wss://realtime-options.intrinio.com/socket/websocket?vsn=1.0.0&token=" + token + delay + elif self.__provider == Providers.MANUAL: + return "ws://" + self.__manualIP + "/socket/websocket?vsn=1.0.0&token=" + token + delay + else: + raise ValueError("Provider not specified") + + def __try_set_token(self) -> bool: + _log.info("Authorizing...") + headers = {"Client-Information": "IntrinioOptionsPythonSDKv2.5"} + try: + response: requests.Response = requests.get(self.__get_auth_url(), headers=headers, timeout=1) + if response.status_code != 200: + _log.error( + "Authorization Failure (status code = {0}): The authorization key you provided is likely incorrect.".format( + response.status_code)) + return False + self.__token = (response.text, time.time()) + _log.info("Authorization successful.") + return True + except requests.exceptions.Timeout: + _log.error("Authorization Failure: The request timed out.") + return False + except requests.exceptions.ConnectionError as err: + _log.error("Authorization Failure: {0}".format(err)) + return False + + def __get_token(self) -> str: + self.__t_lock.acquire() + try: + if ((time.time() - self.__token[1]) > (60 * 60 * 24)): # 60sec/min * 60min/hr * 24hrs = 1 day + do_backoff(self.__try_set_token) + return self.__token[0] + finally: + self.__t_lock.release() + + def __get_channels(self) -> set[str]: + return self.__channels + + def __join(self, symbol: str): + transformed_symbol: str = _transform_contract_to_new(symbol) + if transformed_symbol not in self.__channels: + self.__channels.add(transformed_symbol) + symbol_bytes = bytes(transformed_symbol, 'utf-8') + message: bytes = bytearray(len(symbol_bytes)+2) + message[0] = 74 # join code + message[1] = _get_option_mask(self.__use_on_trade, self.__use_on_quote, self.__use_on_refresh, self.__use_on_unusual_activity) + message[2:] = symbol_bytes + if self.__webSocket.isReady: + _log.info("Websocket - Joining channel: {0}".format(transformed_symbol)) + self.__webSocket.send_binary(message) + + def __leave(self, symbol: str): + transformed_symbol: str = _transform_contract_to_new(symbol) + if transformed_symbol in self.__channels: + self.__channels.remove(transformed_symbol) + symbol_bytes = bytes(transformed_symbol, 'utf-8') + message: bytes = bytearray(len(symbol_bytes) + 2) + message[0] = 76 # leave code + message[1] = _get_option_mask(self.__use_on_trade, self.__use_on_quote, self.__use_on_refresh, self.__use_on_unusual_activity) + message[2:] = symbol_bytes + if self.__webSocket.isReady: + _log.info("Websocket - Leaving channel: {0}".format(transformed_symbol)) + self.__webSocket.send_binary(message) + + def join(self, *symbols): + if self.__is_started: + while not self.__all_ready(): + time.sleep(1.0) + for (symbol) in symbols: + self.__join(symbol) + + def join_firehose(self): + if "$FIREHOSE" in self.__channels: + _log.warn("This client has already joined the firehose channel") + else: + if self.__is_started: + while not self.__all_ready(): + time.sleep(1.0) + self.__join("$FIREHOSE") + + def leave(self, *symbols): + if not symbols: + _log.info("Leaving all channels") + channels: set[str] = self.__channels.copy() + for (symbol) in channels: + self.__leave(symbol) + symbol_set: set[str] = set(symbols) + for sym in symbol_set: + self.__leave(sym) + + def leave_firehose(self): + if "$FIREHOSE" in self.__channels: + self.__leave("$FIREHOSE") + + def __socket_start_fn(self, token: str): + _log.info("Websocket - Connecting...") + ws_url: str = self.__get_web_socket_url(token) + self.__webSocket = _WebSocket(ws_url, + self.__ws_lock, + self.__worker_threads, + self.__get_channels, + self.__get_token, + self.__get_web_socket_url, + self.__use_on_trade, + self.__use_on_quote, + self.__use_on_refresh, + self.__use_on_unusual_activity, + self.__data) + self.__webSocket.start() + + def start(self): + if (not (self.__use_on_trade or self.__use_on_quote or self.__use_on_refresh or self.__use_on_unusual_activity)): + raise ValueError("You must set at least one callback method before starting client") + token: str = self.__get_token() + self.__ws_lock.acquire() + try: + self.__socket_thread = threading.Thread = threading.Thread(None, self.__socket_start_fn, args=[token], daemon=True) + finally: + self.__ws_lock.release() + self.__socket_thread.start() + self.__is_started = True + + def stop(self): + _log.info("Stopping...") + if len(self.__channels) > 0: + self.leave() + time.sleep(1.0) + self.__ws_lock.acquire() + try: + self.__webSocket.isReady = False + finally: + self.__ws_lock.release() + _stopFlag.set() + self.__webSocket.stop() + for i in range(len(self.__worker_threads)): + self.__worker_threads[i].join() + _log.debug("Worker thread {0} joined".format(i)) + self.__socket_thread.join() + _log.debug("Socket thread joined") + _log.info("Stopped") + + def get_stats(self) -> tuple[int, int, int]: + return _dataMsgCount, _txtMsgCount, self.__data.qsize()