-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathperf_analysis.py
More file actions
178 lines (141 loc) · 5.66 KB
/
perf_analysis.py
File metadata and controls
178 lines (141 loc) · 5.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/env python3
"""
Performance analysis script for Order Book Processor
Analyzes output patterns and message processing statistics
"""
import sys
import re
import time
from collections import defaultdict
class OrderBookAnalyzer:
    """Accumulate statistics over textual order book snapshots.

    A snapshot line has the form
    ``seqNo, symbol, [(price, vol), ...], [(price, vol), ...]`` where the
    first bracketed list holds the bids and the second holds the asks.

    Running totals are kept on the instance:

    * ``snapshot_count`` -- number of snapshots passed to ``analyze_snapshot``
    * ``symbols`` -- per-symbol snapshot counts
    * ``bid_levels`` / ``ask_levels`` -- number of levels seen per snapshot
    * ``total_bid_volume`` / ``total_ask_volume`` -- summed volumes
    * ``sequences`` -- every sequence number seen, in arrival order
    """

    # Compiled once for the class instead of being handed to the ``re``
    # module-level functions on every input line.
    _SNAPSHOT_RE = re.compile(r'(\d+),\s*(\w+),\s*\[(.*?)\],\s*\[(.*?)\]')
    _LEVEL_RE = re.compile(r'\((\d+),\s*(\d+)\)')

    def __init__(self):
        self.snapshot_count = 0
        self.symbols = defaultdict(int)
        self.bid_levels = []
        self.ask_levels = []
        self.total_bid_volume = 0
        self.total_ask_volume = 0
        self.sequences = []

    @classmethod
    def _parse_levels(cls, levels_str):
        """Return the ``(price, volume)`` int pairs found in *levels_str*.

        Only tuples made of two unsigned integers match the pattern; any
        other text in the string is skipped. An empty or blank string
        yields an empty list.
        """
        return [
            (int(price), int(volume))
            for price, volume in cls._LEVEL_RE.findall(levels_str)
        ]

    @staticmethod
    def _print_level_stats(side, levels):
        """Print average/max/min of *levels* under a ``<side> Levels:`` header.

        Nothing is printed when *levels* is empty.
        """
        if not levels:
            return
        print(f"\n{side} Levels:")
        print(f" Average: {sum(levels) / len(levels):.2f}")
        print(f" Max: {max(levels)}")
        print(f" Min: {min(levels)}")

    def parse_snapshot(self, line):
        """Parse one snapshot line.

        Args:
            line: Text in the form
                ``seqNo, symbol, [(price, vol), ...], [(price, vol), ...]``.

        Returns:
            A dict with keys ``'seq_no'`` (int), ``'symbol'`` (str),
            ``'bids'`` and ``'asks'`` (lists of ``(price, volume)`` int
            tuples), or ``None`` when the line does not match the format.
        """
        # Format: seqNo, symbol, [(price, vol), ...], [(price, vol), ...]
        match = self._SNAPSHOT_RE.match(line)
        if match is None:
            return None

        seq_no, symbol, bids_str, asks_str = match.groups()
        return {
            'seq_no': int(seq_no),
            'symbol': symbol,
            'bids': self._parse_levels(bids_str),
            'asks': self._parse_levels(asks_str),
        }

    def analyze_snapshot(self, snapshot):
        """Fold a parsed snapshot into the running statistics.

        Args:
            snapshot: A dict as returned by ``parse_snapshot``.

        Returns:
            A tuple ``(bid_count, ask_count, bid_vol, ask_vol)`` describing
            this snapshot alone.
        """
        bids = snapshot['bids']
        asks = snapshot['asks']

        self.snapshot_count += 1
        self.symbols[snapshot['symbol']] += 1
        self.sequences.append(snapshot['seq_no'])

        bid_count = len(bids)
        ask_count = len(asks)
        self.bid_levels.append(bid_count)
        self.ask_levels.append(ask_count)

        # Calculate volumes
        bid_vol = sum(vol for _, vol in bids)
        ask_vol = sum(vol for _, vol in asks)
        self.total_bid_volume += bid_vol
        self.total_ask_volume += ask_vol

        return bid_count, ask_count, bid_vol, ask_vol

    def print_statistics(self):
        """Print a summary of everything analyzed so far to stdout."""
        rule = "=" * 60
        print("\n" + rule)
        print("ORDER BOOK ANALYSIS RESULTS")
        print(rule)

        print(f"\nTotal Snapshots: {self.snapshot_count}")

        print("\nSymbols Processed:")
        for symbol, count in sorted(self.symbols.items()):
            print(f" {symbol}: {count} snapshots")

        if self.sequences:
            # Scan the list once per extreme rather than once per print.
            first = min(self.sequences)
            last = max(self.sequences)
            print("\nSequence Numbers:")
            print(f" First: {first}")
            print(f" Last: {last}")
            print(f" Range: {last - first + 1}")

        self._print_level_stats("Bid", self.bid_levels)
        self._print_level_stats("Ask", self.ask_levels)

        print("\nVolume Statistics:")
        if self.snapshot_count > 0:
            avg_bid = self.total_bid_volume / self.snapshot_count
            avg_ask = self.total_ask_volume / self.snapshot_count
            print(f" Avg Bid Volume per Snapshot: {avg_bid:.2f}")
            print(f" Avg Ask Volume per Snapshot: {avg_ask:.2f}")

        print(rule + "\n")

    def validate_ordering(self, snapshot):
        """Check that bid prices never increase and ask prices never decrease.

        Equal prices on adjacent levels are accepted.

        Args:
            snapshot: A dict as returned by ``parse_snapshot``.

        Returns:
            ``(True, "OK")`` when both sides are ordered, otherwise
            ``(False, message)`` naming the offending side and sequence
            number. Bids are checked before asks.
        """
        bids = snapshot['bids']
        asks = snapshot['asks']

        # Check bids are descending
        for (price, _), (next_price, _) in zip(bids, bids[1:]):
            if price < next_price:
                return False, f"Bids not in descending order at seq {snapshot['seq_no']}"

        # Check asks are ascending
        for (price, _), (next_price, _) in zip(asks, asks[1:]):
            if price > next_price:
                return False, f"Asks not in ascending order at seq {snapshot['seq_no']}"

        return True, "OK"
def main():
    """Analyze order book snapshots from a file or stdin and print a report.

    Input lines are read from the path in ``sys.argv[1]`` when given,
    otherwise from standard input. Blank lines are skipped. Each remaining
    line is parsed, folded into the statistics and checked for price
    ordering; unparseable or mis-ordered lines are collected as errors and
    the first ten are shown after the statistics.
    """
    if len(sys.argv) > 1:
        # Read from file
        with open(sys.argv[1], 'r') as f:
            lines = f.readlines()
    else:
        # Read from stdin
        lines = sys.stdin.readlines()

    analyzer = OrderBookAnalyzer()
    errors = []

    print("Analyzing order book snapshots...")
    # perf_counter is monotonic and high-resolution, so the interval cannot
    # go negative on a wall-clock adjustment.
    start_time = time.perf_counter()

    for line_num, raw_line in enumerate(lines, 1):
        line = raw_line.strip()
        if not line:
            continue

        snapshot = analyzer.parse_snapshot(line)
        if snapshot is None:
            errors.append(f"Line {line_num}: Failed to parse")
            continue

        analyzer.analyze_snapshot(snapshot)

        # Validate ordering
        valid, msg = analyzer.validate_ordering(snapshot)
        if not valid:
            errors.append(f"Line {line_num}: {msg}")

    elapsed = time.perf_counter() - start_time

    # Print results
    analyzer.print_statistics()

    if errors:
        print("\n⚠️ ERRORS DETECTED:")
        for error in errors[:10]:  # Show first 10 errors
            print(f" {error}")
        if len(errors) > 10:
            print(f" ... and {len(errors) - 10} more errors")
    else:
        print("✓ All snapshots valid and correctly ordered!")

    print(f"\nAnalysis completed in {elapsed:.3f} seconds")
    # A zero interval (e.g. empty input on a coarse clock) would otherwise
    # raise ZeroDivisionError after all the work has already been done.
    if elapsed > 0:
        print(f"Processing rate: {analyzer.snapshot_count / elapsed:.0f} snapshots/sec")
    else:
        print("Processing rate: n/a (elapsed time too small to measure)")
# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()