Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Framework/Core/scripts/hyperloop-perf-server/hl_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ async def fetch_bytes(url: str, proxy_token: str = "", token: str = "") -> bytes
then HYPERLOOP_TOKEN, then ``token``.
token: Hyperloop auth token fallback.
"""
# Local file (a path or a file:// URL) — read directly, no HTTP. Lets a
# locally-generated side-car (igprof-demangle-symbols output) be attached
# via load_igprof(sidecar_url=/path/to/...syms.gz) without a web server.
if url.startswith("file://") or os.path.isfile(url):
path = url[len("file://"):] if url.startswith("file://") else url
with open(path, "rb") as f:
return f.read()

proxy_token = (
proxy_token
or os.environ.get("PROXY_TOKEN", "")
Expand Down
188 changes: 188 additions & 0 deletions Framework/Core/scripts/hyperloop-server/hyperloop_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,194 @@ async def fetch_one(wid: str) -> dict | None:
return "\n".join(lines)


async def _train_composition(train_id: int) -> tuple[str | None, list[dict]]:
"""(dataset_name, [wagon dicts]) for a train. Shared composition fetch."""
t = await _get("trains/train.jsp", {"train_id": train_id})
ds = t.get("dataset_name")
wagons_ts = t.get("wagons_timestamp") or t.get("dataset_timestamp")
if not wagons_ts:
return ds, []
wd = await _get("trains/wagons_derived_data.jsp",
{"train_id": train_id, "wagons_timestamp": wagons_ts})
wagon_ids = list(wd.keys()) if isinstance(wd, dict) else []

async def fetch_one(wid: str) -> dict | None:
try:
w = await _get("analysis/wagon/wagon.jsp",
{"wagon_id": int(wid), "referenceTime": 0})
if isinstance(w, dict) and w.get("id") is not None:
return w
except Exception:
pass
return None

wagons = [w for w in await asyncio.gather(*(fetch_one(w) for w in wagon_ids)) if w]
return ds, wagons


def _summarize_sig(sig) -> str:
"""Human-readable 'Nx workflow [analysis_id]' summary of a composition signature."""
if not sig or not sig[1]:
return "(no wagons / unresolved)"
c = collections.Counter(f"{wf} [{aid}]" for wf, aid in sig[1])
return ", ".join(f"{n}x {k}" for k, n in sorted(c.items()))


async def _match_compositions(train_ids: list[int]):
"""Group trains by (dataset, multiset of (workflow, analysis_id)).

Returns (groups, ref_sig, matched_ids, failed_ids) where groups maps each
signature to its train ids, ref_sig is the largest group's signature (None
if nothing resolved), and matched_ids are the trains sharing it. Shared by
validate_train_composition and grid_job_bands so both apply the same guard.
"""
async def one(tid: int):
try:
ds, wagons = await _train_composition(tid)
sig = (ds, tuple(sorted((w.get("work_flow_name") or "?",
w.get("analysis_id")) for w in wagons)))
return tid, sig
except Exception:
return tid, None

res = await asyncio.gather(*(one(t) for t in train_ids))
groups: dict = collections.defaultdict(list)
failed = []
for tid, sig in res:
(failed.append(tid) if sig is None else groups[sig].append(tid))
if not groups:
return groups, None, [], failed
ref = max(groups, key=lambda s: len(groups[s]))
return groups, ref, sorted(groups[ref]), failed


@mcp.tool()
async def validate_train_composition(train_ids: list[int]) -> str:
"""Check whether a set of trains share the same dataset + wagon composition.

For each train builds a signature = its dataset plus the multiset of
(workflow, analysis_id) over its wagons, then groups the trains. Run this
before comparing trains over time (throughput / CPU trends, distribution
heatmaps) so confounders — a different analysis, an extra or missing wagon,
a different dataset — are dropped rather than silently skewing the result.

Returns the reference composition (the largest matching group), the matched
train list (feed it straight into the comparison), and each outlier with how
it differs.
"""
groups, ref, matched, failed = await _match_compositions(train_ids)
if ref is None:
return "Could not resolve composition for: " + ", ".join(map(str, failed))
ref_ds = ref[0]

out = [f"Composition check for {len(train_ids)} trains:\n",
f"Reference ({len(matched)}/{len(train_ids)} match): dataset={ref_ds}",
f" {_summarize_sig(ref)}",
f" matched: {', '.join(map(str, matched))}\n"]

outliers = sorted([(s, ts) for s, ts in groups.items() if s != ref],
key=lambda x: sorted(x[1])[0])
if outliers or failed:
out.append("Outliers (exclude from the comparison):")
for s, ts in outliers:
tag = f"dataset={s[0]}; " if s[0] != ref_ds else ""
out.append(f" {', '.join(map(str, sorted(ts)))}: {tag}{_summarize_sig(s)}")
for tid in failed:
out.append(f" {tid}: composition could not be resolved")
out.append("")
else:
out.append("All trains share the same composition. ✓\n")

out.append(f"matched_train_ids = {matched}")
return "\n".join(out)


def _percentiles(vals: list[float], ps=(0, 5, 10, 25, 50, 75, 90, 95, 100)) -> dict:
"""Nearest-rank percentiles of a value list (no numpy in the server env)."""
s = sorted(vals)
n = len(s)
out = {}
for p in ps:
if n == 1:
out[p] = s[0]
continue
k = (n - 1) * (p / 100.0)
lo, hi = int(k), min(int(k) + 1, n - 1)
out[p] = s[lo] + (s[hi] - s[lo]) * (k - lo)
return out


@mcp.tool()
async def grid_job_bands(train_ids: list[int], check_composition: bool = True) -> str:
"""Per-JOB grid throughput distribution (percentile bands) across trains over time.

For each train, fetches its per-run grid results (train.jsp jobResults) and
builds percentile bands over the *individual jobs'* throughput_per_core — the
distribution behind the grid-statistics "jobs per CPU time" histogram — NOT
the single train-average throughput, which collapses that spread to one
number. Use this to watch a job-performance distribution shift over time
(e.g. an optimization landing) rather than chasing a noisy mean.

By default runs validate_train_composition first and keeps only the trains
that share the reference composition (set check_composition=False to skip the
guard and band every train as given). Returns a per-train percentile table
(p0/p10/p50/p90/p100 KB/s/core, job count) ordered by date, plus a fenced
```jsonl block (one {date,train,n,tpc:[...]} per train) ready to feed a
band/fan-chart plotting script.
"""
if check_composition and len(train_ids) > 1:
groups, ref, matched, failed = await _match_compositions(train_ids)
if ref is None:
return "Could not resolve composition for any train: " + \
", ".join(map(str, train_ids))
dropped = [t for t in train_ids if t not in matched]
keep = matched
else:
keep, dropped = list(train_ids), []

async def fetch(tid: int):
try:
t = await _get("trains/train.jsp", {"train_id": tid})
t = t[0] if isinstance(t, list) else t
jr = t.get("jobResults") or []
tpc = [j["throughput_per_core"] for j in jr
if (j.get("throughput_per_core") or 0) > 0]
created = t.get("created")
date = (datetime.datetime.fromtimestamp(
created / 1000, datetime.timezone.utc).strftime("%Y-%m-%d")
if created else "?")
return tid, date, tpc
except Exception as e:
return tid, None, str(e)

rows = await asyncio.gather(*(fetch(t) for t in keep))
good = [(tid, d, tpc) for tid, d, tpc in rows if d is not None and tpc]
good.sort(key=lambda r: (r[1], r[0]))
if not good:
return "No usable per-job throughput for: " + ", ".join(map(str, keep))

out = ["Per-job grid throughput bands (KB/s/core), over individual jobs "
"(not train average):\n"]
if dropped:
out.append(f"Dropped (composition mismatch): {', '.join(map(str, dropped))}\n")
out.append(f"{'date':<11}{'train':>8}{'jobs':>6}"
f"{'p0':>8}{'p10':>8}{'p50':>8}{'p90':>8}{'p100':>8}")
out.append("-" * 65)
jsonl = []
for tid, date, tpc in good:
pc = _percentiles(tpc)
k = {p: pc[p] / 1e3 for p in pc} # KB/s/core
out.append(f"{date:<11}{tid:>8}{len(tpc):>6}"
f"{k[0]:>8.0f}{k[10]:>8.0f}{k[50]:>8.0f}{k[90]:>8.0f}{k[100]:>8.0f}")
jsonl.append(json.dumps({"date": date, "train": tid,
"n": len(tpc), "tpc": tpc}))
out.append("\nData (write to a .jsonl and feed the band plot):")
out.append("```jsonl")
out.extend(jsonl)
out.append("```")
return "\n".join(out)


# ---------------------------------------------------------------------------
# Analysis / wagon browsing
#
Expand Down