-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcb_socket.py
More file actions
216 lines (169 loc) · 8.29 KB
/
cb_socket.py
File metadata and controls
216 lines (169 loc) · 8.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import threading
import websocket
import json
import pandas as pd
import time
import itertools
import collections
# Class which holds each coin/token's data
# This class tracks the entire L2 Order Book for both the bids/asks side
# It takes advantage of the average O(1) for search/addition/deletion
# This results in a very fast class which operates well under hardware constraints
class Coin:
# Class takes a name, such as ETH-USD, which is used to retrieve that coin's specific data
def __init__(self, name, limit):
self.name = name
self.limit = limit
self.bids = {}
self.asks = {}
# This class must always be tied to the on_message of the CbSocket class
# It requires a websocket message and then it's decided what type of message it is/what to do with it
def set_dicts(self, result):
# This message is only received one per coin
if result['type'] == 'snapshot':
# Add each row in the snapshot of the 'bids' side to the object
for price, size in result['bids']:
self.bids[float(price)] = {'side': 'bids', 'size': float(size)}
# Add each row in the snapshot of the 'asks side to the object
for price, size in result['asks']:
self.asks[float(price)] = {'side': 'asks', 'size': float(size)}
# These messages are streamed following the snapshot
elif result['type'] == 'l2update':
# Adjust the self.bids dictionary based on the message
if result['changes'][0][0] == 'buy':
# Check if the price level is already in the dictionary
if float(result['changes'][0][1]) in self.bids:
# This checks if the size in the message is equal to zero
# According to CB API docs, this indicates it can be removed
# https://docs.pro.coinbase.com/#the-level2-channel
if float(result['changes'][0][2]) == 0.0:
# Delete the entry which corresponds to the size above
del self.bids[float(result['changes'][0][1])]
# Don't delete the entry, instead update its size
else:
self.bids[float(result['changes'][0][1])]['size'] = float(result['changes'][0][2])
# No entry found in the dictionary, add it to the bottom
else:
self.bids[float(result['changes'][0][1])] = {'side': 'bids', 'size': float(result['changes'][0][2])}
# All of the comments for the sell side are the same as the buy side above
elif result['changes'][0][0] == 'sell':
if float(result['changes'][0][1]) in self.asks:
if float(result['changes'][0][2]) == 0.0:
del self.asks[float(result['changes'][0][1])]
else:
self.asks[float(result['changes'][0][1])]['size'] = float(result['changes'][0][2])
else:
self.asks[float(result['changes'][0][1])] = {'side': 'asks', 'size': float(result['changes'][0][2])}
# This function returns a pandas DataFrame of shape 500 with price, side and size as the columns -
def get_df(self):
# Request both the self.bids and self.asks dictionary be sorted
bids_df = self.get_bids()
asks_df = self.get_asks()
# Return both dataframes
return bids_df, asks_df
# Return a Pandas DataFrame of shape 500 with only the bids side of the object
def get_bids(self):
bids = self.bid_sort()
# Create the new bids dataframe, limited to 500 entries, reset the index and insert the index as a new column
bids_df = pd.DataFrame.from_dict(data=dict(itertools.islice(bids.items(), self.limit)), orient='index')
bids_df.reset_index(level=0, inplace=True)
bids_df = bids_df.rename(columns={'index': 'price'})
return bids_df
# Return a Pandas DataFrame of shape 500 with only the asks side of the object
def get_asks(self):
asks = self.ask_sort()
# Create the new asks dataframe, limited to 500 entries, reset the index and insert the index as a new column
asks_df = pd.DataFrame.from_dict(data=dict(itertools.islice(asks.items(), self.limit)), orient='index')
asks_df.reset_index(level=0, inplace=True)
asks_df = asks_df.rename(columns={'index': 'price'})
return asks_df
# Sort the self.asks dictionary into an OrderedDict
def bid_sort(self):
ordered_dict = collections.OrderedDict(sorted(self.bids.items(), reverse=True))
return ordered_dict
# Sort the self.bids dictionary into an OrderedDict
def ask_sort(self):
ordered_dict = collections.OrderedDict(sorted(self.asks.items()))
return ordered_dict
# Return the name of the object such as ETH-USD
def get_name(self):
return self.name
# Websocket class which aggregates data from the level2 websocket on Coinbase API
class CbSocket:
def __init__(self, limit):
# this socket utilizes lambda functions to assign the on_message and on_open to local object functions
self.socket = websocket.WebSocketApp("wss://ws-feed.pro.coinbase.com",
on_message=lambda ws, msg: self.on_message(ws, msg),
on_open=lambda ws: self.on_open(ws),
on_error=lambda ws, err: self.on_error(ws, err),)
self.limit = limit
# Dictionary which creates Coin objects for each message type which can be received
self.coins = {
'ETH-USD': Coin('ETH-USD', self.limit),
'BTC-USD': Coin('BTC-USD', self.limit),
'ADA-USD': Coin('ADA-USD', self.limit),
'SOL-USD': Coin('SOL-USD', self.limit),
'XTZ-USD': Coin('XTZ-USD', self.limit),
'ALGO-USD': Coin('ALGO-USD', self.limit),
'ATOM-USD': Coin('ATOM-USD', self.limit),
'MATIC-USD': Coin('MATIC-USD', self.limit),
'DOT-USD': Coin('DOT-USD', self.limit),
'AAVE-USD': Coin('AAVE-USD', self.limit)
}
'''
dict format --
{
price:
{'side': 'bids', 'size': 0.1111},
}
'''
# Returns the selected coins bids/asks dataframes, thus the request must be bids, asks = book.get_df()
# Accepts an _id of type String which picks the correct object from the above dictionary of Coin objects
def get_df(self, _id: str):
book = self.coins.get(_id)
return book.get_df()
# Request only the asks portion of the object
def get_asks(self, _id: str):
book = self.coins.get(_id)
return book.get_asks()
# Request only the bids portion of the object
def get_bids(self, _id: str):
book = self.coins.get(_id)
return book.get_bids()
# Receives the messages from the websocket
def on_message(self, ws, message):
result = json.loads(message) # Load the JSON message sent
# Select the proper coin based on message type and pass the message to that object
if result['product_id'] in self.coins:
book = self.coins.get(result['product_id'])
book.set_dicts(result)
else:
print('key not found')
# Initial handshake with CB websocket which requires a subscribe message
# More information can be found here: https://docs.pro.coinbase.com/#subscribe
def on_open(self, ws):
ws.send(open('subscribe.json').read())
def on_error(self, ws, err):
print('Websocket error: ', err)
# Run this websocket worker in a separate thread forever
def run(self):
t_run = threading.Thread(target=self.socket.run_forever)
t_run.start()
# Main function
if __name__ == "__main__":
new = CbSocket(500)
new.run() # Runs the websocket worker in separate thread
# Prints coin dataframes every second
while True:
bids, asks = new.get_df('ETH-USD')
print(bids)
time.sleep(1)
bids, asks = new.get_df('BTC-USD')
print(bids)
time.sleep(1)
bids, asks = new.get_df('DOT-USD')
print(bids)
time.sleep(1)
bids, asks = new.get_df('ADA-USD')
print(bids)
time.sleep(1)