-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcryptomongo.py
More file actions
116 lines (85 loc) · 3.67 KB
/
cryptomongo.py
File metadata and controls
116 lines (85 loc) · 3.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import asyncio
import configparser
#from decimal import Decimal
from bson.decimal128 import Decimal128 as Decimal
import functools
#import json
import simplejson as json
import logging
from pprint import pprint
import signal
import sys
from pymongo import MongoClient
import websockets
from websockets import WebSocketClientProtocol
# Runtime settings (MongoDB and Cryptowatch sections) are read from an
# INI-style file; the path is relative to the current working directory.
config_path = 'config/settings.conf'
config = configparser.ConfigParser()
# NOTE: ConfigParser.read() skips files it cannot open instead of raising, so
# a missing settings file only shows up later as a KeyError on lookup.
config.read(config_path)
# Root logging is configured at INFO, so the logger.debug() calls in this
# module are suppressed unless the level is lowered.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
## Standard Functions ##
def build_subscription(market_list):
    """Build the stream subscription message for a list of market ids.

    Args:
        market_list: Comma-separated market ids, e.g. ``"74, 75"``.
            Whitespace around each id (including newlines from a multi-line
            config value) is ignored, as are empty entries such as the one
            left behind by a trailing comma.

    Returns:
        dict: ``{'subscribe': {'subscriptions': [...]}}`` holding one
        ``markets:<id>:trades`` stream subscription per market id.
    """
    stripped_ids = (mkt.strip() for mkt in market_list.split(','))
    # Blank entries are dropped so the malformed resource "markets::trades"
    # is never requested.
    markets = [
        {"streamSubscription": {"resource": f"markets:{market_id}:trades"}}
        for market_id in stripped_ids if market_id]
    constructed_message = {'subscribe': {'subscriptions': markets}}
    return constructed_message
def format_message(message):
    """Normalise a decoded stream message before it is stored.

    Messages without a ``marketUpdate`` key are returned untouched. For a
    market update, every value under ``market`` is cast to ``int``; when the
    update carries a ``tradesUpdate``, its ``trades`` list is replaced by
    reduced records holding ``marketId``, ``amount``, ``orderSide``,
    ``price``, ``timestamp`` and ``timestampNano``. Amount and price are
    built with ``Decimal`` from the ``amountStr`` / ``priceStr`` fields.

    A trade that cannot be converted is logged and skipped rather than
    aborting the whole message.

    Args:
        message: Decoded JSON message (a dict). It is modified in place.

    Returns:
        The same ``message`` object, after normalisation.
    """
    if 'marketUpdate' not in message:
        return message

    update = message['marketUpdate']
    market = update['market']
    for item in market:
        market[item] = int(market[item])

    # Market updates without trade data previously raised KeyError here;
    # there is nothing to convert, so pass them through.
    trades_update = update.get('tradesUpdate')
    if trades_update is None:
        return message

    trades_converted = []
    for trade in trades_update.get('trades', []):
        try:
            trade_reformatted = dict(
                marketId=market['marketId'],
                amount=Decimal(trade['amountStr']),
                orderSide=trade['orderSide'],
                price=Decimal(trade['priceStr']),
                timestamp=int(trade['timestamp']),
                timestampNano=int(trade['timestampNano']),
            )
        except Exception as e:
            # Deliberately broad: one malformed trade must not drop the rest.
            logger.exception('Exception while constructing trade info: %s', e)
        else:
            trades_converted.append(trade_reformatted)
    trades_update['trades'] = trades_converted
    return message
def signal_exit(signame, loop):
    """Handle a termination signal by cancelling every task on ``loop``.

    Args:
        signame: Name of the received signal; only used for reporting.
        loop: Event loop whose tasks are cancelled.
    """
    print("Got signal %s: exit" % signame)
    logger.debug('Received exit signal. Cancelling tasks.')
    # asyncio.Task.all_tasks() was removed in Python 3.9. Prefer the
    # module-level asyncio.all_tasks() (3.7+) and only fall back to the old
    # classmethod on runtimes that predate it.
    all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
    for task in all_tasks(loop):
        task.cancel()
## AsyncIO Functions ##
async def consumer_handler(websocket: WebSocketClientProtocol) -> None:
    """Store every message received on ``websocket`` in MongoDB.

    Each message is JSON-decoded, passed through ``format_message`` and
    inserted as one document into the collection named by the ``mongodb``
    section of the config (host, port, database, collection).

    The handler returns when the websocket iteration ends or when the task
    is cancelled; the MongoDB client is closed in either case.

    Args:
        websocket: Connected client protocol to read messages from.
    """
    mongo_client = MongoClient(
        f"mongodb://{config['mongodb']['host']}:{config['mongodb']['port']}/")
    try:
        trade_collection = mongo_client[
            config['mongodb']['database']][config['mongodb']['collection']]
        async for message in websocket:
            message_json = format_message(json.loads(message))
            # NOTE: insert_one() is not awaited, so it runs synchronously on
            # the event loop while the write completes.
            doc_id = trade_collection.insert_one(message_json).inserted_id
            logger.info('doc_id: {}'.format(doc_id))
    except asyncio.CancelledError:
        # Cancellation is delivered at the `async for` await, not inside the
        # synchronous per-message work, so it has to be caught around the
        # whole loop. It is treated as a request for an orderly stop.
        logger.debug('CancelledError raised.')
    finally:
        mongo_client.close()
async def consume(api_key: str, sub_msg: str) -> None:
    """Open the Cryptowatch stream, subscribe, and hand off to the consumer.

    Args:
        api_key: API key appended to the stream URL as the ``apikey`` query
            parameter.
        sub_msg: Subscription message; it is JSON-serialised before being
            sent over the connection.
    """
    endpoint = f'wss://stream.cryptowat.ch/connect?apikey={api_key}'
    async with websockets.connect(endpoint) as connection:
        subscription_payload = json.dumps(sub_msg)
        await connection.send(subscription_payload)
        # Runs until the consumer returns; the connection closes on exit.
        await consumer_handler(connection)
if __name__ == '__main__':
    # Script entry point: build the subscription from the configured market
    # ids, install signal handlers, and run the consumer until it finishes
    # or is cancelled.
    subscription_message = build_subscription(config['cryptowatch']['market_ids'])
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated and fails on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    for signame in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(signal_exit, signame, loop))
    try:
        loop.run_until_complete(
            consume(api_key=config['cryptowatch']['api'], sub_msg=subscription_message))
    except asyncio.CancelledError:
        # signal_exit cancels the running tasks; that is the expected way to
        # stop, so exit quietly instead of with a traceback.
        logger.info('Consumer cancelled; shutting down.')
    finally:
        loop.close()