Skip to content

Commit 1a24064

Browse files
authored
DPL MCP: a server to investigate running trains on hyperloop (#15451)
1 parent bb4dfa3 commit 1a24064

3 files changed

Lines changed: 280 additions & 0 deletions

File tree

Binary file not shown.
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2019-2026 CERN and copyright holders of ALICE O2.
3+
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
4+
# All rights not expressly granted are reserved.
5+
#
6+
# This software is distributed under the terms of the GNU General Public
7+
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
8+
#
9+
# In applying this license CERN does not waive the privileges and immunities
10+
# granted to it by virtue of its status as an Intergovernmental Organization
11+
# or submit itself to any jurisdiction.
12+
"""AliHyperloop monitoring MCP server.
13+
14+
Exposes a small set of read-only tools to inspect ongoing Hyperloop train
15+
runs, their resource consumption, and per-wagon breakdowns. All data is
16+
fetched on demand (no polling, no bulk scraping).
17+
18+
The server talks to the Hyperloop REST API through a local authenticating
19+
proxy (ccdb_proxy.py) that handles GRID certificate auth.
20+
21+
Usage
22+
-----
23+
python3 hyperloop_server.py [--proxy URL] [--token TOKEN]
24+
25+
Environment variables
26+
HYPERLOOP_PROXY proxy base URL (default: http://localhost:8888)
27+
HYPERLOOP_TOKEN bearer token (default: foo-baz)
28+
"""
29+
30+
from __future__ import annotations
31+
32+
import asyncio
33+
import json
34+
import os
35+
import sys
36+
37+
import httpx
38+
from mcp.server.fastmcp import FastMCP
39+
40+
mcp = FastMCP("hyperloop")
41+
42+
PROXY = os.environ.get("HYPERLOOP_PROXY", "http://localhost:8888")
43+
TOKEN = os.environ.get("HYPERLOOP_TOKEN", "foo-baz")
44+
API = f"{PROXY}/alihyperloop-data"
45+
46+
47+
def _headers() -> dict[str, str]:
48+
return {"Authorization": f"Bearer {TOKEN}"}
49+
50+
51+
async def _get(path: str, params: dict | None = None) -> any:
52+
hdrs = _headers()
53+
hdrs["Accept-Encoding"] = "identity"
54+
async with httpx.AsyncClient(timeout=30) as client:
55+
r = await client.get(f"{API}/{path}", params=params, headers=hdrs)
56+
r.raise_for_status()
57+
return r.json()
58+
59+
60+
def _fmt_bytes(n: float | None) -> str:
61+
if n is None:
62+
return "n/a"
63+
for unit in ("B", "KB", "MB", "GB", "TB"):
64+
if abs(n) < 1024:
65+
return f"{n:.1f} {unit}"
66+
n /= 1024
67+
return f"{n:.1f} PB"
68+
69+
70+
def _fmt_time(seconds: float | None) -> str:
71+
if seconds is None:
72+
return "n/a"
73+
if seconds < 60:
74+
return f"{seconds:.0f}s"
75+
if seconds < 3600:
76+
return f"{seconds / 60:.1f}m"
77+
return f"{seconds / 3600:.1f}h"
78+
79+
80+
def _parse_job_status(raw: str | None) -> dict:
81+
if not raw:
82+
return {}
83+
js = json.loads(raw) if isinstance(raw, str) else raw
84+
done = sum(v for k, v in js.items() if k.startswith("DONE"))
85+
total = js.get("TOTAL", 0)
86+
errors = sum(v for k, v in js.items()
87+
if k.startswith("ERROR") or k.startswith("EXPIRED")
88+
or k.startswith("FAILED") or k.startswith("KILLED"))
89+
active = sum(v for k, v in js.items()
90+
if k.startswith("R") or k.startswith("A") or k.startswith("S"))
91+
wait = total - done - errors - active
92+
return {"total": total, "done": done, "errors": errors,
93+
"active": active, "wait": max(0, wait)}
94+
95+
96+
@mcp.tool()
97+
async def list_ongoing_trains() -> str:
98+
"""List all currently running / ready Hyperloop train runs.
99+
100+
Returns a compact table with train ID, dataset, state, job progress,
101+
error rate, and package tag. One API call.
102+
"""
103+
trains = await _get("trains/all-trains.jsp", {"state": "ready"})
104+
if not trains:
105+
return "No ongoing trains."
106+
107+
lines = []
108+
lines.append(f"{'ID':>8} {'State':<11} {'Done/Total':>12} {'Err%':>5} "
109+
f"{'Dataset':<40} {'Package'}")
110+
lines.append("-" * 120)
111+
112+
for t in sorted(trains, key=lambda x: _parse_job_status(
113+
x.get("job_status")).get("total", 0), reverse=True):
114+
js = _parse_job_status(t.get("job_status"))
115+
total = js.get("total", 0)
116+
done = js.get("done", 0)
117+
errors = js.get("errors", 0)
118+
err_pct = f"{100 * errors / total:.1f}" if total > 0 else "n/a"
119+
pkg = (t.get("package_tag") or "").replace("O2Physics::", "")
120+
ds = t.get("dataset_name", "")
121+
if len(ds) > 40:
122+
ds = ds[:37] + "..."
123+
lines.append(
124+
f"{t['id']:>8} {t.get('state', '?'):<11} "
125+
f"{done:>6}/{total:<6} {err_pct:>5} "
126+
f"{ds:<40} {pkg}"
127+
)
128+
129+
lines.append(f"\nTotal: {len(trains)} trains")
130+
return "\n".join(lines)
131+
132+
133+
@mcp.tool()
134+
async def train_detail(train_id: int) -> str:
135+
"""Get resource metrics for a specific train run.
136+
137+
Shows CPU time, wall time, memory (PSS), throughput, input/output
138+
sizes, target, and merge status. One API call.
139+
"""
140+
t = await _get("trains/train.jsp", {"train_id": train_id, "type": "ready"})
141+
142+
lines = [f"Train {t['id']}: {t.get('dataset_name', '?')}"]
143+
lines.append(f" State: {t.get('state')}")
144+
lines.append(f" Package: {t.get('package_tag')}")
145+
lines.append(f" Target: {t.get('target')}")
146+
lines.append(f" CPU cores: {t.get('cpu_cores')}")
147+
lines.append(f" CPU time: {_fmt_time(t.get('cpu_time'))}")
148+
lines.append(f" Wall time: {_fmt_time(t.get('wall_time'))}")
149+
lines.append(f" PSS memory: {_fmt_bytes(t.get('mem_pss'))} avg, "
150+
f"{_fmt_bytes(t.get('mem_pss_max'))} max")
151+
lines.append(f" Private mem: {_fmt_bytes(t.get('mem_private'))} avg, "
152+
f"{_fmt_bytes(t.get('mem_private_max'))} max")
153+
lines.append(f" Input size: {_fmt_bytes(t.get('input_size'))}")
154+
lines.append(f" Output size: {_fmt_bytes(t.get('output_size'))}")
155+
156+
throughput = t.get("estimated_throughput")
157+
if throughput:
158+
lines.append(f" Throughput: {_fmt_bytes(throughput)}/s")
159+
160+
events = t.get("events")
161+
if events and events > 0:
162+
lines.append(f" Events: {events}")
163+
164+
lines.append(f" Created: {t.get('created')}")
165+
lines.append(f" Username: {t.get('username')}")
166+
167+
return "\n".join(lines)
168+
169+
170+
@mcp.tool()
171+
async def wagon_stats(train_id: int) -> str:
172+
"""Get per-wagon CPU and memory breakdown for a train.
173+
174+
Fetches wagon IDs from the train, then retrieves grid statistics
175+
for each wagon. Typically 10-20 wagons, one API call each.
176+
"""
177+
# First get train detail for dataset_id and wagons_timestamp
178+
t = await _get("trains/train.jsp", {"train_id": train_id, "type": "ready"})
179+
dataset_id = t.get("dataset_id")
180+
wagons_ts = t.get("wagons_timestamp") or t.get("dataset_timestamp")
181+
182+
if not dataset_id or not wagons_ts:
183+
return f"Cannot determine dataset/timestamp for train {train_id}"
184+
185+
# Get wagon IDs
186+
wagons_data = await _get("trains/wagons_derived_data.jsp",
187+
{"train_id": train_id,
188+
"wagons_timestamp": wagons_ts})
189+
wagon_ids = list(wagons_data.keys()) if isinstance(wagons_data, dict) else []
190+
if not wagon_ids:
191+
return f"No wagons found for train {train_id}"
192+
193+
# Fetch stats for each wagon concurrently
194+
async def fetch_one(wid: str) -> dict | None:
195+
try:
196+
stats = await _get("analysis/wagon/wagon-dataset-grid-statistics.jsp",
197+
{"wagon_id": wid, "dataset_id": dataset_id})
198+
if isinstance(stats, dict) and str(train_id) in stats:
199+
return stats[str(train_id)]
200+
except Exception:
201+
pass
202+
return None
203+
204+
results = await asyncio.gather(*(fetch_one(wid) for wid in wagon_ids))
205+
206+
rows = []
207+
for wid, stat in zip(wagon_ids, results):
208+
if stat is None:
209+
continue
210+
rows.append(stat)
211+
212+
if not rows:
213+
return f"No wagon statistics available for train {train_id}"
214+
215+
# Sort by CPU time descending
216+
rows.sort(key=lambda r: r.get("cpu_time") or 0, reverse=True)
217+
218+
lines = [f"Wagon stats for train {train_id} "
219+
f"({t.get('dataset_name', '?')}), {len(rows)} wagons:\n"]
220+
lines.append(f"{'Wagon':<35} {'CPU time':>10} {'PSS avg':>10} "
221+
f"{'PSS max':>10} {'Throughput':>12} {'Done%':>6}")
222+
lines.append("-" * 90)
223+
224+
total_cpu = 0
225+
for r in rows:
226+
name = r.get("wagon_name", f"id={r.get('wagon_id', '?')}")
227+
if len(name) > 35:
228+
name = name[:32] + "..."
229+
cpu = r.get("cpu_time") or 0
230+
total_cpu += cpu
231+
pss_avg = _fmt_bytes(r.get("mem_pss"))
232+
pss_max = _fmt_bytes(r.get("mem_pss_max"))
233+
tp = _fmt_bytes(r.get("throughput")) + "/s" if r.get("throughput") else "n/a"
234+
pct = r.get("percent_done")
235+
pct_str = f"{pct}%" if pct is not None else "n/a"
236+
lines.append(f"{name:<35} {_fmt_time(cpu / 1000):>10} {pss_avg:>10} "
237+
f"{pss_max:>10} {tp:>12} {pct_str:>6}")
238+
239+
lines.append("-" * 90)
240+
lines.append(f"Total CPU: {_fmt_time(total_cpu / 1000)}")
241+
return "\n".join(lines)
242+
243+
244+
def main():
245+
import argparse
246+
global PROXY, TOKEN, API
247+
248+
parser = argparse.ArgumentParser(description="AliHyperloop MCP server")
249+
parser.add_argument("--proxy", default=PROXY, help="Proxy base URL")
250+
parser.add_argument("--token", default=TOKEN, help="Bearer token")
251+
args = parser.parse_args()
252+
253+
PROXY = args.proxy
254+
TOKEN = args.token
255+
API = f"{PROXY}/alihyperloop-data"
256+
257+
mcp.run(transport="stdio")
258+
259+
260+
if __name__ == "__main__":
261+
main()
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[build-system]
2+
requires = ["hatchling"]
3+
build-backend = "hatchling.build"
4+
5+
[project]
6+
name = "hyperloop-server"
7+
version = "0.1.0"
8+
description = "MCP server for monitoring AliHyperloop train runs"
9+
requires-python = ">=3.11"
10+
dependencies = [
11+
"mcp>=1.0.0",
12+
"httpx>=0.27.0",
13+
]
14+
15+
[project.scripts]
16+
hyperloop-server = "hyperloop_server:main"
17+
18+
[tool.hatch.build.targets.wheel]
19+
include = ["hyperloop_server.py"]

0 commit comments

Comments
 (0)