Skip to content

Commit 25f4dfe

Browse files
committed
Add IgProf support to the MCP
1 parent 79fec19 commit 25f4dfe

4 files changed

Lines changed: 409 additions & 26 deletions

File tree

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Copyright 2019-2026 CERN and copyright holders of ALICE O2.
2+
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
# All rights not expressly granted are reserved.
4+
#
5+
# This software is distributed under the terms of the GNU General Public
6+
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
#
8+
# In applying this license CERN does not waive the privileges and immunities
9+
# granted to it by virtue of its status as an Intergovernmental Organization
10+
# or submit itself to any jurisdiction.
11+
"""Shared helpers for the Hyperloop perf / igprof MCP tools."""
12+
13+
from __future__ import annotations
14+
15+
import os
16+
17+
import httpx
18+
19+
20+
async def fetch_bytes(url: str, proxy_token: str = "", token: str = "") -> bytes:
    """Fetch a workdir artefact, routing alimonitor URLs through the local proxy.

    Mirrors the grid-cert proxy convention used across the Hyperloop tooling:
    a URL whose host is ``alimonitor.cern.ch`` is rewritten to
    ``http://localhost:8888/alimonitor/<path>`` (query string preserved).
    ``Accept-Encoding: identity`` is always sent (otherwise the proxy returns a
    gzip Content-Length mismatch). Transient protocol/read errors are attempted
    up to 3 times in total.

    Args:
        url: Direct artefact URL (perf script, igprof dump, side-car, ...).
        proxy_token: Bearer token for the request. When empty, falls back to the
            ``PROXY_TOKEN`` environment variable, then ``token``, then the
            ``HYPERLOOP_TOKEN`` environment variable.
        token: Hyperloop auth token fallback.

    Returns:
        The raw (undecoded) response body.

    Raises:
        httpx.HTTPStatusError: If the final response has an error status.
        httpx.RemoteProtocolError, httpx.ReadError: If the third attempt still
            fails with a transient transport error.
    """
    # Local import: only this helper needs URL parsing.
    from urllib.parse import urlsplit

    proxy_token = (
        proxy_token
        or os.environ.get("PROXY_TOKEN", "")
        or token
        or os.environ.get("HYPERLOOP_TOKEN", "")
    )

    # Decide on the parsed host instead of a substring of the whole URL, so a
    # query string that merely mentions alimonitor is not rerouted and an
    # explicit port does not leak into the proxied path.
    try:
        parts = urlsplit(url)
        if not parts.netloc:
            # Scheme-less form such as "alimonitor.cern.ch/<path>".
            parts = urlsplit(f"//{url}")
        host = parts.hostname or ""
    except ValueError:
        # Malformed URL: leave it untouched and let httpx report the problem.
        parts = None
        host = ""

    fetch_url = url
    if parts is not None and host == "alimonitor.cern.ch":
        path = parts.path.lstrip("/")
        if parts.query:
            path = f"{path}?{parts.query}"
        fetch_url = f"http://localhost:8888/alimonitor/{path}"

    headers = {"Authorization": f"Bearer {proxy_token}"} if proxy_token else {}
    headers["Accept-Encoding"] = "identity"

    # NOTE(review): verify=False disables TLS certificate checks for every
    # request made here, including URLs that are not routed through the local
    # proxy, and the bearer token is sent to those hosts too — confirm that
    # both are intended.
    async with httpx.AsyncClient(verify=False) as client:
        for attempt in range(3):
            try:
                r = await client.get(
                    fetch_url, headers=headers, timeout=300.0, follow_redirects=True
                )
                r.raise_for_status()
                return r.content
            except (httpx.RemoteProtocolError, httpx.ReadError):
                # Retry immediately; the last attempt re-raises the error.
                if attempt == 2:
                    raise
    raise RuntimeError("unreachable")
Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
# Copyright 2019-2026 CERN and copyright holders of ALICE O2.
2+
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
# All rights not expressly granted are reserved.
4+
#
5+
# This software is distributed under the terms of the GNU General Public
6+
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
#
8+
# In applying this license CERN does not waive the privileges and immunities
9+
# granted to it by virtue of its status as an Intergovernmental Organization
10+
# or submit itself to any jurisdiction.
11+
"""IgProf memory-profile tools for the Hyperloop perf MCP server.
12+
13+
IgProf heap dumps are huge pre-order call trees. Rather than parse them in
14+
Python, these tools delegate every query to the ``igprof-query`` C tool (a fast
15+
streaming reader): the dump is fetched + decompressed once and cached on disk,
16+
then ``igprof-query`` is run per query (~100 ms even on a 600k-node dump), so
17+
only the answer's symbols are ever demangled.
18+
19+
Counters in a MEM dump and how they aggregate:
20+
MEM_TOTAL total bytes allocated over the run (summed)
21+
MEM_MAX largest single allocation (reduced by max)
22+
MEM_LIVE bytes still live at dump time = footprint (summed net-of-free)
23+
24+
The ``igprof-query`` binary is located via ``IGPROF_QUERY_BIN`` or ``PATH``.
25+
Build it (with readable names) from ~/src/IgProf:
26+
cmake -DIGPROF_VIEWER_ONLY=ON -DCMAKE_C_FLAGS=-DIGPROF_DEMANGLE … && make
27+
"""
28+
29+
from __future__ import annotations
30+
31+
import gzip
32+
import hashlib
33+
import os
34+
import re
35+
import shutil
36+
import subprocess
37+
from dataclasses import dataclass
38+
39+
from hl_common import fetch_bytes
40+
41+
# ---------------------------------------------------------------------------
42+
# Binary + cache
43+
# ---------------------------------------------------------------------------
44+
45+
# Directory where fetched dumps and side-car files are written. Taken from the
# IGPROF_MCP_CACHE environment variable when set; a leading "~" is expanded.
_CACHE_DIR = os.path.expanduser(os.environ.get("IGPROF_MCP_CACHE", "~/.cache/igprof-mcp"))

# One-line description per known counter name, used by igprof_counters().
_COUNTER_DOC = {
    "MEM_TOTAL": "total bytes allocated over the run (summed)",
    "MEM_MAX": "largest single allocation (reduced by max)",
    "MEM_LIVE": "bytes still live at dump time — footprint / leak (summed net-of-free)",
}
52+
53+
54+
def _bin() -> str:
    """Return the ``igprof-query`` executable to run.

    The ``IGPROF_QUERY_BIN`` environment variable wins when it is set to a
    non-empty value; otherwise ``PATH`` is searched.

    Raises:
        RuntimeError: If neither lookup yields a path.
    """
    located = os.environ.get("IGPROF_QUERY_BIN")
    if not located:
        located = shutil.which("igprof-query")
    if located:
        return located
    raise RuntimeError(
        "igprof-query not found. Set IGPROF_QUERY_BIN or put it on PATH. "
        "Build it from ~/src/IgProf: "
        "cmake -DIGPROF_VIEWER_ONLY=ON -DCMAKE_C_FLAGS=-DIGPROF_DEMANGLE . && make"
    )
63+
64+
65+
@dataclass
class IgProfReport:
    """Bookkeeping record for one loaded IgProf dump (see load_igprof)."""

    # URL the dump was fetched from.
    url: str
    # Label the report is registered under.
    name: str
    # Path of the cached dump file on disk.
    dump_path: str
    # Path of the cached side-car file, or "" when no side-car was given.
    sidecar_path: str
    # Counter names detected near the start of the dump.
    counters: list[str]
    # Counter used when a tool call does not name one.
    default_counter: str
73+
74+
75+
# Registry of loaded reports, keyed by report name.
_reports: dict[str, IgProfReport] = {}


def _get(name: str) -> IgProfReport:
    """Look up a loaded report by name.

    Raises:
        ValueError: If no report is registered under ``name``; the message
            lists the names that are currently loaded.
    """
    found = _reports.get(name)
    if found is not None:
        return found
    if _reports:
        avail = ", ".join(_reports)
    else:
        avail = "(none)"
    raise ValueError(f"No igprof report '{name}'. Loaded: {avail}. Use load_igprof first.")
84+
85+
86+
def _run(report: IgProfReport, args: list[str]) -> tuple[str, str]:
    """Run ``igprof-query`` against a report and return ``(stdout, stderr)``.

    The command line is ``<binary> <args...> [-S <sidecar>] <dump>``; the
    side-car option is added only when the report has one.

    Raises:
        RuntimeError: If the process exits with a non-zero status. The message
            carries stderr, or stdout when stderr is empty.
    """
    argv = [_bin()] + list(args)
    if report.sidecar_path:
        argv.extend(["-S", report.sidecar_path])
    argv.append(report.dump_path)

    # Output is captured as text; the call is bounded to 180 seconds.
    proc = subprocess.run(argv, capture_output=True, text=True, timeout=180)
    if proc.returncode == 0:
        return proc.stdout, proc.stderr
    raise RuntimeError(f"igprof-query failed: {(proc.stderr or proc.stdout).strip()}")
95+
96+
97+
def _enumerate_counters(dump_path: str) -> list[str]:
    """List counter names defined in the first 400 lines of a dump.

    Counters are define-on-first-use (``V<id>=(NAME)``) in the first nodes, so
    only the head of the file is scanned. Names are returned in order of first
    appearance, without duplicates.
    """
    definition = r"V\d+=\(([A-Z_][A-Z0-9_]*)\)"
    found: list[str] = []
    with open(dump_path, "r", errors="replace") as dump:
        # range() comes first in zip so no line beyond the 400th is consumed.
        for _, row in zip(range(400), dump):
            for counter_name in re.findall(definition, row):
                if counter_name not in found:
                    found.append(counter_name)
    return found
109+
110+
111+
# One data row of `igprof-query top`: four integer columns, then the symbol.
_TOP_ROW = re.compile(r"^\s*(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(.+?)\s*$")


def _parse_top(text: str) -> dict[str, tuple[int, int, int]]:
    """Map symbol -> (cumulative, self, self_count) from `igprof-query top` output.

    Lines that do not look like a data row are skipped. When a symbol appears
    on several rows, the last row wins.
    """
    candidates = (_TOP_ROW.match(row) for row in text.splitlines())
    # Columns: 1=rank 2=cumulative 3=self 4=self-count 5=symbol
    return {
        hit.group(5): (int(hit.group(2)), int(hit.group(3)), int(hit.group(4)))
        for hit in candidates
        if hit
    }
123+
124+
125+
def _limit_show(text: str, n: int) -> str:
    """Keep at most `n` edge rows under each `== callers/callees ==` section.

    Header lines (starting with ``==``), blank lines and rows of any other
    section pass through unchanged. The first suppressed row of a limited
    section is replaced by a single marker line; later ones are dropped.
    """
    kept: list[str] = []
    shown = 0
    limiting = False
    for row in text.splitlines():
        if row.startswith("=="):
            # A new section starts: decide whether it is limited, reset the tally.
            limiting = row.startswith(("== callers", "== callees"))
            shown = 0
            kept.append(row)
        elif not (limiting and row.strip()):
            kept.append(row)
        else:
            shown += 1
            if shown <= n:
                kept.append(row)
            elif shown == n + 1:
                kept.append(" … (more rows; raise n)")
    return "\n".join(kept)
145+
146+
147+
# ---------------------------------------------------------------------------
148+
# Tools (registered on the shared FastMCP instance by register())
149+
# ---------------------------------------------------------------------------
150+
151+
152+
async def load_igprof(
    url: str,
    name: str = "",
    counter: str = "MEM_TOTAL",
    sidecar_url: str = "",
    proxy_token: str = "",
) -> str:
    """Fetch an IgProf heap dump and register it for querying.

    The ``.gz`` dump is downloaded (via the alimonitor proxy for
    ``alimonitor.cern.ch`` URLs), decompressed once, and cached on disk;
    subsequent tools re-read that file. No in-memory index.

    Args:
        url: Direct URL to an ``igprof.<device>.<...>.gz`` dump.
        name: Label (defaults to the filename portion of the URL).
        counter: Default counter for this report (MEM_TOTAL/MEM_MAX/MEM_LIVE).
        sidecar_url: Optional ``igprof.*.syms.gz`` resolving ``@?0x…`` addresses.
        proxy_token: Bearer token for the local proxy (else PROXY_TOKEN env).
    """
    # The docstring above is kept verbatim: this function is registered as a
    # tool, so its text may be surfaced to clients.
    payload = await fetch_bytes(url, proxy_token=proxy_token)

    # Cache files are keyed by a short digest of the dump URL.
    os.makedirs(_CACHE_DIR, exist_ok=True)
    key = hashlib.sha1(url.encode()).hexdigest()[:12]
    dump_path = os.path.join(_CACHE_DIR, f"{key}.dump")

    # Treat the payload as gzip when the URL says so or the magic bytes match.
    looks_gzipped = url.endswith(".gz") or payload[:2] == b"\x1f\x8b"
    data = gzip.decompress(payload) if looks_gzipped else payload
    with open(dump_path, "wb") as dump_file:
        dump_file.write(data)

    # The side-car is stored exactly as downloaded (not decompressed).
    sidecar_path = ""
    if sidecar_url:
        symbols = await fetch_bytes(sidecar_url, proxy_token=proxy_token)
        sidecar_path = os.path.join(_CACHE_DIR, f"{key}.syms.gz")
        with open(sidecar_path, "wb") as sidecar_file:
            sidecar_file.write(symbols)

    # Fall back to the first detected counter when the requested one is absent.
    counters = _enumerate_counters(dump_path)
    if counters and counter not in counters:
        counter = counters[0]

    pname = name if name else url.rstrip("/").rsplit("/", 1)[-1]
    report = IgProfReport(
        url=url,
        name=pname,
        dump_path=dump_path,
        sidecar_path=sidecar_path,
        counters=counters,
        default_counter=counter,
    )
    _reports[pname] = report

    # Best effort only: a failing igprof-query must not fail the load, it just
    # leaves the symbol count out of the summary.
    nsym = ""
    try:
        _, diagnostics = _run(report, ["top", "-k", counter, "-n", "0"])
        hit = re.search(r"symbols=(\d+)", diagnostics)
        if hit:
            nsym = f", {int(hit.group(1)):,} symbols"
    except Exception:
        pass

    summary = (
        f"Loaded igprof '{pname}': {len(data):,} bytes uncompressed{nsym}. "
        f"counters={counters or '(none detected)'}, default={counter}"
    )
    if sidecar_path:
        summary += ", side-car attached"
    return summary
209+
210+
211+
def list_igprof() -> str:
    """List loaded IgProf reports."""
    # One line per registered report, in registration order.
    if _reports:
        summaries = [
            f"{label}: default={rep.default_counter}, counters={rep.counters}, url={rep.url}"
            for label, rep in _reports.items()
        ]
        return "\n".join(summaries)
    return "No igprof reports loaded. Use load_igprof first."
219+
220+
221+
def drop_igprof(name: str) -> str:
    """Free a report and delete its cached dump.

    Args:
        name: Report name as returned by load_igprof.
    """
    report = _get(name)
    # Remove the dump and, when one was attached, the side-car file.
    cached_files = [path for path in (report.dump_path, report.sidecar_path) if path]
    for path in cached_files:
        if os.path.exists(path):
            os.remove(path)
    del _reports[name]
    return f"Dropped igprof report '{name}'."
233+
234+
235+
def igprof_counters(name: str) -> str:
    """List the counters available in a report and what they mean.

    Args:
        name: Report name as returned by load_igprof.
    """
    report = _get(name)
    described: list[str] = []
    for key in report.counters:
        # Counters without a known description get a generic one.
        entry = f"{key}: {_COUNTER_DOC.get(key, 'profiler counter')}"
        if key == report.default_counter:
            entry += " (default)"
        described.append(entry)
    return "\n".join(described)
247+
248+
249+
def igprof_top(name: str, counter: str = "", n: int = 40) -> str:
    """Top allocators by a counter (cumulative + self, already merged by name).

    Args:
        name: Report name as returned by load_igprof.
        counter: MEM_TOTAL/MEM_MAX/MEM_LIVE (defaults to the report's default).
        n: Number of rows (default 40).
    """
    report = _get(name)
    selected = counter or report.default_counter
    # The tool's stdout is returned untouched; stderr is ignored.
    listing, _diagnostics = _run(report, ["top", "-k", selected, "-n", str(n)])
    return listing
260+
261+
262+
def igprof_show(name: str, symbol: str, counter: str = "", n: int = 40) -> str:
    """Callers and callees of a symbol (POSIX-extended regex), merged by name.

    Args:
        name: Report name as returned by load_igprof.
        symbol: Regex matched against the (resolved) symbol name, e.g. ``^_Znwm$``.
        counter: MEM_TOTAL/MEM_MAX/MEM_LIVE (defaults to the report's default).
        n: Max caller/callee rows to show per side (default 40).
    """
    report = _get(name)
    selected = counter or report.default_counter
    listing, _diagnostics = _run(report, ["show", "-s", symbol, "-k", selected])
    # Row limiting is applied here, on the tool's full output.
    return _limit_show(listing, n)
274+
275+
276+
def igprof_show_rank(name: str, rank: int, counter: str = "", n: int = 40) -> str:
    """Drill into the RANK-th heaviest symbol (by `igprof_top`) — callers + callees.

    Args:
        name: Report name as returned by load_igprof.
        rank: 1-based rank in the `igprof_top` ranking for `counter`.
        counter: MEM_TOTAL/MEM_MAX/MEM_LIVE (defaults to the report's default).
        n: Max caller/callee rows to show per side (default 40).
    """
    report = _get(name)
    selected = counter or report.default_counter
    listing, _diagnostics = _run(report, ["show", "-r", str(rank), "-k", selected])
    # Row limiting is applied here, on the tool's full output.
    return _limit_show(listing, n)
288+
289+
290+
def igprof_compare(name_a: str, name_b: str, counter: str = "", n: int = 40) -> str:
    """Diff two reports' allocators, normalised to each report's total `self`.

    Positive Δ means the symbol takes a larger share of allocations in B than A.

    Args:
        name_a: Baseline report name.
        name_b: Comparison report name.
        counter: Counter to compare (defaults to A's default).
        n: Number of rows (default 40).
    """
    a, b = _get(name_a), _get(name_b)
    c = counter or a.default_counter

    # Ask for a very large row count so the shares cover (nearly) all symbols.
    ta, _ = _run(a, ["top", "-k", c, "-n", "100000"])
    tb, _ = _run(b, ["top", "-k", c, "-n", "100000"])
    ra, rb = _parse_top(ta), _parse_top(tb)

    # Total `self` per report; `or 1` avoids dividing by zero on empty output.
    sa = sum(v[1] for v in ra.values()) or 1
    sb = sum(v[1] for v in rb.values()) or 1

    missing = (0, 0, 0)
    diffs = []
    for sym in set(ra) | set(rb):
        fa = ra.get(sym, missing)[1] / sa
        fb = rb.get(sym, missing)[1] / sb
        diffs.append((fb - fa, sym, fa, fb))

    # Largest |Δ| first. Ties are broken by symbol name: without that, rows with
    # equal |Δ| would come out in set-iteration order, which differs between
    # processes because string hashing is randomised.
    diffs.sort(key=lambda row: (-abs(row[0]), row[1]))

    lines = [
        f"Comparing '{name_a}' (A) vs '{name_b}' (B) counter={c}, self-share",
        f"{'Δ%':>8} {'A%':>7} {'B%':>7} symbol",
    ]
    for d, sym, fa, fb in diffs[:n]:
        lines.append(f"{d*100:>+8.2f} {fa*100:>7.2f} {fb*100:>7.2f} {sym}")
    return "\n".join(lines)
321+
322+
323+
def register(mcp) -> None:
    """Expose every igprof tool of this module on the given FastMCP instance.

    Each function is passed through a fresh ``mcp.tool()`` decorator, in the
    order listed below.
    """
    tools = (
        load_igprof,
        list_igprof,
        drop_igprof,
        igprof_counters,
        igprof_top,
        igprof_show,
        igprof_show_rank,
        igprof_compare,
    )
    for tool_fn in tools:
        decorate = mcp.tool()
        decorate(tool_fn)

0 commit comments

Comments
 (0)