async def _train_composition(train_id: int) -> tuple[str | None, list[dict]]:
    """Fetch the dataset name and the wagon records that make up one train.

    Args:
        train_id: Hyperloop train id.

    Returns:
        ``(dataset_name, wagons)``. ``wagons`` is empty when the train record
        has neither ``wagons_timestamp`` nor ``dataset_timestamp``. A wagon
        whose lookup raises, or whose record has no ``id``, is left out (best
        effort), so the list can be shorter than the train's real composition.

    Raises:
        Exceptions from the train or wagon-list request propagate; only the
        per-wagon lookups are guarded.
    """
    train = await _get("trains/train.jsp", {"train_id": train_id})
    # grid_job_bands unwraps a list-shaped train.jsp reply; stay consistent so
    # the composition guard does not fail on a reply the band fetch accepts.
    if isinstance(train, list):
        train = train[0]
    dataset = train.get("dataset_name")
    wagons_ts = train.get("wagons_timestamp") or train.get("dataset_timestamp")
    if not wagons_ts:
        return dataset, []

    derived = await _get(
        "trains/wagons_derived_data.jsp",
        {"train_id": train_id, "wagons_timestamp": wagons_ts},
    )
    wagon_ids = list(derived) if isinstance(derived, dict) else []

    async def fetch_one(wid: str) -> dict | None:
        # Deliberate best effort: a wagon that cannot be fetched (or whose id
        # does not parse as an int) is skipped rather than failing the train.
        try:
            wagon = await _get(
                "analysis/wagon/wagon.jsp",
                {"wagon_id": int(wid), "referenceTime": 0},
            )
        except Exception:
            return None
        if isinstance(wagon, dict) and wagon.get("id") is not None:
            return wagon
        return None

    fetched = await asyncio.gather(*(fetch_one(wid) for wid in wagon_ids))
    return dataset, [w for w in fetched if w]


def _summarize_sig(sig) -> str:
    """Render a composition signature as 'Nx workflow [analysis_id], ...'.

    Args:
        sig: ``(dataset, ((workflow, analysis_id), ...))`` as built by
            ``_match_compositions``, or a falsy value.

    Returns:
        One ``"<count>x <workflow> [<analysis_id>]"`` entry per distinct pair,
        sorted by label and comma-joined; a fixed placeholder when the
        signature is falsy or has no wagons.
    """
    if not sig or not sig[1]:
        return "(no wagons / unresolved)"
    counts = collections.Counter(f"{wf} [{aid}]" for wf, aid in sig[1])
    return ", ".join(f"{n}x {label}" for label, n in sorted(counts.items()))


async def _match_compositions(
    train_ids: list[int],
) -> tuple[dict, tuple | None, list[int], list[int]]:
    """Group trains by (dataset, multiset of (workflow, analysis_id)).

    Shared by validate_train_composition and grid_job_bands so both apply the
    same guard.

    Args:
        train_ids: Train ids to fetch and compare.

    Returns:
        ``(groups, ref_sig, matched_ids, failed_ids)``:
        ``groups`` maps each signature to the train ids that have it;
        ``ref_sig`` is the signature of the largest group (ties go to the one
        met first in ``train_ids`` order), or ``None`` if nothing resolved;
        ``matched_ids`` are the sorted train ids sharing ``ref_sig``;
        ``failed_ids`` are trains whose composition could not be built.
    """
    async def one(tid: int):
        try:
            dataset, wagons = await _train_composition(tid)
            pairs = [(w.get("work_flow_name") or "?", w.get("analysis_id"))
                     for w in wagons]
            # Sort on repr: the order only has to be canonical, and a plain
            # tuple sort raises TypeError when one workflow carries both a
            # missing (None) and a numeric analysis_id.
            sig = (dataset, tuple(sorted(pairs, key=repr)))
            # Fail here, inside the guard, if the signature cannot be used as
            # a dict key, instead of crashing the grouping loop below.
            hash(sig)
        except Exception:
            return tid, None
        return tid, sig

    resolved = await asyncio.gather(*(one(t) for t in train_ids))
    groups: dict = collections.defaultdict(list)
    failed: list[int] = []
    for tid, sig in resolved:
        if sig is None:
            failed.append(tid)
        else:
            groups[sig].append(tid)
    if not groups:
        return groups, None, [], failed
    ref = max(groups, key=lambda s: len(groups[s]))
    return groups, ref, sorted(groups[ref]), failed


@mcp.tool()
async def validate_train_composition(train_ids: list[int]) -> str:
    """Check whether a set of trains share the same dataset + wagon composition.

    For each train builds a signature = its dataset plus the multiset of
    (workflow, analysis_id) over its wagons, then groups the trains. Run this
    before comparing trains over time (throughput / CPU trends, distribution
    heatmaps) so confounders — a different analysis, an extra or missing
    wagon, a different dataset — are dropped rather than silently skewing the
    result.

    Args:
        train_ids: Train ids to compare.

    Returns:
        A text report: the reference composition (the largest matching
        group), the matched train list (feed it straight into the
        comparison), and each outlier with how it differs. Trains whose
        composition could not be fetched are listed with the outliers.
    """
    if not train_ids:
        return "No train ids given."
    groups, ref, matched, failed = await _match_compositions(train_ids)
    if ref is None:
        return "Could not resolve composition for: " + ", ".join(map(str, failed))
    ref_ds = ref[0]

    out = [
        f"Composition check for {len(train_ids)} trains:\n",
        f"Reference ({len(matched)}/{len(train_ids)} match): dataset={ref_ds}",
        f" {_summarize_sig(ref)}",
        f" matched: {', '.join(map(str, matched))}\n",
    ]

    # Non-reference groups, ordered by their smallest train id.
    outliers = sorted(
        ((sig, tids) for sig, tids in groups.items() if sig != ref),
        key=lambda item: min(item[1]),
    )
    if outliers or failed:
        out.append("Outliers (exclude from the comparison):")
        for sig, tids in outliers:
            # Only call out the dataset when it is what differs.
            tag = f"dataset={sig[0]}; " if sig[0] != ref_ds else ""
            out.append(
                f" {', '.join(map(str, sorted(tids)))}: {tag}{_summarize_sig(sig)}")
        for tid in failed:
            out.append(f" {tid}: composition could not be resolved")
        out.append("")
    else:
        out.append("All trains share the same composition. ✓\n")

    out.append(f"matched_train_ids = {matched}")
    return "\n".join(out)


def _percentiles(vals: list[float], ps=(0, 5, 10, 25, 50, 75, 90, 95, 100)) -> dict:
    """Linearly interpolated percentiles of a value list (no numpy needed).

    The percentile ``p`` sits at fractional index ``(n - 1) * p / 100`` of the
    sorted values and is interpolated between the two neighbouring entries.

    Args:
        vals: Values to summarise; must not be empty.
        ps: Percentiles (0-100) to evaluate.

    Returns:
        ``{p: value}`` for every ``p`` in ``ps``.

    Raises:
        ValueError: if ``vals`` is empty.
    """
    ordered = sorted(vals)
    count = len(ordered)
    if count == 0:
        raise ValueError("_percentiles() needs at least one value")
    if count == 1:
        # Return the value untouched (no arithmetic on it).
        return {p: ordered[0] for p in ps}
    out = {}
    for p in ps:
        pos = (count - 1) * (p / 100.0)
        lo = int(pos)
        hi = min(lo + 1, count - 1)
        out[p] = ordered[lo] + (ordered[hi] - ordered[lo]) * (pos - lo)
    return out


@mcp.tool()
async def grid_job_bands(train_ids: list[int], check_composition: bool = True) -> str:
    """Per-JOB grid throughput distribution (percentile bands) across trains over time.

    For each train, fetches its per-run grid results (train.jsp jobResults)
    and builds percentile bands over the *individual jobs'*
    throughput_per_core — the distribution behind the grid-statistics "jobs
    per CPU time" histogram — NOT the single train-average throughput, which
    collapses that spread to one number. Use this to watch a job-performance
    distribution shift over time (e.g. an optimization landing) rather than
    chasing a noisy mean.

    Args:
        train_ids: Trains to band.
        check_composition: When true (default) and more than one train is
            given, apply the same composition guard as
            validate_train_composition and keep only the trains that share
            the reference composition. Set to False to band every train as
            given.

    Returns:
        A per-train percentile table (p0/p10/p50/p90/p100 KB/s/core, job
        count) ordered by date, plus a fenced ```jsonl block (one
        {date,train,n,tpc:[...]} per train) ready to feed a band/fan-chart
        plotting script. The ``tpc`` values in the jsonl block are the
        throughput_per_core values as fetched (not divided by 1e3). Trains
        dropped by the guard, and trains skipped because their data could not
        be fetched or held no job with positive throughput, are listed above
        the table.
    """
    if not train_ids:
        return "No train ids given."

    unresolved: list[int] = []
    if check_composition and len(train_ids) > 1:
        _groups, ref, matched, unresolved = await _match_compositions(train_ids)
        if ref is None:
            return ("Could not resolve composition for any train: "
                    + ", ".join(map(str, train_ids)))
        keep = matched
        excluded = set(matched) | set(unresolved)
        mismatched = [t for t in train_ids if t not in excluded]
    else:
        keep, mismatched = list(train_ids), []

    async def fetch(tid: int):
        # Returns (tid, date, tpc) on success, (tid, None, reason) on failure.
        try:
            train = await _get("trains/train.jsp", {"train_id": tid})
            if isinstance(train, list):
                train = train[0]
            jobs = train.get("jobResults") or []
            tpc = [j["throughput_per_core"] for j in jobs
                   if (j.get("throughput_per_core") or 0) > 0]
            created = train.get("created")
            if created:
                # "created" is divided by 1000 before conversion, i.e. it is
                # treated as epoch milliseconds.
                date = datetime.datetime.fromtimestamp(
                    created / 1000, datetime.timezone.utc).strftime("%Y-%m-%d")
            else:
                date = "?"
            return tid, date, tpc
        except Exception as exc:
            return tid, None, f"{type(exc).__name__}: {exc}"

    rows = await asyncio.gather(*(fetch(t) for t in keep))
    good, skipped = [], []
    for tid, date, payload in rows:
        if date is None:
            skipped.append(f"{tid} (fetch failed: {payload})")
        elif not payload:
            skipped.append(f"{tid} (no job with positive throughput_per_core)")
        else:
            good.append((tid, date, payload))
    good.sort(key=lambda row: (row[1], row[0]))
    if not good:
        msg = "No usable per-job throughput for: " + ", ".join(map(str, keep))
        if skipped:
            msg += "\nSkipped: " + "; ".join(skipped)
        return msg

    out = ["Per-job grid throughput bands (KB/s/core), over individual jobs "
           "(not train average):\n"]
    notes = []
    if mismatched:
        notes.append(
            f"Dropped (composition mismatch): {', '.join(map(str, mismatched))}")
    if unresolved:
        notes.append("Dropped (composition could not be resolved): "
                     + ", ".join(map(str, unresolved)))
    if skipped:
        notes.append("Skipped: " + "; ".join(skipped))
    if notes:
        out.extend(notes)
        out.append("")
    out.append(f"{'date':<11}{'train':>8}{'jobs':>6}"
               f"{'p0':>8}{'p10':>8}{'p50':>8}{'p90':>8}{'p100':>8}")
    out.append("-" * 65)
    jsonl = []
    for tid, date, tpc in good:
        bands = _percentiles(tpc)
        kb = {p: v / 1e3 for p, v in bands.items()}  # KB/s/core
        out.append(f"{date:<11}{tid:>8}{len(tpc):>6}"
                   f"{kb[0]:>8.0f}{kb[10]:>8.0f}{kb[50]:>8.0f}"
                   f"{kb[90]:>8.0f}{kb[100]:>8.0f}")
        jsonl.append(json.dumps({"date": date, "train": tid,
                                 "n": len(tpc), "tpc": tpc}))
    out.append("\nData (write to a .jsonl and feed the band plot):")
    out.append("```jsonl")
    out.extend(jsonl)
    out.append("```")
    return "\n".join(out)


# ---------------------------------------------------------------------------
# Analysis / wagon browsing
#