-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsystem_state.py
More file actions
174 lines (153 loc) · 6.27 KB
/
system_state.py
File metadata and controls
174 lines (153 loc) · 6.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""Thread-safe snapshot for the live monitoring dashboard (Chapter 11).
Updated by :class:`engine.TradingEngine` on each tick and candle close.
When the engine is not running, REST/WebSocket fall back to database-backed data.
"""
from __future__ import annotations
import json
import os
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
_lock = threading.RLock()
_engine_running = False
_last_tick_price: Optional[float] = None
_last_tick_utc: Optional[str] = None
_rsi: Optional[float] = None
_trix: Optional[float] = None
_last_candle: Optional[Dict[str, Any]] = None
_positions: List[Dict[str, Any]] = []
_daily_trade_count: int = 0
_unrealized_total: float = 0.0
_paper_equity: Optional[float] = None
_micro: Optional[Dict[str, Any]] = None
def _snapshot_file_path() -> Path:
"""
Process-shared snapshot path.
Why: the engine and dashboard often run in separate processes, so in-memory
module state isn't shared. Persisting the latest snapshot makes the
dashboard reflect the true engine status.
"""
# Allow override for Docker / custom deployments.
p = os.environ.get("SYSTEM_STATE_PATH", "").strip()
if p:
return Path(p).expanduser().resolve()
return (Path(__file__).resolve().parent / "data" / "live_snapshot.json").resolve()
def _atomic_write_json(path: Path, payload: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
data = json.dumps(payload, ensure_ascii=False, separators=(",", ":"), default=str)
tmp.write_text(data, encoding="utf-8")
tmp.replace(path)
def _persist_snapshot_locked() -> None:
"""Write the current snapshot to disk. Caller must hold `_lock`."""
p = _snapshot_file_path()
payload = {
"engine_running": _engine_running,
"last_tick_price": _last_tick_price,
"last_tick_utc": _last_tick_utc,
"rsi": _rsi,
"trix": _trix,
"last_candle": _last_candle,
"positions": list(_positions),
"daily_trade_count": _daily_trade_count,
"unrealized_total": _unrealized_total,
"paper_equity": _paper_equity,
"micro": _micro,
"updated_utc": datetime.now(timezone.utc).isoformat(),
}
try:
_atomic_write_json(p, payload)
except Exception:
# Persistence is best-effort; dashboard can still work in-process.
return
def _load_persisted_snapshot() -> Optional[Dict[str, Any]]:
p = _snapshot_file_path()
if not p.exists():
return None
try:
raw = p.read_text(encoding="utf-8")
data = json.loads(raw)
if not isinstance(data, dict):
return None
return data
except Exception:
return None
def set_engine_running(running: bool) -> None:
global _engine_running
with _lock:
_engine_running = bool(running)
_persist_snapshot_locked()
def publish_from_engine(
*,
price: Optional[float],
tick_utc: Optional[datetime],
rsi: Optional[float],
trix: Optional[float],
candle: Optional[Dict[str, Any]],
positions: List[Dict[str, Any]],
daily_trade_count: int,
unrealized_total: float,
paper_equity: Optional[float] = None,
micro: Optional[Dict[str, Any]] = None,
) -> None:
global _last_tick_price, _last_tick_utc, _rsi, _trix, _last_candle, _micro
global _positions, _daily_trade_count, _unrealized_total, _paper_equity
with _lock:
if price is not None:
_last_tick_price = float(price)
if tick_utc is not None:
_last_tick_utc = tick_utc.astimezone(timezone.utc).isoformat()
_rsi = float(rsi) if rsi is not None else None
_trix = float(trix) if trix is not None else None
_last_candle = candle
_positions = list(positions)
_daily_trade_count = int(daily_trade_count)
_unrealized_total = float(unrealized_total)
_paper_equity = paper_equity
_micro = dict(micro) if micro is not None else None
_persist_snapshot_locked()
def snapshot() -> Dict[str, Any]:
with _lock:
local = {
"engine_running": _engine_running,
"last_tick_price": _last_tick_price,
"last_tick_utc": _last_tick_utc,
"rsi": _rsi,
"trix": _trix,
"last_candle": _last_candle.copy() if _last_candle else None,
"positions": list(_positions),
"daily_trade_count": _daily_trade_count,
"unrealized_total": _unrealized_total,
"paper_equity": _paper_equity,
"micro": _micro.copy() if _micro else None,
}
# If this process isn't the engine, try the persisted snapshot so the
# dashboard reflects a separately running engine.
if not local["engine_running"]:
ext = _load_persisted_snapshot()
if ext and ext.get("engine_running"):
# Only trust reasonably recent updates (avoid stale "true").
ts_s = ext.get("updated_utc") or ext.get("last_tick_utc")
try:
t = datetime.fromisoformat(str(ts_s).replace("Z", "+00:00"))
if t.tzinfo is None:
t = t.replace(tzinfo=timezone.utc)
age = (datetime.now(timezone.utc) - t.astimezone(timezone.utc)).total_seconds()
if age < 120.0:
return {
"engine_running": bool(ext.get("engine_running", False)),
"last_tick_price": ext.get("last_tick_price"),
"last_tick_utc": ext.get("last_tick_utc"),
"rsi": ext.get("rsi"),
"trix": ext.get("trix"),
"last_candle": ext.get("last_candle"),
"positions": list(ext.get("positions") or []),
"daily_trade_count": int(ext.get("daily_trade_count") or 0),
"unrealized_total": float(ext.get("unrealized_total") or 0.0),
"paper_equity": ext.get("paper_equity"),
"micro": ext.get("micro"),
}
except Exception:
pass
return local