From 7a3f61eb30ecfcebd2f98b0ab6c1e8742fff9be7 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 7 May 2026 11:47:26 +0200 Subject: [PATCH 01/16] [caliper] cli: main: remove alternative flag names --- projects/caliper/cli/main.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/projects/caliper/cli/main.py b/projects/caliper/cli/main.py index 64ec2545..4ccd26b7 100644 --- a/projects/caliper/cli/main.py +++ b/projects/caliper/cli/main.py @@ -60,7 +60,7 @@ def _require_artifacts_dir(ctx: click.Context) -> Path: _exit_with_help( ctx, "This command requires the test artifact tree root: " - "`--artifacts-dir DIR` or `--base-dir DIR` " + "`--artifacts-dir DIR` " "(before or after the subcommand).", code=1, ) @@ -89,7 +89,6 @@ def _workspace_cli_options(cmd: Any) -> Any: opts = ( click.option( "--artifacts-dir", - "--base-dir", "artifacts_dir", type=click.Path(path_type=Path, exists=True), default=None, @@ -105,12 +104,11 @@ def _workspace_cli_options(cmd: Any) -> Any: help=_POSTPROCESS_CONFIG_HELP + " Overrides the global option when set here.", ), click.option( - "--plugin-module", "--plugin", "plugin_module_override", metavar="MODULE", default=None, - help="Plugin import path; same as global --plugin-module / --plugin.", + help="Plugin import path; same as global --plugin.", ), ) for opt in reversed(opts): @@ -141,7 +139,6 @@ def _plugin_tuple(ctx: click.Context) -> tuple[str, Any]: @click.group(context_settings={"help_option_names": ["-h", "--help"]}) @click.option( "--artifacts-dir", - "--base-dir", "artifacts_dir", type=click.Path(path_type=Path, exists=True), default=None, @@ -154,7 +151,6 @@ def _plugin_tuple(ctx: click.Context) -> tuple[str, Any]: help=_POSTPROCESS_CONFIG_HELP, ) @click.option( - "--plugin-module", "--plugin", "plugin_module", metavar="MODULE", @@ -521,7 +517,6 @@ def ai_eval_export( plugin=plugin, output=output, use_cache=True, - cache_path=None, ) except Exception as e: # noqa: BLE001 click.echo(f"ai-eval-export failed: {e}", err=True) From 7a0dbe957fae34b28e07bb74c86965236ef0cbb3 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 20:16:40 +0200 Subject: [PATCH 02/16] [caliper] cli: main: add 'caliper artifacts import' command --- projects/caliper/cli/main.py | 76 +++++++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/projects/caliper/cli/main.py b/projects/caliper/cli/main.py index 4ccd26b7..c3d5c7f1 100644 --- a/projects/caliper/cli/main.py +++ b/projects/caliper/cli/main.py @@ -356,7 +356,7 @@ def kpi_analyze( @main.group("artifacts") @click.pass_context def artifacts_group(ctx: click.Context) -> None: - """File artifact export.""" + """File artifact export and import.""" @artifacts_group.command("export") @@ -524,6 +524,80 @@ def ai_eval_export( click.echo(f"Wrote {output}") +@artifacts_group.command("import") +@click.option("--from-mlflow", "mlflow_run_id", help="MLflow run ID to download artifacts from.") +@click.option( + "--output-dir", + type=click.Path(path_type=Path), + required=True, + help="Local directory to download artifacts to.", +) +@click.option( + "--mlflow-tracking-uri", + envvar="MLFLOW_TRACKING_URI", + help="MLflow tracking server URI (can be set via MLFLOW_TRACKING_URI).", +) +@click.option( + "--artifact-path", + default="", + help="Specific artifact path to download (default: download all artifacts).", +) +@click.pass_context +def import_command( + ctx: click.Context, + mlflow_run_id: str | None, + output_dir: Path, + mlflow_tracking_uri: str | None, + artifact_path: str, +) -> None: + """Download artifact files from MLflow.""" + if mlflow_run_id: + if not mlflow_tracking_uri: + click.echo( + "Error: MLflow tracking URI required. Set --mlflow-tracking-uri or MLFLOW_TRACKING_URI.", + err=True, + ) + sys.exit(1) + + try: + import mlflow + from mlflow.tracking import MlflowClient + + # Set tracking URI + mlflow.set_tracking_uri(mlflow_tracking_uri) + client = MlflowClient() + + # Download artifacts + output_dir.mkdir(parents=True, exist_ok=True) + + # Download artifacts to the output directory + downloaded_path = client.download_artifacts( + run_id=mlflow_run_id, path=artifact_path, dst_path=str(output_dir) + ) + + # Count downloaded files + if Path(downloaded_path).is_file(): + downloaded_files = [Path(downloaded_path)] + else: + downloaded_files = list(Path(downloaded_path).rglob("*")) + downloaded_files = [f for f in downloaded_files if f.is_file()] + + click.echo(f"Downloaded {len(downloaded_files)} files to {output_dir}") + if downloaded_files: + click.echo("Downloaded files:") + for file in downloaded_files[:10]: # Show first 10 + click.echo(f" {file.relative_to(output_dir)}") + if len(downloaded_files) > 10: + click.echo(f" ... and {len(downloaded_files) - 10} more") + + except Exception as e: # noqa: BLE001 + click.echo(f"artifacts import failed: {e}", err=True) + sys.exit(2) + else: + click.echo("Error: Specify source backend: --from-mlflow RUN_ID", err=True) + sys.exit(1) + + def run_cli() -> None: """Invoke CLI; on missing required options, print subcommand help.""" try: From d40b69a56b490497e210f57fe3954902d942bced Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 20:17:21 +0200 Subject: [PATCH 03/16] [caliper] cli: main: better error handling --- projects/caliper/cli/main.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/projects/caliper/cli/main.py b/projects/caliper/cli/main.py index c3d5c7f1..b54a6100 100644 --- a/projects/caliper/cli/main.py +++ b/projects/caliper/cli/main.py @@ -606,14 +606,13 @@ def run_cli() -> None: rv = main.main(standalone_mode=False, prog_name="caliper") if isinstance(rv, int) and rv != 0: sys.exit(rv) - except click.MissingParameter as exc: - msg = exc.format_message() - sub = getattr(exc, "ctx", None) - click.echo(f"Error: {msg}\n", err=True) - if sub is not None: - click.echo(sub.get_help(), err=True) - ec = getattr(exc, "exit_code", 2) - sys.exit(2 if ec is None else int(ec)) + except click.ClickException as exc: + # Handle click exceptions including NoArgsIsHelpError and MissingParameter + if hasattr(exc, "ctx") and exc.ctx: + click.echo(exc.ctx.get_help(), err=True) + else: + exc.show(sys.stderr) + sys.exit(2) except SystemExit: raise From 9351e03438d5b3fe2b190dd33336faf2c72a672b Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 20:34:00 +0200 Subject: [PATCH 04/16] [caliper] postprocess: guidellm: new post-processing helper --- projects/caliper/postprocess/__init__.py | 0 .../caliper/postprocess/guidellm/__init__.py | 0 .../postprocess/guidellm/parsing/__init__.py | 6 + .../postprocess/guidellm/parsing/kpis.py | 172 +++++++++ .../postprocess/guidellm/parsing/models.py | 103 ++++++ .../postprocess/guidellm/parsing/parsers.py | 292 +++++++++++++++ .../postprocess/guidellm/plotting/__init__.py | 0 .../caliper/postprocess/guidellm/plugin.py | 334 ++++++++++++++++++ 8 files changed, 907 insertions(+) create mode 100644 projects/caliper/postprocess/__init__.py create mode 100644 projects/caliper/postprocess/guidellm/__init__.py create mode 100644 projects/caliper/postprocess/guidellm/parsing/__init__.py create mode 100644 projects/caliper/postprocess/guidellm/parsing/kpis.py create mode 100644 projects/caliper/postprocess/guidellm/parsing/models.py create mode 100644 projects/caliper/postprocess/guidellm/parsing/parsers.py create mode 100644 projects/caliper/postprocess/guidellm/plotting/__init__.py create mode 100644 projects/caliper/postprocess/guidellm/plugin.py diff --git a/projects/caliper/postprocess/__init__.py b/projects/caliper/postprocess/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/projects/caliper/postprocess/guidellm/__init__.py b/projects/caliper/postprocess/guidellm/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/projects/caliper/postprocess/guidellm/parsing/__init__.py b/projects/caliper/postprocess/guidellm/parsing/__init__.py new file mode 100644 index 00000000..ef79517f --- /dev/null +++ b/projects/caliper/postprocess/guidellm/parsing/__init__.py @@ -0,0 +1,6 @@ +"""GuideLLM parsing components.""" + +from .kpis import GuideLLMKpiHandler +from .parsers import GuideLLMParser + +__all__ = ["GuideLLMParser", "GuideLLMKpiHandler"] diff --git a/projects/caliper/postprocess/guidellm/parsing/kpis.py b/projects/caliper/postprocess/guidellm/parsing/kpis.py new file mode 100644 index 00000000..1eb85e05 --- /dev/null +++ b/projects/caliper/postprocess/guidellm/parsing/kpis.py @@ -0,0 +1,172 @@ +"""KPI definitions and computation for GuideLLM Caliper plugin.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from typing import Any + +from projects.caliper.engine.model import UnifiedRunModel + + +class GuideLLMKpiHandler: + """Handles KPI catalog and computation for GuideLLM benchmarks.""" + + @staticmethod + def get_catalog() -> list[dict[str, Any]]: + """ + Return the KPI catalog for GuideLLM metrics. + + Returns: + List of KPI definitions + """ + return [ + # Throughput KPIs + { + "kpi_id": "guidellm_request_rate", + "name": "Request Rate", + "unit": "req/s", + "higher_is_better": True, + }, + { + "kpi_id": "guidellm_tokens_per_second", + "name": "Total Token Throughput", + "unit": "tokens/s", + "higher_is_better": True, + }, + { + "kpi_id": "guidellm_input_tokens_per_second", + "name": "Input Token Throughput", + "unit": "tokens/s", + "higher_is_better": True, + }, + { + "kpi_id": "guidellm_output_tokens_per_second", + "name": "Output Token Throughput", + "unit": "tokens/s", + "higher_is_better": True, + }, + # Latency KPIs + { + "kpi_id": "guidellm_ttft_median", + "name": "Time to First Token (Median)", + "unit": "s", + "higher_is_better": False, + }, + { + "kpi_id": "guidellm_ttft_p95", + "name": "Time to First Token (P95)", + "unit": "s", + "higher_is_better": False, + }, + { + "kpi_id": "guidellm_itl_median", + "name": "Inter Token Latency (Median)", + "unit": "s", + "higher_is_better": False, + }, + { + "kpi_id": "guidellm_tpot_median", + "name": "Time Per Output Token (Median)", + "unit": "s", + "higher_is_better": False, + }, + { + "kpi_id": "guidellm_request_latency_median", + "name": "End-to-End Request Latency (Median)", + "unit": "s", + "higher_is_better": False, + }, + { + "kpi_id": "guidellm_request_latency_p95", + "name": "End-to-End Request Latency (P95)", + "unit": "s", + "higher_is_better": False, + }, + # Token efficiency KPIs + { + "kpi_id": "guidellm_input_tokens_per_request", + "name": "Input Tokens per Request", + "unit": "tokens", + "higher_is_better": False, # Generally want efficiency + }, + { + "kpi_id": "guidellm_output_tokens_per_request", + "name": "Output Tokens per Request", + "unit": "tokens", + "higher_is_better": False, # Generally want conciseness + }, + ] + + @staticmethod + def compute_kpis(model: UnifiedRunModel) -> list[dict[str, Any]]: + """ + Compute KPI values from the unified model. + + Args: + model: Unified model containing parsed test results + + Returns: + List of KPI records + """ + ts = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") + out: list[dict[str, Any]] = [] + + for r in model.unified_result_records: + # Skip records without GuideLLM data + if not r.run_identity.get("guidellm"): + continue + + # Skip if no benchmarks found + if r.metrics.get("no_benchmarks_found"): + continue + + base_labels = {**r.distinguishing_labels} + + # Define KPI mappings + kpi_mappings = [ + ("guidellm_request_rate", "request_rate", "req/s", True), + ("guidellm_tokens_per_second", "tokens_per_second", "tokens/s", True), + ("guidellm_input_tokens_per_second", "input_tokens_per_second", "tokens/s", True), + ("guidellm_output_tokens_per_second", "output_tokens_per_second", "tokens/s", True), + ("guidellm_ttft_median", "ttft_median", "s", False), + ("guidellm_ttft_p95", "ttft_p95", "s", False), + ("guidellm_itl_median", "itl_median", "s", False), + ("guidellm_tpot_median", "tpot_median", "s", False), + ("guidellm_request_latency_median", "request_latency_median", "s", False), + ("guidellm_request_latency_p95", "request_latency_p95", "s", False), + ("guidellm_input_tokens_per_request", "input_tokens_per_request", "tokens", False), + ( + "guidellm_output_tokens_per_request", + "output_tokens_per_request", + "tokens", + False, + ), + ] + + # Extract and create KPI records + for kpi_id, metric_key, unit, higher_is_better in kpi_mappings: + raw_value = r.metrics.get(metric_key, 0) + + # Convert to float + try: + value = float(raw_value) + except (TypeError, ValueError): + value = 0.0 + + out.append( + { + "schema_version": "1", + "kpi_id": kpi_id, + "value": value, + "unit": unit, + "run_id": r.test_base_path, + "timestamp": ts, + "labels": {**base_labels, "higher_is_better": higher_is_better}, + "source": { + "test_base_path": r.test_base_path, + "plugin_module": model.plugin_module, + }, + } + ) + + return out diff --git a/projects/caliper/postprocess/guidellm/parsing/models.py b/projects/caliper/postprocess/guidellm/parsing/models.py new file mode 100644 index 00000000..d2c100e1 --- /dev/null +++ b/projects/caliper/postprocess/guidellm/parsing/models.py @@ -0,0 +1,103 @@ +"""Data models for GuideLLM benchmark results.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +@dataclass +class GuideLLMBenchmark: + """Single GuideLLM benchmark result with extracted metrics.""" + + strategy: str + duration: float + warmup_time: float = 0.0 + cooldown_time: float = 0.0 + + # Request metrics + request_rate: float = 0.0 + request_concurrency: float = 0.0 + completed_requests: int = 0 + failed_requests: int = 0 + + # Token metrics (per request) + input_tokens_per_request: float = 0.0 + output_tokens_per_request: float = 0.0 + total_tokens_per_request: float = 0.0 + + # Latency metrics (in seconds) + request_latency_median: float = 0.0 + request_latency_p95: float = 0.0 + ttft_median: float = 0.0 # Time to First Token + ttft_p95: float = 0.0 + itl_median: float = 0.0 # Inter Token Latency + itl_p95: float = 0.0 + tpot_median: float = 0.0 # Time Per Output Token + tpot_p95: float = 0.0 + + # Additional TTFT percentiles + ttft_p10: float = 0.0 + ttft_p25: float = 0.0 + ttft_p50: float = 0.0 + ttft_p75: float = 0.0 + ttft_p90: float = 0.0 + + # Additional ITL percentiles + itl_p10: float = 0.0 + itl_p25: float = 0.0 + itl_p50: float = 0.0 + itl_p75: float = 0.0 + itl_p90: float = 0.0 + + # Throughput metrics + tokens_per_second: float = 0.0 + input_tokens_per_second: float = 0.0 + output_tokens_per_second: float = 0.0 + + # Output token percentiles + output_tokens_per_second_p10: float = 0.0 + output_tokens_per_second_p25: float = 0.0 + output_tokens_per_second_p50: float = 0.0 + output_tokens_per_second_p75: float = 0.0 + output_tokens_per_second_p90: float = 0.0 + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for use in UnifiedResultRecord metrics.""" + return { + "strategy": self.strategy, + "duration": self.duration, + "request_rate": self.request_rate, + "request_concurrency": self.request_concurrency, + "completed_requests": self.completed_requests, + "failed_requests": self.failed_requests, + "input_tokens_per_request": self.input_tokens_per_request, + "output_tokens_per_request": self.output_tokens_per_request, + "total_tokens_per_request": self.total_tokens_per_request, + "request_latency_median": self.request_latency_median, + "request_latency_p95": self.request_latency_p95, + "ttft_median": self.ttft_median, + "ttft_p95": self.ttft_p95, + "itl_median": self.itl_median, + "itl_p95": self.itl_p95, + "tpot_median": self.tpot_median, + "tpot_p95": self.tpot_p95, + "tokens_per_second": self.tokens_per_second, + "input_tokens_per_second": self.input_tokens_per_second, + "output_tokens_per_second": self.output_tokens_per_second, + } + + +@dataclass +class GuideLLMConfiguration: + """GuideLLM configuration extracted from benchmark files.""" + + args: dict[str, Any] | None = None + metadata: dict[str, Any] | None = None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary.""" + return { + "args": self.args or {}, + "metadata": self.metadata or {}, + } diff --git a/projects/caliper/postprocess/guidellm/parsing/parsers.py b/projects/caliper/postprocess/guidellm/parsing/parsers.py new file mode 100644 index 00000000..77244b92 --- /dev/null +++ b/projects/caliper/postprocess/guidellm/parsing/parsers.py @@ -0,0 +1,292 @@ +"""GuideLLM benchmark parsers for Caliper plugin.""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any + +from projects.caliper.engine.model import ( + ParseResult, + TestBaseNode, + UnifiedResultRecord, +) + +from .models import GuideLLMBenchmark, GuideLLMConfiguration + + +def _labels_from_node(node: TestBaseNode) -> dict[str, Any]: + """Extract labels from a test node.""" + raw = node.labels + inner = raw.get("labels") + if isinstance(inner, dict): + return dict(inner) + if isinstance(raw, dict): + return dict(raw) + return {"facet": "default"} + + +class GuideLLMParser: + """Parser for GuideLLM benchmark JSON artifacts.""" + + def parse_benchmarks_json( + self, file_path: Path + ) -> tuple[list[GuideLLMBenchmark], GuideLLMConfiguration | None, list[str]]: + """ + Parse a GuideLLM benchmarks.json file. + + Returns: + Tuple of (benchmarks list, configuration, warnings list) + """ + warnings: list[str] = [] + benchmarks: list[GuideLLMBenchmark] = [] + configuration: GuideLLMConfiguration | None = None + + try: + json_data = json.loads(file_path.read_text(encoding="utf-8")) + if not isinstance(json_data, dict): + warnings.append(f"{file_path}: benchmarks.json must be a JSON object") + return [], None, warnings + + # Parse configuration from top-level fields + args = json_data.get("args") + metadata = json_data.get("metadata") + if args or metadata: + configuration = GuideLLMConfiguration(args=args, metadata=metadata) + + # Parse each benchmark in the JSON + for benchmark_data in json_data.get("benchmarks", []): + try: + benchmark = self._parse_single_benchmark(benchmark_data) + benchmarks.append(benchmark) + except Exception as e: + warnings.append(f"Failed to parse benchmark in {file_path}: {e}") + logging.warning(f"Failed to parse benchmark data: {e}") + continue + + logging.info(f"Parsed {len(benchmarks)} GuideLLM benchmarks from {file_path}") + return benchmarks, configuration, warnings + + except json.JSONDecodeError as e: + warnings.append(f"Malformed JSON {file_path}: {e}") + return [], None, warnings + except Exception as e: + warnings.append(f"Failed to parse GuideLLM JSON {file_path}: {e}") + return [], None, warnings + + def _parse_single_benchmark(self, benchmark_data: dict[str, Any]) -> GuideLLMBenchmark: + """Parse a single benchmark entry from the JSON data.""" + # Extract strategy and concurrency info with fallback logic + scheduler = benchmark_data.get("scheduler", {}) + config = benchmark_data.get("config", {}) + strategy_info = config.get("strategy", {}) + strategy = strategy_info.get("type_", "unknown") + + # Extract concurrency (streams) with multiple fallback paths + concurrency = self._extract_concurrency(strategy_info, scheduler) + + # Extract timing info + state = scheduler.get("state", {}) + start_time = state.get("start_time", 0) + end_time = state.get("end_time", 0) + duration = end_time - start_time if end_time > start_time else 60.0 + + # Extract metrics + metrics = benchmark_data.get("metrics", {}) + + # Helper function to safely extract metric values + def get_metric_value( + metric_name: str, stat_type: str = "median", default: float = 0.0 + ) -> float: + metric_data = metrics.get(metric_name, {}).get("successful", {}) + if stat_type in ["p95", "p90", "p75", "p50", "p25", "p10"]: + percentiles = metric_data.get("percentiles", {}) + return float(percentiles.get(stat_type, default)) + else: + return float(metric_data.get(stat_type, default)) + + # Extract latency metrics (convert ms to seconds for consistency) + request_latency_median = get_metric_value("request_latency", "median") / 1000.0 + request_latency_p95 = get_metric_value("request_latency", "p95") / 1000.0 + + # Extract TTFT percentiles + ttft_median = get_metric_value("time_to_first_token_ms", "median") / 1000.0 + ttft_p10 = get_metric_value("time_to_first_token_ms", "p10") / 1000.0 + ttft_p25 = get_metric_value("time_to_first_token_ms", "p25") / 1000.0 + ttft_p50 = get_metric_value("time_to_first_token_ms", "p50") / 1000.0 + ttft_p75 = get_metric_value("time_to_first_token_ms", "p75") / 1000.0 + ttft_p90 = get_metric_value("time_to_first_token_ms", "p90") / 1000.0 + ttft_p95 = get_metric_value("time_to_first_token_ms", "p95") / 1000.0 + + # Extract ITL percentiles + itl_median = get_metric_value("inter_token_latency_ms", "median") / 1000.0 + itl_p10 = get_metric_value("inter_token_latency_ms", "p10") / 1000.0 + itl_p25 = get_metric_value("inter_token_latency_ms", "p25") / 1000.0 + itl_p50 = get_metric_value("inter_token_latency_ms", "p50") / 1000.0 + itl_p75 = get_metric_value("inter_token_latency_ms", "p75") / 1000.0 + itl_p90 = get_metric_value("inter_token_latency_ms", "p90") / 1000.0 + itl_p95 = get_metric_value("inter_token_latency_ms", "p95") / 1000.0 + + # Extract TPOT percentiles + tpot_median = get_metric_value("time_per_output_token_ms", "median") / 1000.0 + tpot_p95 = get_metric_value("time_per_output_token_ms", "p95") / 1000.0 + + # Extract throughput metrics + request_rate = get_metric_value("requests_per_second", "mean") + input_tokens_per_second = get_metric_value("input_tokens_per_second", "mean") + output_tokens_per_second = get_metric_value("output_tokens_per_second", "mean") + total_tokens_per_second = input_tokens_per_second + output_tokens_per_second + + # Extract output token percentiles + output_tokens_per_second_p10 = get_metric_value("output_tokens_per_second", "p10") + output_tokens_per_second_p25 = get_metric_value("output_tokens_per_second", "p25") + output_tokens_per_second_p50 = get_metric_value("output_tokens_per_second", "p50") + output_tokens_per_second_p75 = get_metric_value("output_tokens_per_second", "p75") + output_tokens_per_second_p90 = get_metric_value("output_tokens_per_second", "p90") + + # Calculate requests completed and tokens per request + completed_requests = int(request_rate * duration) if request_rate > 0 else 0 + input_tokens_per_request = ( + (input_tokens_per_second / request_rate) if request_rate > 0 else 0.0 + ) + output_tokens_per_request = ( + (output_tokens_per_second / request_rate) if request_rate > 0 else 0.0 + ) + total_tokens_per_request = ( + (total_tokens_per_second / request_rate) if request_rate > 0 else 0.0 + ) + + # Create GuideLLMBenchmark object + return GuideLLMBenchmark( + strategy=strategy, + duration=duration, + warmup_time=0.0, # Not available in JSON format + cooldown_time=0.0, # Not available in JSON format + # Request metrics + request_rate=request_rate, + request_concurrency=concurrency, + completed_requests=completed_requests, + failed_requests=0, # Could extract from unsuccessful metrics if needed + # Token metrics per request + input_tokens_per_request=input_tokens_per_request, + output_tokens_per_request=output_tokens_per_request, + total_tokens_per_request=total_tokens_per_request, + # Latency metrics (already in seconds) + request_latency_median=request_latency_median, + request_latency_p95=request_latency_p95, + ttft_median=ttft_median, + ttft_p10=ttft_p10, + ttft_p25=ttft_p25, + ttft_p50=ttft_p50, + ttft_p75=ttft_p75, + ttft_p90=ttft_p90, + ttft_p95=ttft_p95, + itl_median=itl_median, + itl_p10=itl_p10, + itl_p25=itl_p25, + itl_p50=itl_p50, + itl_p75=itl_p75, + itl_p90=itl_p90, + itl_p95=itl_p95, + tpot_median=tpot_median, + tpot_p95=tpot_p95, + # Throughput metrics + tokens_per_second=total_tokens_per_second, + input_tokens_per_second=input_tokens_per_second, + output_tokens_per_second=output_tokens_per_second, + # Output token percentiles + output_tokens_per_second_p10=output_tokens_per_second_p10, + output_tokens_per_second_p25=output_tokens_per_second_p25, + output_tokens_per_second_p50=output_tokens_per_second_p50, + output_tokens_per_second_p75=output_tokens_per_second_p75, + output_tokens_per_second_p90=output_tokens_per_second_p90, + ) + + def _extract_concurrency( + self, strategy_info: dict[str, Any], scheduler: dict[str, Any] + ) -> float: + """Extract concurrency (streams) from strategy or scheduler info.""" + # Try multiple paths for concurrency extraction + try: + # First try: config.strategy.streams + concurrency = float(strategy_info.get("streams", 0)) + if concurrency > 0: + return concurrency + except (ValueError, TypeError): + pass + + try: + # Second try: scheduler.strategy.streams + sched_strategy = scheduler.get("strategy", {}) + streams = sched_strategy.get("streams") + if streams and streams > 0: + return float(streams) + except (ValueError, TypeError): + pass + + logging.warning( + "Could not find concurrency 'streams' for benchmark. Using default value 1.0" + ) + return 1.0 + + def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult: + """ + Parse test nodes containing GuideLLM benchmarks.json files. + + Args: + base_dir: Base directory for the test run + nodes: List of test nodes to parse + + Returns: + ParseResult with unified records and warnings + """ + records: list[UnifiedResultRecord] = [] + warnings: list[str] = [] + + for node in nodes: + # Look for benchmarks.json files + benchmarks_files = [p for p in node.artifact_paths if p.name == "benchmarks.json"] + + if not benchmarks_files: + # No benchmarks.json found, create empty record + labels = _labels_from_node(node) + records.append( + UnifiedResultRecord( + test_base_path=str(node.directory.relative_to(base_dir.resolve())), + distinguishing_labels=labels, + metrics={"no_benchmarks_found": True}, + run_identity={"guidellm": True}, + parse_notes=["No benchmarks.json file found"], + ) + ) + continue + + for benchmarks_file in benchmarks_files: + benchmarks, config, file_warnings = self.parse_benchmarks_json(benchmarks_file) + warnings.extend(file_warnings) + + # Create a unified record for each benchmark + for benchmark in benchmarks: + labels = _labels_from_node(node) + # Add strategy to distinguishing labels for grouping + labels["strategy"] = benchmark.strategy + labels["concurrency"] = benchmark.request_concurrency + + # Convert benchmark to metrics dictionary + metrics = benchmark.to_dict() + if config: + metrics["configuration"] = config.to_dict() + + records.append( + UnifiedResultRecord( + test_base_path=str(node.directory.relative_to(base_dir.resolve())), + distinguishing_labels=labels, + metrics=metrics, + run_identity={"guidellm": True}, + parse_notes=[], + ) + ) + + logging.info(f"GuideLLM parser created {len(records)} unified result records") + return ParseResult(records=records, warnings=warnings) diff --git a/projects/caliper/postprocess/guidellm/plotting/__init__.py b/projects/caliper/postprocess/guidellm/plotting/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/projects/caliper/postprocess/guidellm/plugin.py b/projects/caliper/postprocess/guidellm/plugin.py new file mode 100644 index 00000000..56d3fedb --- /dev/null +++ b/projects/caliper/postprocess/guidellm/plugin.py @@ -0,0 +1,334 @@ +"""GuideLLM Caliper PostProcessingPlugin (`projects/caliper/postprocess/guidellm`).""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from projects.caliper.engine.model import ( + ParseResult, + PostProcessingPlugin, + TestBaseNode, + UnifiedRunModel, +) + +from .parsing import GuideLLMKpiHandler, GuideLLMParser + + +class GuideLLMPlugin(PostProcessingPlugin): + """ + Parses GuideLLM benchmark artifacts containing ``benchmarks.json`` files. + + Extracts comprehensive LLM inference performance metrics including: + * Request throughput and concurrency + * Token-level latencies (TTFT, ITL, TPOT) + * End-to-end request latency percentiles + * Token throughput metrics + * Request completion rates + + Visual reports (future implementation): + * ``benchmark_summary`` — tabular view of all benchmark strategies and metrics + * ``latency_breakdown`` — breakdown of latency components across percentiles + * ``throughput_comparison`` — comparison of token and request throughput + """ + + def __init__(self): + self.parser = GuideLLMParser() + self.kpi_handler = GuideLLMKpiHandler() + + def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult: + """Parse test nodes using the GuideLLM parser.""" + return self.parser.parse(base_dir, nodes) + + def visualize( + self, + model: UnifiedRunModel, + output_dir: Path, + report_ids: list[str] | None, + group_id: str | None, + visualize_config: dict[str, Any] | None, + ) -> list[str]: + """Generate visualization reports for GuideLLM benchmarks.""" + output_dir.mkdir(parents=True, exist_ok=True) + paths: list[str] = [] + wanted = frozenset(report_ids or ()) + + # Filter to only GuideLLM records with benchmarks + guidellm_records = [ + r + for r in model.unified_result_records + if r.run_identity.get("guidellm") and not r.metrics.get("no_benchmarks_found") + ] + + if not guidellm_records: + return paths + + # Generate throughput vs concurrency chart + if "benchmark_summary" in wanted: + path = self._generate_throughput_vs_concurrency_chart(guidellm_records, output_dir) + if path: + paths.append(path) + + # Generate latency breakdown chart + if "latency_breakdown" in wanted: + path = self._generate_latency_breakdown_chart(guidellm_records, output_dir) + if path: + paths.append(path) + + return paths + + def _generate_throughput_vs_concurrency_chart( + self, records: list, output_dir: Path + ) -> str | None: + """Generate mean throughput vs concurrency chart.""" + try: + import plotly.graph_objects as go + + # Extract data for plotting + data_by_strategy = {} + for record in records: + strategy = record.distinguishing_labels.get("strategy", "unknown") + concurrency = record.metrics.get( + "request_concurrency", record.distinguishing_labels.get("concurrency", 1) + ) + tokens_per_second = record.metrics.get("tokens_per_second", 0) + + if strategy not in data_by_strategy: + data_by_strategy[strategy] = {"concurrency": [], "tokens_per_second": []} + + data_by_strategy[strategy]["concurrency"].append(concurrency) + data_by_strategy[strategy]["tokens_per_second"].append(tokens_per_second) + + # Create figure + fig = go.Figure() + + # Add traces for each strategy + colors = ["blue", "red", "green", "purple", "orange", "brown", "pink", "gray"] + for i, (strategy, data) in enumerate(data_by_strategy.items()): + color = colors[i % len(colors)] + + fig.add_trace( + go.Scatter( + x=data["concurrency"], + y=data["tokens_per_second"], + mode="markers+lines", + name=strategy, + marker={"color": color, "size": 8}, + line={"color": color, "width": 2}, + ) + ) + + # Update layout + fig.update_layout( + title="Throughput vs Concurrency", + xaxis_title="Concurrency Level", + yaxis_title="Tokens per Second", + width=800, + height=500, + ) + + # Save the plot + output_file = output_dir / "throughput_vs_concurrency.html" + fig.write_html(output_file, include_plotlyjs="cdn") + + return str(output_file) + + except ImportError: + return None + except Exception as e: + import logging + + logging.warning(f"Failed to generate throughput chart: {e}") + return None + + def _generate_latency_breakdown_chart(self, records: list, output_dir: Path) -> str | None: + """Generate latency breakdown visualization.""" + try: + import plotly.graph_objects as go + from plotly.subplots import make_subplots + + # Extract latency data + data_by_strategy = {} + for record in records: + strategy = record.distinguishing_labels.get("strategy", "unknown") + concurrency = record.metrics.get( + "request_concurrency", record.distinguishing_labels.get("concurrency", 1) + ) + + # Extract latency metrics (convert to ms for better readability) + ttft_median = record.metrics.get("ttft_median", 0) * 1000 # Convert to ms + itl_median = record.metrics.get("itl_median", 0) * 1000 # Convert to ms + request_latency_p95 = ( + record.metrics.get("request_latency_p95", 0) * 1000 + ) # Convert to ms + + if strategy not in data_by_strategy: + data_by_strategy[strategy] = { + "concurrency": [], + "ttft_median": [], + "itl_median": [], + "request_latency_p95": [], + } + + data_by_strategy[strategy]["concurrency"].append(concurrency) + data_by_strategy[strategy]["ttft_median"].append(ttft_median) + data_by_strategy[strategy]["itl_median"].append(itl_median) + data_by_strategy[strategy]["request_latency_p95"].append(request_latency_p95) + + # Create subplots + fig = make_subplots( + rows=2, + cols=2, + subplot_titles=[ + "Time to First Token (TTFT)", + "Inter-Token Latency (ITL)", + "Request Latency P95", + "Latency Comparison", + ], + ) + + colors = ["blue", "red", "green", "purple", "orange", "brown", "pink", "gray"] + + for i, (strategy, data) in enumerate(data_by_strategy.items()): + color = colors[i % len(colors)] + + # TTFT plot + fig.add_trace( + go.Scatter( + x=data["concurrency"], + y=data["ttft_median"], + mode="markers+lines", + name=f"{strategy} TTFT", + marker={"color": color}, + showlegend=True, + ), + row=1, + col=1, + ) + + # ITL plot + fig.add_trace( + go.Scatter( + x=data["concurrency"], + y=data["itl_median"], + mode="markers+lines", + name=f"{strategy} ITL", + marker={"color": color}, + showlegend=False, + ), + row=1, + col=2, + ) + + # Request latency P95 plot + fig.add_trace( + go.Scatter( + x=data["concurrency"], + y=data["request_latency_p95"], + mode="markers+lines", + name=f"{strategy} Req P95", + marker={"color": color}, + showlegend=False, + ), + row=2, + col=1, + ) + + # Combined latency comparison + fig.add_trace( + go.Scatter( + x=data["concurrency"], + y=data["ttft_median"], + mode="lines", + name=f"{strategy} TTFT", + line={"color": color, "dash": "solid"}, + showlegend=False, + ), + row=2, + col=2, + ) + fig.add_trace( + go.Scatter( + x=data["concurrency"], + y=data["itl_median"], + mode="lines", + name=f"{strategy} ITL", + line={"color": color, "dash": "dash"}, + showlegend=False, + ), + row=2, + col=2, + ) + + # Update layout + fig.update_layout(title="GuideLLM Latency Analysis", height=800, showlegend=True) + + # Update axes labels + fig.update_xaxes(title_text="Concurrency", row=2, col=1) + fig.update_xaxes(title_text="Concurrency", row=2, col=2) + fig.update_yaxes(title_text="Latency (ms)", row=1, col=1) + fig.update_yaxes(title_text="Latency (ms)", row=1, col=2) + fig.update_yaxes(title_text="Latency (ms)", row=2, col=1) + fig.update_yaxes(title_text="Latency (ms)", row=2, col=2) + + # Save the plot + output_file = output_dir / "latency_breakdown.html" + fig.write_html(output_file, include_plotlyjs="cdn") + + return str(output_file) + + except ImportError: + return None + except Exception as e: + import logging + + logging.warning(f"Failed to generate latency breakdown chart: {e}") + return None + + def kpi_catalog(self) -> list[dict[str, Any]]: + """Return the GuideLLM KPI catalog.""" + return self.kpi_handler.get_catalog() + + def compute_kpis(self, model: UnifiedRunModel) -> list[dict[str, Any]]: + """Compute KPI values from the unified model.""" + return self.kpi_handler.compute_kpis(model) + + def build_ai_eval_payload(self, model: UnifiedRunModel) -> dict[str, Any]: + """Build AI evaluation payload from the unified model.""" + # Extract GuideLLM-specific metrics for AI evaluation + benchmarks = [] + for r in model.unified_result_records: + if r.run_identity.get("guidellm") and not r.metrics.get("no_benchmarks_found"): + strategy_info = { + "strategy": r.distinguishing_labels.get("strategy", "unknown"), + "concurrency": r.distinguishing_labels.get("concurrency", 1.0), + "request_rate": r.metrics.get("request_rate", 0.0), + "tokens_per_second": r.metrics.get("tokens_per_second", 0.0), + "ttft_median": r.metrics.get("ttft_median", 0.0), + "itl_median": r.metrics.get("itl_median", 0.0), + "request_latency_p95": r.metrics.get("request_latency_p95", 0.0), + } + benchmarks.append(strategy_info) + + return { + "schema_version": "1", + "run_id": model.base_directory, + "metrics": { + "record_count": len(model.unified_result_records), + "benchmark_count": len(benchmarks), + "strategies": [b["strategy"] for b in benchmarks], + "max_request_rate": max([b["request_rate"] for b in benchmarks], default=0.0), + "max_tokens_per_second": max( + [b["tokens_per_second"] for b in benchmarks], default=0.0 + ), + "min_ttft_median": min( + [b["ttft_median"] for b in benchmarks if b["ttft_median"] > 0], default=0.0 + ), + }, + "benchmarks": benchmarks, + } + + +def get_plugin() -> PostProcessingPlugin: + """Return the GuideLLM plugin instance.""" + return GuideLLMPlugin() From 9c65fe347d6b32e869478344edd966a3573d8176 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 21:55:23 +0200 Subject: [PATCH 05/16] [caliper] engine: file_export: runner: don't indent everything under 'if verbose' ... --- projects/caliper/engine/file_export/runner.py | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/projects/caliper/engine/file_export/runner.py b/projects/caliper/engine/file_export/runner.py index 539ba6b2..7ffb4f72 100644 --- a/projects/caliper/engine/file_export/runner.py +++ b/projects/caliper/engine/file_export/runner.py @@ -85,29 +85,29 @@ def run_file_export( try: if verbose: print(f"caliper: starting backend {b!r} …", file=sys.stderr) - detail, ml_meta = mlflow_backend.log_artifacts( - artifact_root=source, - paths=paths, - tracking_uri=mlflow_tracking_uri, - experiment=mlflow_experiment, - run_id=mlflow_run_id, - run_name=mlflow_run_name, - insecure_tls=mlflow_insecure_tls, - connection=mlflow_connection, - verbose=verbose, - upload_workers=upload_workers, - run_metadata=mlflow_run_metadata, - ) - results.append( - FileExportBackendResult( - backend="mlflow", - status="success", - detail=detail, - metadata=ml_meta, - ) + detail, ml_meta = mlflow_backend.log_artifacts( + artifact_root=source, + paths=paths, + tracking_uri=mlflow_tracking_uri, + experiment=mlflow_experiment, + run_id=mlflow_run_id, + run_name=mlflow_run_name, + insecure_tls=mlflow_insecure_tls, + connection=mlflow_connection, + verbose=verbose, + upload_workers=upload_workers, + run_metadata=mlflow_run_metadata, + ) + results.append( + FileExportBackendResult( + backend="mlflow", + status="success", + detail=detail, + metadata=ml_meta, ) - if verbose: - print(f"caliper: backend {b!r} finished ({detail})", file=sys.stderr) + ) + if verbose: + print(f"caliper: backend {b!r} finished ({detail})", file=sys.stderr) except Exception as e: # noqa: BLE001 # Full chain on stderr; stdout line stays a short message for automation. traceback.print_exception(e, file=sys.stderr) From 376bcbbc48e39589a56a5a270659d068ef52ab58 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 11:00:38 +0200 Subject: [PATCH 06/16] [core] Show a helpful message on 'command not found' --- projects/core/library/ci.py | 18 ++++++++++++++++++ projects/foreign_testing/orchestration/ci.py | 2 +- projects/fournos_launcher/orchestration/ci.py | 2 +- projects/jump_ci/orchestration/ci.py | 4 +++- projects/llm_d/orchestration/ci.py | 2 +- projects/llm_d_legacy/orchestration/ci.py | 2 +- projects/skeleton/orchestration/ci.py | 2 +- 7 files changed, 26 insertions(+), 6 deletions(-) diff --git a/projects/core/library/ci.py b/projects/core/library/ci.py index 6425a806..fcf0707b 100644 --- a/projects/core/library/ci.py +++ b/projects/core/library/ci.py @@ -11,6 +11,8 @@ import sys import traceback +import click + from projects.core.dsl import toolbox as dsl_toolbox from projects.core.dsl.runtime import TaskExecutionError from projects.core.library import env @@ -18,6 +20,22 @@ logger = logging.getLogger(__name__) +class HelpfulGroup(click.Group): + """ + A Click group that automatically shows help when a command is not found. + """ + + def get_command(self, ctx, cmd_name): + rv = click.Group.get_command(self, ctx, cmd_name) + if rv is not None: + return rv + + # Command not found - show help + print(f"Error: No such command '{cmd_name}'.\n", file=sys.stderr) + print(ctx.get_help(), file=sys.stderr) + ctx.exit(2) + + def handle_ci_exception(e: Exception) -> None: """ Handle CI exceptions with comprehensive logging and failure file creation. diff --git a/projects/foreign_testing/orchestration/ci.py b/projects/foreign_testing/orchestration/ci.py index a8aa1353..a399f630 100755 --- a/projects/foreign_testing/orchestration/ci.py +++ b/projects/foreign_testing/orchestration/ci.py @@ -15,7 +15,7 @@ from projects.core.library import ci as ci_lib -@click.group() +@click.group(cls=ci_lib.HelpfulGroup) @click.pass_context @ci_lib.safe_ci_function def main(ctx): diff --git a/projects/fournos_launcher/orchestration/ci.py b/projects/fournos_launcher/orchestration/ci.py index 86e833ee..a765bc77 100755 --- a/projects/fournos_launcher/orchestration/ci.py +++ b/projects/fournos_launcher/orchestration/ci.py @@ -51,7 +51,7 @@ def _set_job_owner_from_pull_request(): logger.info(f"Set job owner from pull request: {user_login}") -@click.group() +@click.group(cls=ci_lib.HelpfulGroup) @click.pass_context @ci_lib.safe_ci_function def main(ctx): diff --git a/projects/jump_ci/orchestration/ci.py b/projects/jump_ci/orchestration/ci.py index a158a764..0a6b8f88 100755 --- a/projects/jump_ci/orchestration/ci.py +++ b/projects/jump_ci/orchestration/ci.py @@ -10,6 +10,8 @@ import click +from projects.core.library import ci as ci_lib + # Add the testing directory to path for imports testing_dir = Path(__file__).parent.parent / "testing" if str(testing_dir) not in sys.path: @@ -39,7 +41,7 @@ def run_and_catch(phase, fct, *args, **kwargs): sys.exit(1) -@click.group() +@click.group(cls=ci_lib.HelpfulGroup) @click.option("--verbose", "-v", is_flag=True, help="Enable verbose output") @click.pass_context def main(ctx, verbose): diff --git a/projects/llm_d/orchestration/ci.py b/projects/llm_d/orchestration/ci.py index 7623510f..0dd4c6bd 100755 --- a/projects/llm_d/orchestration/ci.py +++ b/projects/llm_d/orchestration/ci.py @@ -13,7 +13,7 @@ from projects.core.library import ci as ci_lib -@click.group() +@click.group(cls=ci_lib.HelpfulGroup) @click.pass_context @ci_lib.safe_ci_function def main(ctx): diff --git a/projects/llm_d_legacy/orchestration/ci.py b/projects/llm_d_legacy/orchestration/ci.py index cbd73f8b..afe45907 100755 --- a/projects/llm_d_legacy/orchestration/ci.py +++ b/projects/llm_d_legacy/orchestration/ci.py @@ -44,7 +44,7 @@ def log(message: str, level: str = "info"): click.echo(f"{icon} [{project_name}] {message}") -@click.group() +@click.group(cls=ci_lib.HelpfulGroup) @click.pass_context @ci_lib.safe_ci_function def main(ctx): diff --git a/projects/skeleton/orchestration/ci.py b/projects/skeleton/orchestration/ci.py index 9c152242..f4a6d04d 100755 --- a/projects/skeleton/orchestration/ci.py +++ b/projects/skeleton/orchestration/ci.py @@ -19,7 +19,7 @@ from projects.core.library.export import caliper_export_command -@click.group() +@click.group(cls=ci_lib.HelpfulGroup) @click.pass_context @ci_lib.safe_ci_function def main(ctx): From 4e2e5d16cf9aa1d6d16ec147c8157951cd81cfa7 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 20:19:21 +0200 Subject: [PATCH 07/16] [core] library: replot: add the replot library --- projects/core/library/replot.py | 103 ++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 projects/core/library/replot.py diff --git a/projects/core/library/replot.py b/projects/core/library/replot.py new file mode 100644 index 00000000..ecd1e869 --- /dev/null +++ b/projects/core/library/replot.py @@ -0,0 +1,103 @@ +""" +Shared "replot" CLI for FORGE project orchestration. + +Registers a click subcommand for replotting artifacts and visualizations. +""" + +from __future__ import annotations + +import logging +from pathlib import Path + +import click + +from projects.caliper.orchestration.replot import run_replot_from_orchestration_config +from projects.core.library import ci as ci_lib +from projects.core.library import config, env + +logger = logging.getLogger(__name__) + + +def run_replot(*, artifact_directory: Path | None): + """Run replotting logic on the specified artifact directory.""" + + if artifact_directory is None: + artifact_directory = env.ARTIFACT_DIR + + # Get the replot URL from configuration + replot_url = config.project.get_config("caliper.replot.url", None, print=False, warn=False) + if not replot_url: + raise ValueError( + "caliper.replot.url is not configured. Use /replot.url directive or set in config." + ) + + keep_replot_dir = config.project.get_config( + "caliper.replot.keep", False, print=False, warn=False + ) + + # Load MLflow configuration from export settings to get vault secrets + export_config = config.project.get_config("caliper.export", {}, print=False, warn=False) + mlflow_backend = export_config.get("backend", {}).get("mlflow", {}) + + if not mlflow_backend.get("enabled", False): + raise ValueError( + "MLflow export is not enabled in configuration. Cannot determine authentication settings." + ) + + # Get vault secrets configuration (same as export) + secrets_config = mlflow_backend.get("secrets", {}).get("vault", {}) + vault_name = secrets_config.get("name") + vault_mlflow_secret = secrets_config.get("mlflow_secret") + + if not vault_name or not vault_mlflow_secret: + raise ValueError( + "MLflow vault configuration missing. Check caliper.export.backend.mlflow.secrets.vault settings." + ) + + # Get AWS credentials configuration (optional) + vault_aws_secret = secrets_config.get("aws_secret") + + # Get post-processing configuration + postprocess_config = config.project.get_config( + "caliper.postprocess", {}, print=False, warn=False + ) + + # Run the actual replot operation + return run_replot_from_orchestration_config( + replot_url=replot_url, + artifact_directory=artifact_directory, + vault_name=vault_name, + vault_mlflow_secret=vault_mlflow_secret, + vault_aws_secret=vault_aws_secret, + keep_replot_dir=keep_replot_dir, + postprocess_config=postprocess_config, + ) + + +@click.command("replot") +@click.option( + "--artifact-directory", + "artifact_directory", + type=click.Path(path_type=Path, exists=False, file_okay=True, dir_okay=True), + default=None, + help="Artifact root directory to replot from (defaults to ARTIFACT_DIR).", +) +@click.pass_context +@ci_lib.safe_ci_command +def replot_command(_ctx, artifact_directory: Path | None): + """Replot artifacts and visualizations from a remote URL.""" + + status = run_replot(artifact_directory=artifact_directory) + + # Log the status + import yaml + + logger.info("Replot status:") + logger.info(yaml.dump(status, indent=2)) + + # Check if replot was successful + replot_status = status.get("replot", {}).get("status", "unknown") + if replot_status != "success": + return 1 + + return 0 From c745d7f1f7b359fd8aead8b33b039f54915ec14a Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 7 May 2026 16:17:43 +0200 Subject: [PATCH 08/16] [skeleton] postprocess: refactor the default plugin --- .../postprocess/default/parsing/__init__.py | 6 + .../postprocess/default/parsing/kpis.py | 103 ++++++++++ .../postprocess/default/parsing/parsers.py | 85 +++++++++ .../postprocess/default/plotting/__init__.py | 6 + .../default/plotting/summary_table.py | 54 ++++++ .../default/plotting/throughput_chart.py | 51 +++++ .../skeleton/postprocess/default/plugin.py | 178 +++--------------- 7 files changed, 329 insertions(+), 154 deletions(-) create mode 100644 projects/skeleton/postprocess/default/parsing/__init__.py create mode 100644 projects/skeleton/postprocess/default/parsing/kpis.py create mode 100644 projects/skeleton/postprocess/default/parsing/parsers.py create mode 100644 projects/skeleton/postprocess/default/plotting/__init__.py create mode 100644 projects/skeleton/postprocess/default/plotting/summary_table.py create mode 100644 projects/skeleton/postprocess/default/plotting/throughput_chart.py diff --git a/projects/skeleton/postprocess/default/parsing/__init__.py b/projects/skeleton/postprocess/default/parsing/__init__.py new file mode 100644 index 00000000..790f3752 --- /dev/null +++ b/projects/skeleton/postprocess/default/parsing/__init__.py @@ -0,0 +1,6 @@ +"""Parsing package for Skeleton Caliper plugin.""" + +from .kpis import SkeletonKpiHandler +from .parsers import SkeletonParser + +__all__ = ["SkeletonParser", "SkeletonKpiHandler"] diff --git a/projects/skeleton/postprocess/default/parsing/kpis.py b/projects/skeleton/postprocess/default/parsing/kpis.py new file mode 100644 index 00000000..9585f3b1 --- /dev/null +++ b/projects/skeleton/postprocess/default/parsing/kpis.py @@ -0,0 +1,103 @@ +"""KPI definitions and computation for Skeleton Caliper plugin.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from typing import Any + +from projects.caliper.engine.model import UnifiedRunModel + + +class SkeletonKpiHandler: + """Handles KPI catalog and computation for skeleton project.""" + + @staticmethod + def get_catalog() -> list[dict[str, Any]]: + """ + Return the KPI catalog for skeleton metrics. + + Returns: + List of KPI definitions + """ + return [ + { + "kpi_id": "skeleton_throughput_rps", + "name": "Skeleton throughput", + "unit": "req/s", + "higher_is_better": True, + }, + { + "kpi_id": "skeleton_latency_ms", + "name": "Skeleton latency", + "unit": "ms", + "higher_is_better": False, + }, + ] + + @staticmethod + def compute_kpis(model: UnifiedRunModel) -> list[dict[str, Any]]: + """ + Compute KPI values from the unified model. + + Args: + model: Unified model containing parsed test results + + Returns: + List of KPI records + """ + ts = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") + out: list[dict[str, Any]] = [] + + for r in model.unified_result_records: + tp_raw = r.metrics.get("throughput", 0) + lat_raw = r.metrics.get("latency_ms", 0) + + # Convert throughput to float + try: + tp = float(tp_raw) + except (TypeError, ValueError): + tp = 0.0 + + # Convert latency to float + try: + lat = float(lat_raw) + except (TypeError, ValueError): + lat = 0.0 + + base_labels = {**r.distinguishing_labels} + + # Add throughput KPI + out.append( + { + "schema_version": "1", + "kpi_id": "skeleton_throughput_rps", + "value": tp, + "unit": "req/s", + "run_id": r.test_base_path, + "timestamp": ts, + "labels": {**base_labels, "higher_is_better": True}, + "source": { + "test_base_path": r.test_base_path, + "plugin_module": model.plugin_module, + }, + } + ) + + # Add latency KPI + out.append( + { + "schema_version": "1", + "kpi_id": "skeleton_latency_ms", + "value": lat, + "unit": "ms", + "run_id": r.test_base_path, + "timestamp": ts, + "labels": {**base_labels, "higher_is_better": False}, + "source": { + "test_base_path": r.test_base_path, + "plugin_module": model.plugin_module, + }, + } + ) + + return out diff --git a/projects/skeleton/postprocess/default/parsing/parsers.py b/projects/skeleton/postprocess/default/parsing/parsers.py new file mode 100644 index 00000000..2be3df21 --- /dev/null +++ b/projects/skeleton/postprocess/default/parsing/parsers.py @@ -0,0 +1,85 @@ +"""File parsers for Skeleton Caliper plugin.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from projects.caliper.engine.model import ( + ParseResult, + TestBaseNode, + UnifiedResultRecord, +) + + +def _labels_from_node(node: TestBaseNode) -> dict[str, Any]: + """Extract labels from a test node.""" + raw = node.labels + inner = raw.get("labels") + if isinstance(inner, dict): + return dict(inner) + if isinstance(raw, dict): + return dict(raw) + return {"facet": "default"} + + +class SkeletonParser: + """Parser for skeleton project test artifacts.""" + + def parse_metrics_json(self, file_path: Path) -> tuple[dict[str, Any], list[str]]: + """ + Parse a metrics.json file. + + Returns: + Tuple of (metrics dict, warnings list) + """ + warnings: list[str] = [] + + try: + metrics = json.loads(file_path.read_text(encoding="utf-8")) + if not isinstance(metrics, dict): + warnings.append(f"{file_path}: metrics.json must be a JSON object") + return {}, warnings + return metrics, warnings + except json.JSONDecodeError as e: + warnings.append(f"Malformed JSON {file_path}: {e}") + return {"_parse_error": True}, warnings + + def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult: + """ + Parse test nodes containing metrics.json files. + + Args: + base_dir: Base directory for the test run + nodes: List of test nodes to parse + + Returns: + ParseResult with unified records and warnings + """ + records: list[UnifiedResultRecord] = [] + warnings: list[str] = [] + + for node in nodes: + metrics: dict[str, Any] = {} + for p in node.artifact_paths: + if p.name != "metrics.json": + continue + + node_metrics, node_warnings = self.parse_metrics_json(p) + metrics = node_metrics + warnings.extend(node_warnings) + break + + labels = _labels_from_node(node) + records.append( + UnifiedResultRecord( + test_base_path=str(node.directory.relative_to(base_dir.resolve())), + distinguishing_labels=labels, + metrics=dict(metrics) if metrics else {"throughput": 0.0}, + run_identity={"skeleton_sample": True}, + parse_notes=[], + ) + ) + + return ParseResult(records=records, warnings=warnings) diff --git a/projects/skeleton/postprocess/default/plotting/__init__.py b/projects/skeleton/postprocess/default/plotting/__init__.py new file mode 100644 index 00000000..4a16f3bd --- /dev/null +++ b/projects/skeleton/postprocess/default/plotting/__init__.py @@ -0,0 +1,6 @@ +"""Plotting package for Skeleton Caliper plugin.""" + +from .summary_table import SummaryTablePlot +from .throughput_chart import ThroughputChartPlot + +__all__ = ["SummaryTablePlot", "ThroughputChartPlot"] diff --git a/projects/skeleton/postprocess/default/plotting/summary_table.py b/projects/skeleton/postprocess/default/plotting/summary_table.py new file mode 100644 index 00000000..8e19ef2a --- /dev/null +++ b/projects/skeleton/postprocess/default/plotting/summary_table.py @@ -0,0 +1,54 @@ +"""Summary table plot for Skeleton Caliper plugin.""" + +from __future__ import annotations + +import html as html_lib +from pathlib import Path + +from projects.caliper.engine.model import UnifiedRunModel + + +class SummaryTablePlot: + """Generates an HTML summary table of test results.""" + + @staticmethod + def generate(model: UnifiedRunModel, output_dir: Path) -> str: + """ + Generate a summary table HTML report. + + Args: + model: Unified model containing parsed test results + output_dir: Directory to write the output file + + Returns: + Path to the generated HTML file + """ + rows = [] + for r in model.unified_result_records: + scenario = r.distinguishing_labels.get("scenario", "") + tp = r.metrics.get("throughput", "") + lat = r.metrics.get("latency_ms", "") + rows.append( + "" + f"{html_lib.escape(str(r.test_base_path))}" + f"{html_lib.escape(str(scenario))}" + f"{html_lib.escape(str(tp))}" + f"{html_lib.escape(str(lat))}" + "" + ) + + table_html = ( + "" + "Skeleton sample — summary" + "" + "

Skeleton default plugin — summary

" + "" + "" + "" + "".join(rows) + "
test_base_pathscenariothroughputlatency_ms
" + ) + + output_file = output_dir / "summary_table.html" + output_file.write_text(table_html, encoding="utf-8") + return str(output_file) diff --git a/projects/skeleton/postprocess/default/plotting/throughput_chart.py b/projects/skeleton/postprocess/default/plotting/throughput_chart.py new file mode 100644 index 00000000..26acb524 --- /dev/null +++ b/projects/skeleton/postprocess/default/plotting/throughput_chart.py @@ -0,0 +1,51 @@ +"""Throughput chart plot for Skeleton Caliper plugin.""" + +from __future__ import annotations + +from pathlib import Path + +import plotly.graph_objects as go + +from projects.caliper.engine.model import UnifiedRunModel + + +class ThroughputChartPlot: + """Generates a Plotly bar chart of throughput by scenario.""" + + @staticmethod + def generate(model: UnifiedRunModel, output_dir: Path) -> str: + """ + Generate a throughput chart HTML report. + + Args: + model: Unified model containing parsed test results + output_dir: Directory to write the output file + + Returns: + Path to the generated HTML file + """ + xs: list[str] = [] + ys: list[float] = [] + + for r in model.unified_result_records: + label = str(r.distinguishing_labels.get("scenario") or r.test_base_path) + raw = r.metrics.get("throughput", 0) + + try: + y = float(raw) + except (TypeError, ValueError): + y = 0.0 + + xs.append(label) + ys.append(y) + + fig = go.Figure(data=[go.Bar(x=xs, y=ys)]) + fig.update_layout( + title="Throughput by scenario", + xaxis_title="Scenario", + yaxis_title="Throughput", + ) + + output_file = output_dir / "throughput_chart.html" + fig.write_html(output_file, include_plotlyjs="cdn") + return str(output_file) diff --git a/projects/skeleton/postprocess/default/plugin.py b/projects/skeleton/postprocess/default/plugin.py index 254c05c2..d83473ab 100644 --- a/projects/skeleton/postprocess/default/plugin.py +++ b/projects/skeleton/postprocess/default/plugin.py @@ -2,8 +2,6 @@ from __future__ import annotations -import json -from datetime import UTC, datetime from pathlib import Path from typing import Any @@ -11,19 +9,11 @@ ParseResult, PostProcessingPlugin, TestBaseNode, - UnifiedResultRecord, UnifiedRunModel, ) - -def _labels_from_node(node: TestBaseNode) -> dict[str, Any]: - raw = node.labels - inner = raw.get("labels") - if isinstance(inner, dict): - return dict(inner) - if isinstance(raw, dict): - return dict(raw) - return {"facet": "default"} +from .parsing import SkeletonKpiHandler, SkeletonParser +from .plotting import SummaryTablePlot, ThroughputChartPlot class SkeletonDefaultPlugin(PostProcessingPlugin): @@ -36,34 +26,17 @@ class SkeletonDefaultPlugin(PostProcessingPlugin): * ``throughput_chart`` — bar chart of ``throughput`` when present. """ + def __init__(self): + self.parser = SkeletonParser() + self.kpi_handler = SkeletonKpiHandler() + self.plots = { + "summary_table": SummaryTablePlot, + "throughput_chart": ThroughputChartPlot, + } + def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult: - records: list[UnifiedResultRecord] = [] - warnings: list[str] = [] - for node in nodes: - metrics: dict[str, Any] = {} - for p in node.artifact_paths: - if p.name != "metrics.json": - continue - try: - metrics = json.loads(p.read_text(encoding="utf-8")) - if not isinstance(metrics, dict): - warnings.append(f"{p}: metrics.json must be a JSON object") - metrics = {} - except json.JSONDecodeError as e: - warnings.append(f"Malformed JSON {p}: {e}") - metrics = {"_parse_error": True} - break - labels = _labels_from_node(node) - records.append( - UnifiedResultRecord( - test_base_path=str(node.directory.relative_to(base_dir.resolve())), - distinguishing_labels=labels, - metrics=dict(metrics) if metrics else {"throughput": 0.0}, - run_identity={"skeleton_sample": True}, - parse_notes=[], - ) - ) - return ParseResult(records=records, warnings=warnings) + """Parse test nodes using the skeleton parser.""" + return self.parser.parse(base_dir, nodes) def visualize( self, @@ -73,133 +46,29 @@ def visualize( group_id: str | None, visualize_config: dict[str, Any] | None, ) -> list[str]: - import html as html_lib - - import plotly.graph_objects as go - + """Generate visualization reports.""" output_dir.mkdir(parents=True, exist_ok=True) paths: list[str] = [] wanted = frozenset(report_ids or ()) - if "summary_table" in wanted: - rows = [] - for r in model.unified_result_records: - scenario = r.distinguishing_labels.get("scenario", "") - tp = r.metrics.get("throughput", "") - lat = r.metrics.get("latency_ms", "") - rows.append( - "" - f"{html_lib.escape(str(r.test_base_path))}" - f"{html_lib.escape(str(scenario))}" - f"{html_lib.escape(str(tp))}" - f"{html_lib.escape(str(lat))}" - "" - ) - table_html = ( - "" - "Skeleton sample — summary" - "" - "

Skeleton default plugin — summary

" - "" - "" - "" + "".join(rows) + "
test_base_pathscenariothroughputlatency_ms
" - ) - out = output_dir / "summary_table.html" - out.write_text(table_html, encoding="utf-8") - paths.append(str(out)) - - if "throughput_chart" in wanted: - xs: list[str] = [] - ys: list[float] = [] - for r in model.unified_result_records: - label = str(r.distinguishing_labels.get("scenario") or r.test_base_path) - raw = r.metrics.get("throughput", 0) - try: - y = float(raw) - except (TypeError, ValueError): - y = 0.0 - xs.append(label) - ys.append(y) - fig = go.Figure(data=[go.Bar(x=xs, y=ys)]) - fig.update_layout( - title="Throughput by scenario", - xaxis_title="Scenario", - yaxis_title="Throughput", - ) - out = output_dir / "throughput_chart.html" - fig.write_html(out, include_plotlyjs="cdn") - paths.append(str(out)) + for report_id in wanted: + if report_id in self.plots: + plot_class = self.plots[report_id] + path = plot_class.generate(model, output_dir) + paths.append(path) return paths def kpi_catalog(self) -> list[dict[str, Any]]: - return [ - { - "kpi_id": "skeleton_throughput_rps", - "name": "Skeleton throughput", - "unit": "req/s", - "higher_is_better": True, - }, - { - "kpi_id": "skeleton_latency_ms", - "name": "Skeleton latency", - "unit": "ms", - "higher_is_better": False, - }, - ] + """Return the KPI catalog.""" + return self.kpi_handler.get_catalog() def compute_kpis(self, model: UnifiedRunModel) -> list[dict[str, Any]]: - ts = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") - out: list[dict[str, Any]] = [] - for r in model.unified_result_records: - tp_raw = r.metrics.get("throughput", 0) - lat_raw = r.metrics.get("latency_ms", 0) - try: - tp = float(tp_raw) - except (TypeError, ValueError): - tp = 0.0 - try: - lat = float(lat_raw) - except (TypeError, ValueError): - lat = 0.0 - base_labels = { - **r.distinguishing_labels, - } - out.append( - { - "schema_version": "1", - "kpi_id": "skeleton_throughput_rps", - "value": tp, - "unit": "req/s", - "run_id": r.test_base_path, - "timestamp": ts, - "labels": {**base_labels, "higher_is_better": True}, - "source": { - "test_base_path": r.test_base_path, - "plugin_module": model.plugin_module, - }, - } - ) - out.append( - { - "schema_version": "1", - "kpi_id": "skeleton_latency_ms", - "value": lat, - "unit": "ms", - "run_id": r.test_base_path, - "timestamp": ts, - "labels": {**base_labels, "higher_is_better": False}, - "source": { - "test_base_path": r.test_base_path, - "plugin_module": model.plugin_module, - }, - } - ) - return out + """Compute KPI values from the unified model.""" + return self.kpi_handler.compute_kpis(model) def build_ai_eval_payload(self, model: UnifiedRunModel) -> dict[str, Any]: + """Build AI evaluation payload from the unified model.""" return { "schema_version": "1", "run_id": model.base_directory, @@ -215,4 +84,5 @@ def build_ai_eval_payload(self, model: UnifiedRunModel) -> dict[str, Any]: def get_plugin() -> PostProcessingPlugin: + """Return the skeleton plugin instance.""" return SkeletonDefaultPlugin() From 9ff0975f36fb92dfd2457a78649835c91ae55762 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Wed, 13 May 2026 06:49:24 +0200 Subject: [PATCH 09/16] [skeleton] orchestration: config: add caliper.replot --- projects/skeleton/orchestration/config.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/projects/skeleton/orchestration/config.yaml b/projects/skeleton/orchestration/config.yaml index fc99b7c9..1a8230c4 100644 --- a/projects/skeleton/orchestration/config.yaml +++ b/projects/skeleton/orchestration/config.yaml @@ -39,6 +39,10 @@ caliper: parameters: {} metrics: {} + replot: + url: null # Set via /replot.url directive + keep: false # Set to true to keep download directory after replotting + postprocess: enabled: true artifacts_dir: null From 427a3a59db527c3e029eebfab7625f1cab7f6ad8 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 15:07:14 +0200 Subject: [PATCH 10/16] [fournos_launcher] sanitize the job name before submitting it --- projects/fournos_launcher/orchestration/submit.py | 6 ++++-- .../toolbox/submit_and_wait/main.py | 15 ++++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/projects/fournos_launcher/orchestration/submit.py b/projects/fournos_launcher/orchestration/submit.py index 2611f343..4a1bc671 100644 --- a/projects/fournos_launcher/orchestration/submit.py +++ b/projects/fournos_launcher/orchestration/submit.py @@ -7,6 +7,7 @@ import traceback from datetime import datetime +from projects.core.dsl.utils.k8s import sanitize_k8s_name from projects.core.library import config, env, run, vault from projects.core.library.run_parallel import Parallel from projects.fournos_launcher.orchestration import job_management, pr_args @@ -159,6 +160,7 @@ def submit_job(): # Generate timestamp for parallel job names (shared across all parallel jobs) timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") raw_name = f"forge-{project_name}-{timestamp}" + raw_name = sanitize_k8s_name(raw_name) # Track failures and job names across parallel jobs failure_lock = threading.Lock() @@ -179,7 +181,7 @@ def submit_parallel_job(job_index, job_args_list): parallel_display_name = f"{project_name} {args_str} (job {job_index})".strip() # Create unique job name with timestamp and index - unique_job_name = f"{raw_name}-{job_index}" + unique_job_name = sanitize_k8s_name(f"{raw_name}-{job_index}") try: # Track the job name for cleanup (job gets submitted even if waiting fails) @@ -240,7 +242,7 @@ def submit_parallel_job(job_index, job_args_list): # Generate unique job name for single job timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") - single_job_name = f"forge-{project_name}-{timestamp}" + single_job_name = sanitize_k8s_name(f"forge-{project_name}-{timestamp}") try: submit_and_wait( diff --git a/projects/fournos_launcher/toolbox/submit_and_wait/main.py b/projects/fournos_launcher/toolbox/submit_and_wait/main.py index 6693ef42..818fa126 100755 --- a/projects/fournos_launcher/toolbox/submit_and_wait/main.py +++ b/projects/fournos_launcher/toolbox/submit_and_wait/main.py @@ -18,7 +18,7 @@ task, template, ) -from projects.core.dsl.utils.k8s import sanitize_k8s_name +from projects.core.dsl.utils.k8s import is_valid_k8s_name, sanitize_k8s_name from projects.core.library import env as env_mod logger = logging.getLogger(__name__) @@ -133,14 +133,19 @@ def generate_job_name(args, ctx): """Generate job name if not provided and ensure K8s compatibility""" if args.job_name: - # Sanitize user-provided job name - raw_name = args.job_name + # Validate that user-provided job name is already normalized + if not is_valid_k8s_name(args.job_name): + normalized_name = sanitize_k8s_name(args.job_name) + raise ValueError( + f"Job name '{args.job_name}' is not valid for Kubernetes. " + f"Please use the normalized name: '{normalized_name}'" + ) + ctx.final_job_name = args.job_name else: # Generate and sanitize auto job name timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") raw_name = f"forge-{args.project}-{timestamp}" - - ctx.final_job_name = sanitize_k8s_name(raw_name) + ctx.final_job_name = sanitize_k8s_name(raw_name) return f"Job name: {ctx.final_job_name}" From 5b56f98b3e24aedb5fa9d846b997cbb73306d218 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 20:18:01 +0200 Subject: [PATCH 11/16] [fournos_launcher] orchestration: pr_args: add /replot.url parsing --- .../fournos_launcher/orchestration/pr_args.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/projects/fournos_launcher/orchestration/pr_args.py b/projects/fournos_launcher/orchestration/pr_args.py index a4485c55..b0003ace 100644 --- a/projects/fournos_launcher/orchestration/pr_args.py +++ b/projects/fournos_launcher/orchestration/pr_args.py @@ -54,6 +54,10 @@ def get_supported_fournos_directives() -> dict[str, str]: Example: /parallel 1 gpu-basic cpu-intensive /parallel 2 memory-heavy network-test Effect: Sets fournos_launcher.parallel_jobs[idx] = [preset1, preset2, ...]""", + "/replot.url": """Set URL for downloading artifacts to replot. + Format: /replot.url URL + Example: /replot.url s3://bucket/path/to/artifacts + Effect: Sets caliper.replot.url in configuration.""", "/help": """Show all supported FOURNOS directives. Format: /help Effect: Logs available directive information.""", @@ -185,6 +189,29 @@ def handle_gpu_directive(line: str) -> dict[str, str]: return {"fournos.job.hardware.gpu_type": gpu_type, "fournos.job.hardware.gpu_count": gpu_count} +def handle_replot_url_directive(line: str) -> dict[str, str]: + """ + Handle /replot.url directive for setting replot URL. + + Format: /replot.url URL + + Args: + line: The directive line + + Returns: + Dictionary with replot URL configuration + + Raises: + ValueError: If URL is empty + """ + replot_url = line.removeprefix("/replot.url").strip() + + if not replot_url: + raise ValueError(f"Invalid /replot.url directive: URL cannot be empty in '{line}'") + + return {"caliper.replot.url": replot_url} + + def handle_help_directive(line: str) -> dict[str, str]: """Handle /help directive for FOURNOS directives.""" # Create help directive handler using the factory @@ -267,6 +294,7 @@ def get_fournos_directive_handlers() -> dict[str, callable]: "/pipeline": handle_pipeline_directive, "/gpu": handle_gpu_directive, "/parallel": handle_parallel_directive, + "/replot.url": handle_replot_url_directive, "/help": handle_help_directive, } From 2f2af26b06d5f6b1c12ce5e9b77f622d59c838d5 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 15:38:53 +0200 Subject: [PATCH 12/16] [llm_d_legacy] orchestration: ci: don't request GPUs --- projects/llm_d_legacy/orchestration/ci.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/projects/llm_d_legacy/orchestration/ci.py b/projects/llm_d_legacy/orchestration/ci.py index afe45907..dba2bdd6 100755 --- a/projects/llm_d_legacy/orchestration/ci.py +++ b/projects/llm_d_legacy/orchestration/ci.py @@ -108,9 +108,6 @@ def resolve_hardware_request(hardware_spec: dict): # - Handle different hardware profiles (GPU, CPU, memory requirements) # - Example: return {"gpu": {"type": "nvidia-tesla-v100", "count": 1}, "memory": "32Gi"} - hardware_spec["gpuType"] = "h200" - hardware_spec["gpuCount"] = 4 - return hardware_spec def list_vaults(): From e449b9df3b429b5741c8ceaf53b482c5e7758aa7 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Tue, 12 May 2026 20:18:47 +0200 Subject: [PATCH 13/16] [llm_d_legacy] orchestration: ci: add the replot entrypoint --- projects/llm_d_legacy/orchestration/ci.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/projects/llm_d_legacy/orchestration/ci.py b/projects/llm_d_legacy/orchestration/ci.py index dba2bdd6..b949dc64 100755 --- a/projects/llm_d_legacy/orchestration/ci.py +++ b/projects/llm_d_legacy/orchestration/ci.py @@ -16,6 +16,7 @@ from projects.core.library import config as forge_config from projects.core.library import ci as ci_lib from projects.core.library.export import caliper_export_command +from projects.core.library.replot import replot_command from projects.legacy.library import config from projects.legacy.library import env as legacy_env from projects.caliper.orchestration.export import run_from_orchestration_config @@ -87,6 +88,7 @@ def test(ctx): main.add_command(caliper_export_command) +main.add_command(replot_command) def resolve_hardware_request(hardware_spec: dict): """ From 676bb5fd2040879b0fff0b6fd2cb793a3d53fa52 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Wed, 13 May 2026 06:49:02 +0200 Subject: [PATCH 14/16] [llm_d_legacy] testing: config: add the caliper post-configuration --- projects/llm_d_legacy/testing/config.yaml | 31 +++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/projects/llm_d_legacy/testing/config.yaml b/projects/llm_d_legacy/testing/config.yaml index 1bb657ad..2a408cb7 100644 --- a/projects/llm_d_legacy/testing/config.yaml +++ b/projects/llm_d_legacy/testing/config.yaml @@ -343,3 +343,34 @@ caliper: tags: {} parameters: {} metrics: {} + + replot: + url: null # Set via /replot.url directive + keep: false # Set to true to keep download directory after replotting + + postprocess: + enabled: true + artifacts_dir: null + plugin_module: projects.caliper.postprocess.guidellm.plugin + postprocess_config: null + parse: + enabled: true + no_cache: false + visualize: + enabled: true + output_dir: null + reports: [benchmark_summary, latency_breakdown] + report_group: null + + kpi: + enabled: false + generate: + enabled: false + output: kpis.jsonl + export: + enabled: false + + analyze: + enabled: false + baseline: null + output: kpi_analyze.json From da0cfafbfdd06226c9ca861d7d6d51e1c481e69e Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Wed, 13 May 2026 09:36:38 +0200 Subject: [PATCH 15/16] [llm_d_legacy] orchestration: ci: init in main --- projects/llm_d_legacy/orchestration/ci.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/llm_d_legacy/orchestration/ci.py b/projects/llm_d_legacy/orchestration/ci.py index b949dc64..a9f3501f 100755 --- a/projects/llm_d_legacy/orchestration/ci.py +++ b/projects/llm_d_legacy/orchestration/ci.py @@ -51,6 +51,7 @@ def log(message: str, level: str = "info"): def main(ctx): """Jump CI Project CI Operations for TOPSAIL-NG.""" ctx.ensure_object(dict) + init() inited = False def init(): @@ -81,7 +82,6 @@ def test(ctx): """Test phase - Trigger the project's test method.""" log("Starting test phase...") - init() failed = test_llmd.test() sys.exit(1 if failed else 0) From ed039a342a684973bcb91b9e90124019ef6fc4e9 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Wed, 13 May 2026 14:05:35 +0200 Subject: [PATCH 16/16] [caliper] orchestration: replot: new binding --- projects/caliper/orchestration/replot.py | 325 +++++++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100644 projects/caliper/orchestration/replot.py diff --git a/projects/caliper/orchestration/replot.py b/projects/caliper/orchestration/replot.py new file mode 100644 index 00000000..ff54c190 --- /dev/null +++ b/projects/caliper/orchestration/replot.py @@ -0,0 +1,325 @@ +""" +Config-driven Caliper artifact replot for FORGE orchestration projects. + +Handles MLflow artifact downloading and post-processing pipeline execution. +""" + +from __future__ import annotations + +import contextlib +import io +import logging +import os +import re +import shutil +from pathlib import Path + +from projects.caliper.engine.file_export.mlflow_secrets import ( + load_mlflow_secrets_yaml, + mlflow_connection_env, +) +from projects.caliper.orchestration.postprocess import run_postprocess_from_orchestration_config +from projects.caliper.orchestration.postprocess_outcome import TestPhaseOutcome +from projects.core.library import vault as vault_lib + +logger = logging.getLogger(__name__) + + +def _download_mlflow_artifacts( + replot_url: str, + replot_download_dir: Path, + mlflow_secrets_path: Path, + aws_credentials_path: Path | None = None, +) -> dict: + """ + Download MLflow artifacts from a replot URL. + + Args: + replot_url: MLflow URL containing run ID + replot_download_dir: Local directory to download artifacts to + mlflow_secrets_path: Path to MLflow secrets file + aws_credentials_path: Optional path to AWS credentials file + + Returns: + Dict containing download status information + + Raises: + ValueError: If URL parsing or configuration validation fails + FileNotFoundError: If vault secrets are not found + RuntimeError: If MLflow download fails + """ + # Handle MLflow web UI URLs like: https://mlflow.server.com/#/experiments/0/runs/RUN_ID + # or API URLs like: https://mlflow.server.com/runs/RUN_ID + + # Extract run ID from either fragment (#/experiments/N/runs/ID) or path (/runs/ID) + run_id_match = re.search(r"[/#]runs/([^/?#]+)", replot_url) + if not run_id_match: + raise ValueError(f"Could not parse MLflow run ID from URL: {replot_url}") + + run_id = run_id_match.group(1) + + # Extract base MLflow tracking URI (remove fragments and paths) + # For https://mlflow.server.com/#/experiments/0/runs/ID -> https://mlflow.server.com + # For https://mlflow.server.com/runs/ID -> https://mlflow.server.com + from urllib.parse import urlparse + + parsed = urlparse(replot_url) + mlflow_uri = f"{parsed.scheme}://{parsed.netloc}" + + # Load MLflow secrets and validate tracking URI + mlflow_secrets = load_mlflow_secrets_yaml(mlflow_secrets_path) + export_tracking_uri = mlflow_secrets.get("tracking_uri", "").rstrip("/") + replot_tracking_uri = mlflow_uri.rstrip("/") + + logger.debug(f"Parsed tracking URI from replot URL: {replot_tracking_uri}") + logger.debug(f"Export tracking URI from vault: {export_tracking_uri}") + + if export_tracking_uri != replot_tracking_uri: + raise ValueError( + f"Replot URL tracking URI ({replot_tracking_uri}) does not match " + f"export configuration tracking URI ({export_tracking_uri}). " + f"For security, replot can only download from the same MLflow server used for export." + ) + + logger.info(f"Downloading from MLflow: {mlflow_uri}, run_id: {run_id}") + + # Download artifacts using MLflow with proper authentication + try: + import mlflow + + # Suppress noisy connection warnings and progress bars + import urllib3 + from mlflow.tracking import MlflowClient + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) + logging.getLogger("botocore").setLevel(logging.WARNING) + logging.getLogger("boto3").setLevel(logging.WARNING) + + # Suppress MLflow progress bars by setting environment variable + os.environ["MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR"] = "false" + + # Set AWS shared credentials file for MLflow S3 artifact storage (if provided) + previous_aws_creds = os.environ.get("AWS_SHARED_CREDENTIALS_FILE") + try: + if aws_credentials_path is not None: + os.environ["AWS_SHARED_CREDENTIALS_FILE"] = str(aws_credentials_path) + + # Use the same MLflow connection setup as export + with mlflow_connection_env(mlflow_secrets): + # Set tracking URI before creating client (AWS credentials need to be set first) + mlflow.set_tracking_uri(mlflow_uri) + client = MlflowClient() + + # Download artifacts to the output directory (suppress progress bar output) + with ( + contextlib.redirect_stdout(io.StringIO()), + contextlib.redirect_stderr(io.StringIO()), + ): + downloaded_path = client.download_artifacts( + run_id=run_id, + path="", # Download all artifacts + dst_path=str(replot_download_dir), + ) + + # Count downloaded files + if Path(downloaded_path).is_file(): + downloaded_files = [Path(downloaded_path)] + else: + downloaded_files = list(Path(downloaded_path).rglob("*")) + downloaded_files = [f for f in downloaded_files if f.is_file()] + + logger.info(f"Downloaded {len(downloaded_files)} files to {replot_download_dir}") + if downloaded_files: + logger.info("Downloaded files:") + for file in downloaded_files[:10]: # Show first 10 + try: + relative_path = file.relative_to(replot_download_dir) + logger.info(f" {relative_path}") + except ValueError: + logger.info(f" {file}") + if len(downloaded_files) > 10: + logger.info(f" ... and {len(downloaded_files) - 10} more") + + return { + "download_status": "success", + "downloaded_files": len(downloaded_files), + "validated_tracking_uri": export_tracking_uri, + "run_id": run_id, + "tracking_uri": mlflow_uri, + } + + except Exception as e: + raise RuntimeError(f"MLflow artifact download failed: {e}") from e + finally: + # Restore previous AWS credentials environment variable + if previous_aws_creds is None: + os.environ.pop("AWS_SHARED_CREDENTIALS_FILE", None) + else: + os.environ["AWS_SHARED_CREDENTIALS_FILE"] = previous_aws_creds + + except Exception as e: + raise RuntimeError(f"MLflow artifact download failed: {e}") from e + + +def run_replot_from_orchestration_config( + replot_url: str, + artifact_directory: Path, + vault_name: str, + vault_mlflow_secret: str, + vault_aws_secret: str | None = None, + keep_replot_dir: bool = False, + postprocess_config: dict | None = None, +) -> dict: + """ + Run replotting logic with orchestration configuration. + + Args: + replot_url: MLflow URL to download artifacts from + artifact_directory: Directory for final artifacts output + vault_name: Name of the vault containing secrets + vault_mlflow_secret: MLflow secret key in the vault + vault_aws_secret: Optional AWS secret key in the vault + keep_replot_dir: Whether to keep the download directory after processing + postprocess_config: Configuration for post-processing + + Returns: + Dict containing replot operation status and results + """ + replot_download_dir = artifact_directory / "replot" + + logger.info(f"Replotting artifacts from URL: {replot_url}") + logger.info(f"Download directory: {replot_download_dir}") + logger.info(f"Output directory: {artifact_directory}") + + # Get MLflow secrets from vault + mlflow_secrets_path = vault_lib.get_vault_content_path(vault_name, vault_mlflow_secret) + if mlflow_secrets_path is None or not mlflow_secrets_path.exists(): + raise FileNotFoundError( + f"MLflow secrets not found in vault {vault_name}/{vault_mlflow_secret}" + ) + + # Get AWS credentials if provided + aws_credentials_path = None + if vault_aws_secret: + aws_credentials_path = vault_lib.get_vault_content_path(vault_name, vault_aws_secret) + + if aws_credentials_path is None: + raise ValueError(f"Vault {vault_name}/{vault_aws_secret} missing :/") + elif not aws_credentials_path.exists(): + raise FileNotFoundError(f"Vault {vault_name}/{vault_aws_secret} file missing :/") + + logger.info(f"Using AWS credentials from vault {vault_name}/{vault_aws_secret}") + + logger.info(f"Using MLflow secrets from vault {vault_name}/{vault_mlflow_secret}") + + status = { + "replot": { + "url": replot_url, + "download_directory": str(replot_download_dir), + "output_directory": str(artifact_directory), + "keep_download_dir": keep_replot_dir, + } + } + + try: + # Step 1: Download artifacts + logger.info("Downloading artifacts...") + + # Check if download directory already exists with content + if replot_download_dir.exists() and any(replot_download_dir.iterdir()): + logger.info( + f"Replot download directory already exists with content, skipping download: {replot_download_dir}" + ) + + # Count existing files for status + existing_files = list(replot_download_dir.rglob("*")) + existing_files = [f for f in existing_files if f.is_file()] + + logger.info(f"Found {len(existing_files)} existing files") + if existing_files: + logger.info("Existing files:") + for file in existing_files[:10]: # Show first 10 + try: + relative_path = file.relative_to(replot_download_dir) + logger.info(f" {relative_path}") + except ValueError: + logger.info(f" {file}") + if len(existing_files) > 10: + logger.info(f" ... and {len(existing_files) - 10} more") + + status["replot"].update( + { + "download_status": "skipped", + "downloaded_files": len(existing_files), + "skip_reason": "directory_already_exists", + } + ) + else: + # Create the download directory + replot_download_dir.mkdir(parents=True, exist_ok=True) + + # Download artifacts based on URL type + if "mlflow" in replot_url.lower() and "runs" in replot_url: + download_result = _download_mlflow_artifacts( + replot_url, replot_download_dir, mlflow_secrets_path, aws_credentials_path + ) + status["replot"].update(download_result) + else: + raise ValueError( + f"Unsupported replot URL type: {replot_url}. Only MLflow URLs are currently supported." + ) + + logger.info("Artifacts downloaded successfully") + + # Step 2: Run post-processing on downloaded artifacts + logger.info("Running post-processing...") + + postprocess_result = run_postprocess_from_orchestration_config( + postprocess_config_raw=postprocess_config or {}, + artifacts_dir=replot_download_dir, + visualize_output_dir=artifact_directory, + test_outcome=TestPhaseOutcome("SUCCESS"), + ) + + # Log post-processing results + if postprocess_result.get("steps", {}).get("visualize", {}).get("status") == "skipped": + logger.info("Post-processing completed (visualizations skipped)") + elif postprocess_result.get("steps", {}).get("visualize", {}).get("paths"): + viz_paths = postprocess_result["steps"]["visualize"]["paths"] + logger.info(f"Post-processing completed with {len(viz_paths)} visualizations generated") + else: + logger.info("Post-processing completed (parsing only)") + + status["replot"]["postprocess_status"] = "success" + status["replot"]["postprocess_result"] = postprocess_result + + # Step 3: Clean up download directory unless keeping + if not keep_replot_dir: + logger.info(f"Cleaning up download directory: {replot_download_dir}") + shutil.rmtree(replot_download_dir) + status["replot"]["cleanup_status"] = "completed" + else: + logger.info(f"Keeping download directory as requested: {replot_download_dir}") + status["replot"]["cleanup_status"] = "skipped" + + status["replot"]["status"] = "success" + status["replot"]["message"] = "Replot completed successfully" + + except Exception as e: + logger.error(f"Replot failed: {e}") + status["replot"]["status"] = "failed" + status["replot"]["message"] = str(e) + + # Try to clean up on failure unless keeping + if not keep_replot_dir and replot_download_dir.exists(): + try: + shutil.rmtree(replot_download_dir) + status["replot"]["cleanup_status"] = "completed_on_failure" + except Exception as cleanup_e: + logger.warning(f"Failed to cleanup download directory: {cleanup_e}") + status["replot"]["cleanup_status"] = "failed" + + raise + + return status