diff --git a/projects/caliper/cli/main.py b/projects/caliper/cli/main.py
index 64ec2545..b54a6100 100644
--- a/projects/caliper/cli/main.py
+++ b/projects/caliper/cli/main.py
@@ -60,7 +60,7 @@ def _require_artifacts_dir(ctx: click.Context) -> Path:
_exit_with_help(
ctx,
"This command requires the test artifact tree root: "
- "`--artifacts-dir DIR` or `--base-dir DIR` "
+ "`--artifacts-dir DIR` "
"(before or after the subcommand).",
code=1,
)
@@ -89,7 +89,6 @@ def _workspace_cli_options(cmd: Any) -> Any:
opts = (
click.option(
"--artifacts-dir",
- "--base-dir",
"artifacts_dir",
type=click.Path(path_type=Path, exists=True),
default=None,
@@ -105,12 +104,11 @@ def _workspace_cli_options(cmd: Any) -> Any:
help=_POSTPROCESS_CONFIG_HELP + " Overrides the global option when set here.",
),
click.option(
- "--plugin-module",
"--plugin",
"plugin_module_override",
metavar="MODULE",
default=None,
- help="Plugin import path; same as global --plugin-module / --plugin.",
+ help="Plugin import path; same as global --plugin.",
),
)
for opt in reversed(opts):
@@ -141,7 +139,6 @@ def _plugin_tuple(ctx: click.Context) -> tuple[str, Any]:
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.option(
"--artifacts-dir",
- "--base-dir",
"artifacts_dir",
type=click.Path(path_type=Path, exists=True),
default=None,
@@ -154,7 +151,6 @@ def _plugin_tuple(ctx: click.Context) -> tuple[str, Any]:
help=_POSTPROCESS_CONFIG_HELP,
)
@click.option(
- "--plugin-module",
"--plugin",
"plugin_module",
metavar="MODULE",
@@ -360,7 +356,7 @@ def kpi_analyze(
@main.group("artifacts")
@click.pass_context
def artifacts_group(ctx: click.Context) -> None:
- """File artifact export."""
+ """File artifact export and import."""
@artifacts_group.command("export")
@@ -521,7 +517,6 @@ def ai_eval_export(
plugin=plugin,
output=output,
use_cache=True,
- cache_path=None,
)
except Exception as e: # noqa: BLE001
click.echo(f"ai-eval-export failed: {e}", err=True)
@@ -529,6 +524,80 @@ def ai_eval_export(
click.echo(f"Wrote {output}")
+@artifacts_group.command("import")
+@click.option("--from-mlflow", "mlflow_run_id", help="MLflow run ID to download artifacts from.")
+@click.option(
+    "--output-dir",
+    type=click.Path(path_type=Path),
+    required=True,
+    help="Local directory to download artifacts to.",
+)
+@click.option(
+    "--mlflow-tracking-uri",
+    envvar="MLFLOW_TRACKING_URI",
+    help="MLflow tracking server URI (can be set via MLFLOW_TRACKING_URI).",
+)
+@click.option(
+    "--artifact-path",
+    default="",
+    help="Specific artifact path to download (default: download all artifacts).",
+)
+@click.pass_context
+def import_command(
+    ctx: click.Context,
+    mlflow_run_id: str | None,
+    output_dir: Path,
+    mlflow_tracking_uri: str | None,
+    artifact_path: str,
+) -> None:
+    """Download artifact files from an MLflow run into a local directory.
+
+    Exits with status 1 when no source backend (``--from-mlflow``) or no
+    tracking URI is given, and with status 2 when the download fails.
+    On success, prints the number of downloaded files and lists at most
+    the first ten of them.
+    """
+    if not mlflow_run_id:
+        click.echo("Error: Specify source backend: --from-mlflow RUN_ID", err=True)
+        sys.exit(1)
+
+    if not mlflow_tracking_uri:
+        click.echo(
+            "Error: MLflow tracking URI required. Set --mlflow-tracking-uri or MLFLOW_TRACKING_URI.",
+            err=True,
+        )
+        sys.exit(1)
+
+    try:
+        # Imported lazily so the rest of the CLI works without mlflow installed.
+        import mlflow
+        from mlflow.tracking import MlflowClient
+
+        mlflow.set_tracking_uri(mlflow_tracking_uri)
+        client = MlflowClient()
+
+        output_dir.mkdir(parents=True, exist_ok=True)
+        downloaded_path = Path(
+            client.download_artifacts(
+                run_id=mlflow_run_id, path=artifact_path, dst_path=str(output_dir)
+            )
+        )
+
+        # A single artifact comes back as a file, a whole tree as a directory.
+        if downloaded_path.is_file():
+            downloaded_files = [downloaded_path]
+        else:
+            downloaded_files = [f for f in downloaded_path.rglob("*") if f.is_file()]
+
+        click.echo(f"Downloaded {len(downloaded_files)} files to {output_dir}")
+        if downloaded_files:
+            click.echo("Downloaded files:")
+            for file in downloaded_files[:10]:  # Show first 10
+                # The returned paths may not be expressed relative to --output-dir as
+                # given (e.g. absolute vs. relative); relative_to() then raises
+                # ValueError, which must not turn a finished download into a failure.
+                try:
+                    shown = file.relative_to(output_dir)
+                except ValueError:
+                    shown = file
+                click.echo(f" {shown}")
+            if len(downloaded_files) > 10:
+                click.echo(f" ... and {len(downloaded_files) - 10} more")
+
+    except Exception as e:  # noqa: BLE001
+        click.echo(f"artifacts import failed: {e}", err=True)
+        sys.exit(2)
+
+
def run_cli() -> None:
"""Invoke CLI; on missing required options, print subcommand help."""
try:
@@ -537,14 +606,13 @@ def run_cli() -> None:
rv = main.main(standalone_mode=False, prog_name="caliper")
if isinstance(rv, int) and rv != 0:
sys.exit(rv)
- except click.MissingParameter as exc:
- msg = exc.format_message()
- sub = getattr(exc, "ctx", None)
- click.echo(f"Error: {msg}\n", err=True)
- if sub is not None:
- click.echo(sub.get_help(), err=True)
- ec = getattr(exc, "exit_code", 2)
- sys.exit(2 if ec is None else int(ec))
+ except click.ClickException as exc:
+ # Handle click exceptions including NoArgsIsHelpError and MissingParameter
+ if hasattr(exc, "ctx") and exc.ctx:
+ click.echo(exc.ctx.get_help(), err=True)
+ else:
+ exc.show(sys.stderr)
+ sys.exit(2)
except SystemExit:
raise
diff --git a/projects/caliper/engine/file_export/runner.py b/projects/caliper/engine/file_export/runner.py
index 539ba6b2..7ffb4f72 100644
--- a/projects/caliper/engine/file_export/runner.py
+++ b/projects/caliper/engine/file_export/runner.py
@@ -85,29 +85,29 @@ def run_file_export(
try:
if verbose:
print(f"caliper: starting backend {b!r} …", file=sys.stderr)
- detail, ml_meta = mlflow_backend.log_artifacts(
- artifact_root=source,
- paths=paths,
- tracking_uri=mlflow_tracking_uri,
- experiment=mlflow_experiment,
- run_id=mlflow_run_id,
- run_name=mlflow_run_name,
- insecure_tls=mlflow_insecure_tls,
- connection=mlflow_connection,
- verbose=verbose,
- upload_workers=upload_workers,
- run_metadata=mlflow_run_metadata,
- )
- results.append(
- FileExportBackendResult(
- backend="mlflow",
- status="success",
- detail=detail,
- metadata=ml_meta,
- )
+ detail, ml_meta = mlflow_backend.log_artifacts(
+ artifact_root=source,
+ paths=paths,
+ tracking_uri=mlflow_tracking_uri,
+ experiment=mlflow_experiment,
+ run_id=mlflow_run_id,
+ run_name=mlflow_run_name,
+ insecure_tls=mlflow_insecure_tls,
+ connection=mlflow_connection,
+ verbose=verbose,
+ upload_workers=upload_workers,
+ run_metadata=mlflow_run_metadata,
+ )
+ results.append(
+ FileExportBackendResult(
+ backend="mlflow",
+ status="success",
+ detail=detail,
+ metadata=ml_meta,
)
- if verbose:
- print(f"caliper: backend {b!r} finished ({detail})", file=sys.stderr)
+ )
+ if verbose:
+ print(f"caliper: backend {b!r} finished ({detail})", file=sys.stderr)
except Exception as e: # noqa: BLE001
# Full chain on stderr; stdout line stays a short message for automation.
traceback.print_exception(e, file=sys.stderr)
diff --git a/projects/caliper/orchestration/replot.py b/projects/caliper/orchestration/replot.py
new file mode 100644
index 00000000..ff54c190
--- /dev/null
+++ b/projects/caliper/orchestration/replot.py
@@ -0,0 +1,325 @@
+"""
+Config-driven Caliper artifact replot for FORGE orchestration projects.
+
+Handles MLflow artifact downloading and post-processing pipeline execution.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import io
+import logging
+import os
+import re
+import shutil
+from pathlib import Path
+
+from projects.caliper.engine.file_export.mlflow_secrets import (
+ load_mlflow_secrets_yaml,
+ mlflow_connection_env,
+)
+from projects.caliper.orchestration.postprocess import run_postprocess_from_orchestration_config
+from projects.caliper.orchestration.postprocess_outcome import TestPhaseOutcome
+from projects.core.library import vault as vault_lib
+
+logger = logging.getLogger(__name__)
+
+
+def _download_mlflow_artifacts(
+    replot_url: str,
+    replot_download_dir: Path,
+    mlflow_secrets_path: Path,
+    aws_credentials_path: Path | None = None,
+) -> dict:
+    """
+    Download all artifacts of the MLflow run referenced by a replot URL.
+
+    The run ID and the tracking server (scheme + host) are parsed from the URL.
+    The server must equal the ``tracking_uri`` found in the MLflow secrets file
+    (compared without trailing slashes); otherwise nothing is downloaded.
+
+    Args:
+        replot_url: MLflow URL containing run ID
+        replot_download_dir: Local directory to download artifacts to
+        mlflow_secrets_path: Path to MLflow secrets file
+        aws_credentials_path: Optional path to AWS credentials file; exported as
+            AWS_SHARED_CREDENTIALS_FILE for the duration of the download
+
+    Returns:
+        Dict with the keys ``download_status``, ``downloaded_files`` (a count),
+        ``validated_tracking_uri``, ``run_id`` and ``tracking_uri``
+
+    Raises:
+        ValueError: If the run ID cannot be parsed or the tracking URIs differ
+        FileNotFoundError: Presumably raised by the secrets loader when the file
+            is missing (the loader is not visible here -- confirm)
+        RuntimeError: If importing MLflow, connecting or downloading fails
+    """
+    # Handle MLflow web UI URLs like: https://mlflow.server.com/#/experiments/0/runs/RUN_ID
+    # or API URLs like: https://mlflow.server.com/runs/RUN_ID
+
+    # Extract run ID from either fragment (#/experiments/N/runs/ID) or path (/runs/ID)
+    run_id_match = re.search(r"[/#]runs/([^/?#]+)", replot_url)
+    if not run_id_match:
+        raise ValueError(f"Could not parse MLflow run ID from URL: {replot_url}")
+
+    run_id = run_id_match.group(1)
+
+    # Extract base MLflow tracking URI (remove fragments and paths)
+    # For https://mlflow.server.com/#/experiments/0/runs/ID -> https://mlflow.server.com
+    # For https://mlflow.server.com/runs/ID -> https://mlflow.server.com
+    from urllib.parse import urlparse
+
+    parsed = urlparse(replot_url)
+    mlflow_uri = f"{parsed.scheme}://{parsed.netloc}"
+
+    # Load MLflow secrets and validate tracking URI
+    mlflow_secrets = load_mlflow_secrets_yaml(mlflow_secrets_path)
+    # "or ''" also covers a tracking_uri that is present but null, so that case ends
+    # in the mismatch ValueError below instead of an AttributeError here.
+    export_tracking_uri = (mlflow_secrets.get("tracking_uri") or "").rstrip("/")
+    replot_tracking_uri = mlflow_uri.rstrip("/")
+
+    logger.debug(f"Parsed tracking URI from replot URL: {replot_tracking_uri}")
+    logger.debug(f"Export tracking URI from vault: {export_tracking_uri}")
+
+    if export_tracking_uri != replot_tracking_uri:
+        raise ValueError(
+            f"Replot URL tracking URI ({replot_tracking_uri}) does not match "
+            f"export configuration tracking URI ({export_tracking_uri}). "
+            f"For security, replot can only download from the same MLflow server used for export."
+        )
+
+    logger.info(f"Downloading from MLflow: {mlflow_uri}, run_id: {run_id}")
+
+    # Download artifacts using MLflow with proper authentication.
+    # Every failure in this section is wrapped exactly once, by the handler at the
+    # bottom; an additional inner wrapper would duplicate the message prefix.
+    try:
+        import mlflow
+
+        # Suppress noisy connection warnings and progress bars
+        import urllib3
+        from mlflow.tracking import MlflowClient
+
+        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+        logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
+        logging.getLogger("botocore").setLevel(logging.WARNING)
+        logging.getLogger("boto3").setLevel(logging.WARNING)
+
+        # Suppress MLflow progress bars by setting environment variable
+        # NOTE: unlike the AWS variable below, this one is not restored afterwards.
+        os.environ["MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR"] = "false"
+
+        # Set AWS shared credentials file for MLflow S3 artifact storage (if provided)
+        previous_aws_creds = os.environ.get("AWS_SHARED_CREDENTIALS_FILE")
+        try:
+            if aws_credentials_path is not None:
+                os.environ["AWS_SHARED_CREDENTIALS_FILE"] = str(aws_credentials_path)
+
+            # Use the same MLflow connection setup as export
+            with mlflow_connection_env(mlflow_secrets):
+                # Set tracking URI before creating client (AWS credentials need to be set first)
+                mlflow.set_tracking_uri(mlflow_uri)
+                client = MlflowClient()
+
+                # Download artifacts to the output directory (suppress progress bar output)
+                with (
+                    contextlib.redirect_stdout(io.StringIO()),
+                    contextlib.redirect_stderr(io.StringIO()),
+                ):
+                    downloaded_path = client.download_artifacts(
+                        run_id=run_id,
+                        path="",  # Download all artifacts
+                        dst_path=str(replot_download_dir),
+                    )
+
+            # Count downloaded files
+            if Path(downloaded_path).is_file():
+                downloaded_files = [Path(downloaded_path)]
+            else:
+                downloaded_files = [f for f in Path(downloaded_path).rglob("*") if f.is_file()]
+
+            logger.info(f"Downloaded {len(downloaded_files)} files to {replot_download_dir}")
+            if downloaded_files:
+                logger.info("Downloaded files:")
+                for file in downloaded_files[:10]:  # Show first 10
+                    try:
+                        relative_path = file.relative_to(replot_download_dir)
+                        logger.info(f" {relative_path}")
+                    except ValueError:
+                        # Not located under the download directory: log the path as-is
+                        logger.info(f" {file}")
+                if len(downloaded_files) > 10:
+                    logger.info(f" ... and {len(downloaded_files) - 10} more")
+
+            return {
+                "download_status": "success",
+                "downloaded_files": len(downloaded_files),
+                "validated_tracking_uri": export_tracking_uri,
+                "run_id": run_id,
+                "tracking_uri": mlflow_uri,
+            }
+
+        finally:
+            # Restore previous AWS credentials environment variable
+            if previous_aws_creds is None:
+                os.environ.pop("AWS_SHARED_CREDENTIALS_FILE", None)
+            else:
+                os.environ["AWS_SHARED_CREDENTIALS_FILE"] = previous_aws_creds
+
+    except Exception as e:
+        raise RuntimeError(f"MLflow artifact download failed: {e}") from e
+
+
+def _log_replot_file_listing(heading: str, files: list[Path], root: Path, limit: int = 10) -> None:
+    """Log ``heading`` and up to ``limit`` of ``files``, relative to ``root`` where possible."""
+    if not files:
+        return
+    logger.info(heading)
+    for file in files[:limit]:
+        try:
+            shown = file.relative_to(root)
+        except ValueError:
+            # Not located under root: fall back to the path as-is.
+            shown = file
+        logger.info(f" {shown}")
+    if len(files) > limit:
+        logger.info(f" ... and {len(files) - limit} more")
+
+
+def run_replot_from_orchestration_config(
+    replot_url: str,
+    artifact_directory: Path,
+    vault_name: str,
+    vault_mlflow_secret: str,
+    vault_aws_secret: str | None = None,
+    keep_replot_dir: bool = False,
+    postprocess_config: dict | None = None,
+) -> dict:
+    """
+    Download the artifacts behind a replot URL and run post-processing on them.
+
+    Artifacts are stored in ``artifact_directory / "replot"``. When that directory
+    already exists and is non-empty the download is skipped and its content is
+    reused. Unless ``keep_replot_dir`` is set, the directory is removed afterwards,
+    on success as well as on failure.
+
+    Args:
+        replot_url: MLflow URL to download artifacts from
+        artifact_directory: Directory for final artifacts output
+        vault_name: Name of the vault containing secrets
+        vault_mlflow_secret: MLflow secret key in the vault
+        vault_aws_secret: Optional AWS secret key in the vault
+        keep_replot_dir: Whether to keep the download directory after processing
+        postprocess_config: Configuration for post-processing (``{}`` when None)
+
+    Returns:
+        Dict with a single ``"replot"`` entry containing the operation status,
+        the download details and the post-processing result
+
+    Raises:
+        FileNotFoundError: If the MLflow or AWS vault content file does not exist
+        ValueError: If the AWS vault entry cannot be resolved, or the URL is not
+            recognised as an MLflow run URL
+        Exception: Any failure of the download or post-processing is logged and
+            re-raised unchanged
+    """
+    replot_download_dir = artifact_directory / "replot"
+
+    logger.info(f"Replotting artifacts from URL: {replot_url}")
+    logger.info(f"Download directory: {replot_download_dir}")
+    logger.info(f"Output directory: {artifact_directory}")
+
+    # Get MLflow secrets from vault
+    mlflow_secrets_path = vault_lib.get_vault_content_path(vault_name, vault_mlflow_secret)
+    if mlflow_secrets_path is None or not mlflow_secrets_path.exists():
+        raise FileNotFoundError(
+            f"MLflow secrets not found in vault {vault_name}/{vault_mlflow_secret}"
+        )
+
+    # Get AWS credentials if provided
+    aws_credentials_path = None
+    if vault_aws_secret:
+        aws_credentials_path = vault_lib.get_vault_content_path(vault_name, vault_aws_secret)
+
+        if aws_credentials_path is None:
+            raise ValueError(f"Vault {vault_name}/{vault_aws_secret} missing :/")
+        elif not aws_credentials_path.exists():
+            raise FileNotFoundError(f"Vault {vault_name}/{vault_aws_secret} file missing :/")
+
+        logger.info(f"Using AWS credentials from vault {vault_name}/{vault_aws_secret}")
+
+    logger.info(f"Using MLflow secrets from vault {vault_name}/{vault_mlflow_secret}")
+
+    status = {
+        "replot": {
+            "url": replot_url,
+            "download_directory": str(replot_download_dir),
+            "output_directory": str(artifact_directory),
+            "keep_download_dir": keep_replot_dir,
+        }
+    }
+
+    try:
+        # Step 1: Download artifacts
+        logger.info("Downloading artifacts...")
+
+        # Check if download directory already exists with content
+        if replot_download_dir.exists() and any(replot_download_dir.iterdir()):
+            logger.info(
+                f"Replot download directory already exists with content, skipping download: {replot_download_dir}"
+            )
+
+            # Count existing files for status
+            existing_files = [f for f in replot_download_dir.rglob("*") if f.is_file()]
+
+            logger.info(f"Found {len(existing_files)} existing files")
+            _log_replot_file_listing("Existing files:", existing_files, replot_download_dir)
+
+            status["replot"].update(
+                {
+                    "download_status": "skipped",
+                    "downloaded_files": len(existing_files),
+                    "skip_reason": "directory_already_exists",
+                }
+            )
+        else:
+            # Create the download directory
+            replot_download_dir.mkdir(parents=True, exist_ok=True)
+
+            # Download artifacts based on URL type
+            if "mlflow" in replot_url.lower() and "runs" in replot_url:
+                download_result = _download_mlflow_artifacts(
+                    replot_url, replot_download_dir, mlflow_secrets_path, aws_credentials_path
+                )
+                status["replot"].update(download_result)
+            else:
+                raise ValueError(
+                    f"Unsupported replot URL type: {replot_url}. Only MLflow URLs are currently supported."
+                )
+
+            # Only reported when a download actually ran, never for the skip path.
+            logger.info("Artifacts downloaded successfully")
+
+        # Step 2: Run post-processing on downloaded artifacts
+        logger.info("Running post-processing...")
+
+        postprocess_result = run_postprocess_from_orchestration_config(
+            postprocess_config_raw=postprocess_config or {},
+            artifacts_dir=replot_download_dir,
+            visualize_output_dir=artifact_directory,
+            test_outcome=TestPhaseOutcome("SUCCESS"),
+        )
+
+        # Log post-processing results
+        visualize_step = postprocess_result.get("steps", {}).get("visualize", {})
+        if visualize_step.get("status") == "skipped":
+            logger.info("Post-processing completed (visualizations skipped)")
+        elif visualize_step.get("paths"):
+            viz_paths = visualize_step["paths"]
+            logger.info(f"Post-processing completed with {len(viz_paths)} visualizations generated")
+        else:
+            logger.info("Post-processing completed (parsing only)")
+
+        status["replot"]["postprocess_status"] = "success"
+        status["replot"]["postprocess_result"] = postprocess_result
+
+        # Step 3: Clean up download directory unless keeping
+        if not keep_replot_dir:
+            logger.info(f"Cleaning up download directory: {replot_download_dir}")
+            shutil.rmtree(replot_download_dir)
+            status["replot"]["cleanup_status"] = "completed"
+        else:
+            logger.info(f"Keeping download directory as requested: {replot_download_dir}")
+            status["replot"]["cleanup_status"] = "skipped"
+
+        status["replot"]["status"] = "success"
+        status["replot"]["message"] = "Replot completed successfully"
+
+    except Exception as e:
+        logger.error(f"Replot failed: {e}")
+        status["replot"]["status"] = "failed"
+        status["replot"]["message"] = str(e)
+
+        # Try to clean up on failure unless keeping
+        if not keep_replot_dir and replot_download_dir.exists():
+            try:
+                shutil.rmtree(replot_download_dir)
+                status["replot"]["cleanup_status"] = "completed_on_failure"
+            except Exception as cleanup_e:
+                # Best effort only: the original failure is the one to surface.
+                logger.warning(f"Failed to cleanup download directory: {cleanup_e}")
+                status["replot"]["cleanup_status"] = "failed"
+
+        raise
+
+    return status
diff --git a/projects/caliper/postprocess/__init__.py b/projects/caliper/postprocess/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/projects/caliper/postprocess/guidellm/__init__.py b/projects/caliper/postprocess/guidellm/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/projects/caliper/postprocess/guidellm/parsing/__init__.py b/projects/caliper/postprocess/guidellm/parsing/__init__.py
new file mode 100644
index 00000000..ef79517f
--- /dev/null
+++ b/projects/caliper/postprocess/guidellm/parsing/__init__.py
@@ -0,0 +1,6 @@
+"""GuideLLM parsing components."""
+
+from .kpis import GuideLLMKpiHandler
+from .parsers import GuideLLMParser
+
+__all__ = ["GuideLLMParser", "GuideLLMKpiHandler"]
diff --git a/projects/caliper/postprocess/guidellm/parsing/kpis.py b/projects/caliper/postprocess/guidellm/parsing/kpis.py
new file mode 100644
index 00000000..1eb85e05
--- /dev/null
+++ b/projects/caliper/postprocess/guidellm/parsing/kpis.py
@@ -0,0 +1,172 @@
+"""KPI definitions and computation for GuideLLM Caliper plugin."""
+
+from __future__ import annotations
+
+from datetime import UTC, datetime
+from typing import Any
+
+from projects.caliper.engine.model import UnifiedRunModel
+
+
+class GuideLLMKpiHandler:
+    """Handles KPI catalog and computation for GuideLLM benchmarks."""
+
+    # Every KPI id is this prefix followed by the key of the metric it reports.
+    _KPI_PREFIX = "guidellm_"
+
+    @staticmethod
+    def get_catalog() -> list[dict[str, Any]]:
+        """
+        Return the KPI catalog for GuideLLM metrics.
+
+        Returns:
+            List of KPI definitions, each with the keys ``kpi_id``, ``name``,
+            ``unit`` and ``higher_is_better``
+        """
+        return [
+            # Throughput KPIs
+            {
+                "kpi_id": "guidellm_request_rate",
+                "name": "Request Rate",
+                "unit": "req/s",
+                "higher_is_better": True,
+            },
+            {
+                "kpi_id": "guidellm_tokens_per_second",
+                "name": "Total Token Throughput",
+                "unit": "tokens/s",
+                "higher_is_better": True,
+            },
+            {
+                "kpi_id": "guidellm_input_tokens_per_second",
+                "name": "Input Token Throughput",
+                "unit": "tokens/s",
+                "higher_is_better": True,
+            },
+            {
+                "kpi_id": "guidellm_output_tokens_per_second",
+                "name": "Output Token Throughput",
+                "unit": "tokens/s",
+                "higher_is_better": True,
+            },
+            # Latency KPIs
+            {
+                "kpi_id": "guidellm_ttft_median",
+                "name": "Time to First Token (Median)",
+                "unit": "s",
+                "higher_is_better": False,
+            },
+            {
+                "kpi_id": "guidellm_ttft_p95",
+                "name": "Time to First Token (P95)",
+                "unit": "s",
+                "higher_is_better": False,
+            },
+            {
+                "kpi_id": "guidellm_itl_median",
+                "name": "Inter Token Latency (Median)",
+                "unit": "s",
+                "higher_is_better": False,
+            },
+            {
+                "kpi_id": "guidellm_tpot_median",
+                "name": "Time Per Output Token (Median)",
+                "unit": "s",
+                "higher_is_better": False,
+            },
+            {
+                "kpi_id": "guidellm_request_latency_median",
+                "name": "End-to-End Request Latency (Median)",
+                "unit": "s",
+                "higher_is_better": False,
+            },
+            {
+                "kpi_id": "guidellm_request_latency_p95",
+                "name": "End-to-End Request Latency (P95)",
+                "unit": "s",
+                "higher_is_better": False,
+            },
+            # Token efficiency KPIs
+            {
+                "kpi_id": "guidellm_input_tokens_per_request",
+                "name": "Input Tokens per Request",
+                "unit": "tokens",
+                "higher_is_better": False,  # Generally want efficiency
+            },
+            {
+                "kpi_id": "guidellm_output_tokens_per_request",
+                "name": "Output Tokens per Request",
+                "unit": "tokens",
+                "higher_is_better": False,  # Generally want conciseness
+            },
+        ]
+
+    @staticmethod
+    def compute_kpis(model: UnifiedRunModel) -> list[dict[str, Any]]:
+        """
+        Compute KPI values from the unified model.
+
+        One KPI record is emitted per catalog entry for every result record whose
+        ``run_identity`` has a truthy ``"guidellm"`` entry and whose metrics do not
+        flag ``"no_benchmarks_found"``. A metric that is missing or cannot be
+        converted to ``float`` is reported as ``0.0``. All records of one call share
+        the same UTC timestamp.
+
+        Args:
+            model: Unified model containing parsed test results
+
+        Returns:
+            List of KPI records
+        """
+        ts = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
+        prefix = GuideLLMKpiHandler._KPI_PREFIX
+        # The catalog is the single source of truth for ids, units and direction;
+        # it is built once here rather than once per record.
+        catalog = GuideLLMKpiHandler.get_catalog()
+        out: list[dict[str, Any]] = []
+
+        for r in model.unified_result_records:
+            # Skip records without GuideLLM data
+            if not r.run_identity.get("guidellm"):
+                continue
+
+            # Skip if no benchmarks found
+            if r.metrics.get("no_benchmarks_found"):
+                continue
+
+            base_labels = {**r.distinguishing_labels}
+
+            for entry in catalog:
+                kpi_id = entry["kpi_id"]
+                raw_value = r.metrics.get(kpi_id.removeprefix(prefix), 0)
+
+                # Convert to float
+                try:
+                    value = float(raw_value)
+                except (TypeError, ValueError):
+                    value = 0.0
+
+                out.append(
+                    {
+                        "schema_version": "1",
+                        "kpi_id": kpi_id,
+                        "value": value,
+                        "unit": entry["unit"],
+                        "run_id": r.test_base_path,
+                        "timestamp": ts,
+                        "labels": {**base_labels, "higher_is_better": entry["higher_is_better"]},
+                        "source": {
+                            "test_base_path": r.test_base_path,
+                            "plugin_module": model.plugin_module,
+                        },
+                    }
+                )
+
+        return out
diff --git a/projects/caliper/postprocess/guidellm/parsing/models.py b/projects/caliper/postprocess/guidellm/parsing/models.py
new file mode 100644
index 00000000..d2c100e1
--- /dev/null
+++ b/projects/caliper/postprocess/guidellm/parsing/models.py
@@ -0,0 +1,103 @@
+"""Data models for GuideLLM benchmark results."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Any
+
+
+@dataclass
+class GuideLLMBenchmark:
+    """Metrics extracted from one GuideLLM benchmark result.
+
+    Only ``strategy`` and ``duration`` are required; every other field
+    defaults to zero.
+    """
+
+    strategy: str
+    duration: float
+    warmup_time: float = 0.0
+    cooldown_time: float = 0.0
+
+    # --- Requests ---
+    request_rate: float = 0.0
+    request_concurrency: float = 0.0
+    completed_requests: int = 0
+    failed_requests: int = 0
+
+    # --- Tokens, per request ---
+    input_tokens_per_request: float = 0.0
+    output_tokens_per_request: float = 0.0
+    total_tokens_per_request: float = 0.0
+
+    # --- Latencies, in seconds ---
+    request_latency_median: float = 0.0
+    request_latency_p95: float = 0.0
+    ttft_median: float = 0.0  # TTFT = Time to First Token
+    ttft_p95: float = 0.0
+    itl_median: float = 0.0  # ITL = Inter Token Latency
+    itl_p95: float = 0.0
+    tpot_median: float = 0.0  # TPOT = Time Per Output Token
+    tpot_p95: float = 0.0
+
+    # --- Further TTFT percentiles ---
+    ttft_p10: float = 0.0
+    ttft_p25: float = 0.0
+    ttft_p50: float = 0.0
+    ttft_p75: float = 0.0
+    ttft_p90: float = 0.0
+
+    # --- Further ITL percentiles ---
+    itl_p10: float = 0.0
+    itl_p25: float = 0.0
+    itl_p50: float = 0.0
+    itl_p75: float = 0.0
+    itl_p90: float = 0.0
+
+    # --- Throughput ---
+    tokens_per_second: float = 0.0
+    input_tokens_per_second: float = 0.0
+    output_tokens_per_second: float = 0.0
+
+    # --- Output-token throughput percentiles ---
+    output_tokens_per_second_p10: float = 0.0
+    output_tokens_per_second_p25: float = 0.0
+    output_tokens_per_second_p50: float = 0.0
+    output_tokens_per_second_p75: float = 0.0
+    output_tokens_per_second_p90: float = 0.0
+
+    def to_dict(self) -> dict[str, Any]:
+        """Return the fields used as ``UnifiedResultRecord`` metrics.
+
+        Warm-up/cool-down times and the p10-p90 percentile fields are not
+        part of the result.
+        """
+        exported_fields = (
+            "strategy",
+            "duration",
+            "request_rate",
+            "request_concurrency",
+            "completed_requests",
+            "failed_requests",
+            "input_tokens_per_request",
+            "output_tokens_per_request",
+            "total_tokens_per_request",
+            "request_latency_median",
+            "request_latency_p95",
+            "ttft_median",
+            "ttft_p95",
+            "itl_median",
+            "itl_p95",
+            "tpot_median",
+            "tpot_p95",
+            "tokens_per_second",
+            "input_tokens_per_second",
+            "output_tokens_per_second",
+        )
+        return {field_name: getattr(self, field_name) for field_name in exported_fields}
+
+
+@dataclass
+class GuideLLMConfiguration:
+    """The ``args`` and ``metadata`` sections extracted from a GuideLLM benchmark file."""
+
+    args: dict[str, Any] | None = None
+    metadata: dict[str, Any] | None = None
+
+    def to_dict(self) -> dict[str, Any]:
+        """Return both sections, with ``{}`` standing in for a missing or empty one."""
+        args = self.args if self.args else {}
+        metadata = self.metadata if self.metadata else {}
+        return {"args": args, "metadata": metadata}
diff --git a/projects/caliper/postprocess/guidellm/parsing/parsers.py b/projects/caliper/postprocess/guidellm/parsing/parsers.py
new file mode 100644
index 00000000..77244b92
--- /dev/null
+++ b/projects/caliper/postprocess/guidellm/parsing/parsers.py
@@ -0,0 +1,292 @@
+"""GuideLLM benchmark parsers for Caliper plugin."""
+
+from __future__ import annotations
+
+import json
+import logging
+from pathlib import Path
+from typing import Any
+
+from projects.caliper.engine.model import (
+ ParseResult,
+ TestBaseNode,
+ UnifiedResultRecord,
+)
+
+from .models import GuideLLMBenchmark, GuideLLMConfiguration
+
+
+def _labels_from_node(node: TestBaseNode) -> dict[str, Any]:
+    """Return a copy of the labels attached to a test node.
+
+    ``node.labels`` may hold the labels directly or wrap them under a nested
+    ``"labels"`` key; the nested value wins when it is a dict.
+
+    Args:
+        node: Test node whose ``labels`` attribute is inspected.
+
+    Returns:
+        A new dict, so callers can add keys without mutating the node. When
+        ``node.labels`` is not a dict (for example ``None``), the fallback
+        ``{"facet": "default"}`` is returned.
+    """
+    raw = node.labels
+    # Only call ``.get`` when it exists: calling it unconditionally raised
+    # AttributeError for a non-dict value and made the fallback unreachable.
+    inner = raw.get("labels") if hasattr(raw, "get") else None
+    if isinstance(inner, dict):
+        return dict(inner)
+    if isinstance(raw, dict):
+        return dict(raw)
+    return {"facet": "default"}
+
+
+class GuideLLMParser:
+    """Parser for GuideLLM benchmark JSON artifacts."""
+
+    def parse_benchmarks_json(
+        self, file_path: Path
+    ) -> tuple[list[GuideLLMBenchmark], GuideLLMConfiguration | None, list[str]]:
+        """
+        Parse a GuideLLM benchmarks.json file.
+
+        Problems are reported through the returned warnings instead of being
+        raised. A missing or ``null`` ``"benchmarks"`` entry is treated as an
+        empty list; an entry that is not a JSON array produces one warning.
+
+        Args:
+            file_path: Path of the JSON file to read (decoded as UTF-8).
+
+        Returns:
+            Tuple of (benchmarks list, configuration, warnings list). The
+            configuration is ``None`` unless the file has a truthy top-level
+            ``"args"`` or ``"metadata"`` entry, or when the file cannot be
+            loaded as a JSON object.
+        """
+        warnings: list[str] = []
+
+        # Keep the guarded region to the read + decode step only.
+        try:
+            json_data = json.loads(file_path.read_text(encoding="utf-8"))
+        except json.JSONDecodeError as e:
+            warnings.append(f"Malformed JSON {file_path}: {e}")
+            return [], None, warnings
+        except Exception as e:  # noqa: BLE001 - best effort: unreadable files become warnings
+            warnings.append(f"Failed to parse GuideLLM JSON {file_path}: {e}")
+            return [], None, warnings
+
+        if not isinstance(json_data, dict):
+            warnings.append(f"{file_path}: benchmarks.json must be a JSON object")
+            return [], None, warnings
+
+        # Parse configuration from top-level fields
+        configuration: GuideLLMConfiguration | None = None
+        args = json_data.get("args")
+        metadata = json_data.get("metadata")
+        if args or metadata:
+            configuration = GuideLLMConfiguration(args=args, metadata=metadata)
+
+        raw_benchmarks = json_data.get("benchmarks")
+        if raw_benchmarks is None:
+            # ``"benchmarks": null`` used to abort the whole file; treat it as empty.
+            raw_benchmarks = []
+        elif not isinstance(raw_benchmarks, list):
+            warnings.append(f"{file_path}: 'benchmarks' must be a JSON array")
+            raw_benchmarks = []
+
+        # One bad entry must not discard the others, hence the per-entry guard.
+        benchmarks: list[GuideLLMBenchmark] = []
+        for benchmark_data in raw_benchmarks:
+            try:
+                benchmarks.append(self._parse_single_benchmark(benchmark_data))
+            except Exception as e:  # noqa: BLE001 - reported as a warning
+                warnings.append(f"Failed to parse benchmark in {file_path}: {e}")
+                logging.warning("Failed to parse benchmark data: %s", e)
+
+        logging.info("Parsed %d GuideLLM benchmarks from %s", len(benchmarks), file_path)
+        return benchmarks, configuration, warnings
+
+    def _parse_single_benchmark(self, benchmark_data: dict[str, Any]) -> GuideLLMBenchmark:
+        """Parse a single benchmark entry from the JSON data.
+
+        Sections or values that are missing or ``null`` fall back to defaults
+        (``0.0`` for metric statistics, ``"unknown"`` for a missing strategy
+        type) instead of failing the whole entry.
+
+        Args:
+            benchmark_data: One element of the top-level ``"benchmarks"`` list.
+
+        Returns:
+            The populated ``GuideLLMBenchmark``. Latency statistics are divided
+            by 1000 (milliseconds to seconds); throughput values are copied as
+            read.
+        """
+        scheduler = benchmark_data.get("scheduler") or {}
+        config = benchmark_data.get("config") or {}
+        strategy_info = config.get("strategy") or {}
+        strategy = strategy_info.get("type_", "unknown")
+
+        # Extract concurrency (streams) with multiple fallback paths
+        concurrency = self._extract_concurrency(strategy_info, scheduler)
+
+        # Extract timing info; without a usable window fall back to 60 seconds.
+        state = scheduler.get("state") or {}
+        start_time = state.get("start_time") or 0
+        end_time = state.get("end_time") or 0
+        duration = end_time - start_time if end_time > start_time else 60.0
+
+        metrics = benchmark_data.get("metrics") or {}
+        percentile_stats = ("p95", "p90", "p75", "p50", "p25", "p10")
+
+        def get_metric_value(
+            metric_name: str, stat_type: str = "median", default: float = 0.0
+        ) -> float:
+            """Read one statistic of a metric's ``"successful"`` distribution."""
+            metric_data = (metrics.get(metric_name) or {}).get("successful") or {}
+            if stat_type in percentile_stats:
+                value = (metric_data.get("percentiles") or {}).get(stat_type, default)
+            else:
+                value = metric_data.get(stat_type, default)
+            # JSON ``null`` would make float() raise; use the default instead.
+            return float(default if value is None else value)
+
+        def seconds(metric_name: str, stat_type: str) -> float:
+            """Read a statistic and convert it from milliseconds to seconds."""
+            return get_metric_value(metric_name, stat_type) / 1000.0
+
+        spread = ("p10", "p25", "p50", "p75", "p90")
+        latency_stats = ("median", *spread, "p95")
+
+        # Keys match the GuideLLMBenchmark keyword names (ttft_median, ttft_p10, ...).
+        ttft = {f"ttft_{stat}": seconds("time_to_first_token_ms", stat) for stat in latency_stats}
+        itl = {f"itl_{stat}": seconds("inter_token_latency_ms", stat) for stat in latency_stats}
+
+        # Extract throughput metrics
+        request_rate = get_metric_value("requests_per_second", "mean")
+        input_tokens_per_second = get_metric_value("input_tokens_per_second", "mean")
+        output_tokens_per_second = get_metric_value("output_tokens_per_second", "mean")
+        total_tokens_per_second = input_tokens_per_second + output_tokens_per_second
+        output_rate_percentiles = {
+            f"output_tokens_per_second_{stat}": get_metric_value("output_tokens_per_second", stat)
+            for stat in spread
+        }
+
+        # Request count and tokens per request are derived from the mean rates.
+        if request_rate > 0:
+            completed_requests = int(request_rate * duration)
+            input_tokens_per_request = input_tokens_per_second / request_rate
+            output_tokens_per_request = output_tokens_per_second / request_rate
+            total_tokens_per_request = total_tokens_per_second / request_rate
+        else:
+            completed_requests = 0
+            input_tokens_per_request = 0.0
+            output_tokens_per_request = 0.0
+            total_tokens_per_request = 0.0
+
+        return GuideLLMBenchmark(
+            strategy=strategy,
+            duration=duration,
+            warmup_time=0.0,  # Not available in JSON format
+            cooldown_time=0.0,  # Not available in JSON format
+            # Request metrics
+            request_rate=request_rate,
+            request_concurrency=concurrency,
+            completed_requests=completed_requests,
+            failed_requests=0,  # Could extract from unsuccessful metrics if needed
+            # Token metrics per request
+            input_tokens_per_request=input_tokens_per_request,
+            output_tokens_per_request=output_tokens_per_request,
+            total_tokens_per_request=total_tokens_per_request,
+            # Latency metrics (in seconds after the division above).
+            # NOTE(review): "request_latency" has no "_ms" suffix, unlike the
+            # other latency metrics; confirm it really is reported in milliseconds.
+            request_latency_median=seconds("request_latency", "median"),
+            request_latency_p95=seconds("request_latency", "p95"),
+            **ttft,
+            **itl,
+            tpot_median=seconds("time_per_output_token_ms", "median"),
+            tpot_p95=seconds("time_per_output_token_ms", "p95"),
+            # Throughput metrics
+            tokens_per_second=total_tokens_per_second,
+            input_tokens_per_second=input_tokens_per_second,
+            output_tokens_per_second=output_tokens_per_second,
+            # Output token percentiles
+            **output_rate_percentiles,
+        )
+
+    def _extract_concurrency(
+        self, strategy_info: dict[str, Any], scheduler: dict[str, Any]
+    ) -> float:
+        """Extract concurrency (streams) from strategy or scheduler info.
+
+        ``strategy_info["streams"]`` is tried first, then
+        ``scheduler["strategy"]["streams"]``. Both are coerced with ``float`` so
+        numeric strings are accepted on either path; sections that are missing,
+        ``null`` or not dicts are skipped.
+
+        Returns:
+            The first value greater than zero, or ``1.0`` (with a logged
+            warning) when neither location provides one.
+        """
+        scheduler_strategy = scheduler.get("strategy") if isinstance(scheduler, dict) else None
+
+        for source in (strategy_info, scheduler_strategy):
+            if not isinstance(source, dict):
+                continue
+            try:
+                concurrency = float(source.get("streams") or 0)
+            except (ValueError, TypeError):
+                continue
+            if concurrency > 0:
+                return concurrency
+
+        logging.warning(
+            "Could not find concurrency 'streams' for benchmark. Using default value 1.0"
+        )
+        return 1.0
+
+    def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult:
+        """
+        Parse test nodes containing GuideLLM benchmarks.json files.
+
+        A node without any ``benchmarks.json`` artifact gets a single placeholder
+        record flagged with ``no_benchmarks_found``. Otherwise one record is
+        created per parsed benchmark; a file that yields no benchmarks
+        contributes warnings only.
+
+        Args:
+            base_dir: Base directory for the test run
+            nodes: List of test nodes to parse
+
+        Returns:
+            ParseResult with unified records and warnings
+        """
+        records: list[UnifiedResultRecord] = []
+        warnings: list[str] = []
+        resolved_base = base_dir.resolve()  # loop-invariant, resolve once
+
+        for node in nodes:
+            # Look for benchmarks.json files
+            benchmarks_files = [p for p in node.artifact_paths if p.name == "benchmarks.json"]
+
+            if not benchmarks_files:
+                # No benchmarks.json found, create empty record
+                records.append(
+                    UnifiedResultRecord(
+                        test_base_path=str(node.directory.relative_to(resolved_base)),
+                        distinguishing_labels=_labels_from_node(node),
+                        metrics={"no_benchmarks_found": True},
+                        run_identity={"guidellm": True},
+                        parse_notes=["No benchmarks.json file found"],
+                    )
+                )
+                continue
+
+            for benchmarks_file in benchmarks_files:
+                benchmarks, config, file_warnings = self.parse_benchmarks_json(benchmarks_file)
+                warnings.extend(file_warnings)
+
+                # Create a unified record for each benchmark
+                for benchmark in benchmarks:
+                    labels = _labels_from_node(node)
+                    # Add strategy and concurrency to distinguishing labels for grouping
+                    labels["strategy"] = benchmark.strategy
+                    labels["concurrency"] = benchmark.request_concurrency
+
+                    # Convert benchmark to metrics dictionary
+                    metrics = benchmark.to_dict()
+                    if config:
+                        metrics["configuration"] = config.to_dict()
+
+                    records.append(
+                        UnifiedResultRecord(
+                            test_base_path=str(node.directory.relative_to(resolved_base)),
+                            distinguishing_labels=labels,
+                            metrics=metrics,
+                            run_identity={"guidellm": True},
+                            parse_notes=[],
+                        )
+                    )
+
+        logging.info("GuideLLM parser created %d unified result records", len(records))
+        return ParseResult(records=records, warnings=warnings)
diff --git a/projects/caliper/postprocess/guidellm/plotting/__init__.py b/projects/caliper/postprocess/guidellm/plotting/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/projects/caliper/postprocess/guidellm/plugin.py b/projects/caliper/postprocess/guidellm/plugin.py
new file mode 100644
index 00000000..56d3fedb
--- /dev/null
+++ b/projects/caliper/postprocess/guidellm/plugin.py
@@ -0,0 +1,334 @@
+"""GuideLLM Caliper PostProcessingPlugin (`projects/caliper/postprocess/guidellm`)."""
+
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+from projects.caliper.engine.model import (
+ ParseResult,
+ PostProcessingPlugin,
+ TestBaseNode,
+ UnifiedRunModel,
+)
+
+from .parsing import GuideLLMKpiHandler, GuideLLMParser
+
+
+class GuideLLMPlugin(PostProcessingPlugin):
+    """
+    Parses GuideLLM benchmark artifacts containing ``benchmarks.json`` files.
+
+    Extracts comprehensive LLM inference performance metrics including:
+    * Request throughput and concurrency
+    * Token-level latencies (TTFT, ITL, TPOT)
+    * End-to-end request latency percentiles
+    * Token throughput metrics
+    * Request completion rates
+
+    Visual reports produced by :meth:`visualize` (selected through ``report_ids``):
+    * ``benchmark_summary`` — ``throughput_vs_concurrency.html``: tokens per second
+      against concurrency, one line per strategy
+    * ``latency_breakdown`` — ``latency_breakdown.html``: 2x2 grid with median TTFT,
+      median ITL, P95 request latency and a combined TTFT/ITL panel
+    """
+
+    # Colour cycle shared by both charts; strategies beyond eight reuse colours.
+    _TRACE_COLORS = ("blue", "red", "green", "purple", "orange", "brown", "pink", "gray")
+
+    def __init__(self):
+        self.parser = GuideLLMParser()
+        self.kpi_handler = GuideLLMKpiHandler()
+
+    def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult:
+        """Parse test nodes using the GuideLLM parser."""
+        return self.parser.parse(base_dir, nodes)
+
+    def visualize(
+        self,
+        model: UnifiedRunModel,
+        output_dir: Path,
+        report_ids: list[str] | None,
+        group_id: str | None,
+        visualize_config: dict[str, Any] | None,
+    ) -> list[str]:
+        """Generate visualization reports for GuideLLM benchmarks.
+
+        Args:
+            model: Unified run model whose records are plotted.
+            output_dir: Directory receiving the HTML files; created if missing.
+            report_ids: Reports to generate (``"benchmark_summary"``,
+                ``"latency_breakdown"``). ``None`` or an empty list selects none.
+            group_id: Not used by this implementation.
+            visualize_config: Not used by this implementation.
+
+        Returns:
+            Paths of the files that were written; empty when no GuideLLM
+            benchmark record exists or no known report was requested.
+        """
+        output_dir.mkdir(parents=True, exist_ok=True)
+        paths: list[str] = []
+        wanted = frozenset(report_ids or ())
+
+        # Filter to only GuideLLM records with benchmarks
+        guidellm_records = [
+            r
+            for r in model.unified_result_records
+            if r.run_identity.get("guidellm") and not r.metrics.get("no_benchmarks_found")
+        ]
+
+        if not guidellm_records:
+            return paths
+
+        # Report id -> generator, in the order the reports are emitted.
+        generators = (
+            ("benchmark_summary", self._generate_throughput_vs_concurrency_chart),
+            ("latency_breakdown", self._generate_latency_breakdown_chart),
+        )
+        for report_id, generate in generators:
+            if report_id not in wanted:
+                continue
+            path = generate(guidellm_records, output_dir)
+            if path:
+                paths.append(path)
+
+        return paths
+
+    @staticmethod
+    def _series_by_strategy(
+        records: list, metric_names: tuple[str, ...], scale: float | None = None
+    ) -> dict[Any, dict[str, list]]:
+        """Group concurrency and the named metrics into one list per strategy.
+
+        Missing metrics count as ``0``; values are multiplied by ``scale`` when
+        one is given. Strategies and points keep the order of ``records``.
+        """
+        grouped: dict[Any, dict[str, list]] = {}
+        for record in records:
+            strategy = record.distinguishing_labels.get("strategy", "unknown")
+            concurrency = record.metrics.get(
+                "request_concurrency", record.distinguishing_labels.get("concurrency", 1)
+            )
+            values = {}
+            for name in metric_names:
+                value = record.metrics.get(name, 0)
+                values[name] = value if scale is None else value * scale
+
+            series = grouped.setdefault(
+                strategy, {"concurrency": [], **{name: [] for name in metric_names}}
+            )
+            series["concurrency"].append(concurrency)
+            for name, value in values.items():
+                series[name].append(value)
+        return grouped
+
+    def _generate_throughput_vs_concurrency_chart(
+        self, records: list, output_dir: Path
+    ) -> str | None:
+        """Generate mean throughput vs concurrency chart.
+
+        Returns:
+            Path of ``throughput_vs_concurrency.html``, or ``None`` when plotly
+            is unavailable or the chart could not be produced (a warning is
+            logged in both cases).
+        """
+        import logging
+
+        try:
+            import plotly.graph_objects as go
+        except ImportError:
+            # Still best effort, but no longer silent about why nothing was written.
+            logging.warning("plotly is not installed; skipping GuideLLM throughput chart")
+            return None
+
+        try:
+            data_by_strategy = self._series_by_strategy(records, ("tokens_per_second",))
+
+            fig = go.Figure()
+            colors = self._TRACE_COLORS
+            for i, (strategy, data) in enumerate(data_by_strategy.items()):
+                color = colors[i % len(colors)]
+                fig.add_trace(
+                    go.Scatter(
+                        x=data["concurrency"],
+                        y=data["tokens_per_second"],
+                        mode="markers+lines",
+                        name=strategy,
+                        marker={"color": color, "size": 8},
+                        line={"color": color, "width": 2},
+                    )
+                )
+
+            fig.update_layout(
+                title="Throughput vs Concurrency",
+                xaxis_title="Concurrency Level",
+                yaxis_title="Tokens per Second",
+                width=800,
+                height=500,
+            )
+
+            output_file = output_dir / "throughput_vs_concurrency.html"
+            fig.write_html(output_file, include_plotlyjs="cdn")
+            return str(output_file)
+        except Exception as e:  # noqa: BLE001 - a failed chart must not abort the run
+            logging.warning(f"Failed to generate throughput chart: {e}")
+            return None
+
+    def _generate_latency_breakdown_chart(self, records: list, output_dir: Path) -> str | None:
+        """Generate latency breakdown visualization.
+
+        Latencies are stored in seconds and plotted in milliseconds.
+
+        Returns:
+            Path of ``latency_breakdown.html``, or ``None`` when plotly is
+            unavailable or the chart could not be produced (a warning is logged
+            in both cases).
+        """
+        import logging
+
+        try:
+            import plotly.graph_objects as go
+            from plotly.subplots import make_subplots
+        except ImportError:
+            logging.warning("plotly is not installed; skipping GuideLLM latency breakdown chart")
+            return None
+
+        try:
+            # Convert to ms for better readability
+            data_by_strategy = self._series_by_strategy(
+                records, ("ttft_median", "itl_median", "request_latency_p95"), scale=1000
+            )
+
+            fig = make_subplots(
+                rows=2,
+                cols=2,
+                subplot_titles=[
+                    "Time to First Token (TTFT)",
+                    "Inter-Token Latency (ITL)",
+                    "Request Latency P95",
+                    "Latency Comparison",
+                ],
+            )
+
+            # (series key, legend suffix, row, col, shown in legend)
+            panels = (
+                ("ttft_median", "TTFT", 1, 1, True),
+                ("itl_median", "ITL", 1, 2, False),
+                ("request_latency_p95", "Req P95", 2, 1, False),
+            )
+            # (series key, legend suffix, dash style) drawn together in row 2, col 2
+            overlays = (
+                ("ttft_median", "TTFT", "solid"),
+                ("itl_median", "ITL", "dash"),
+            )
+
+            colors = self._TRACE_COLORS
+            for i, (strategy, data) in enumerate(data_by_strategy.items()):
+                color = colors[i % len(colors)]
+
+                for key, suffix, row, col, in_legend in panels:
+                    fig.add_trace(
+                        go.Scatter(
+                            x=data["concurrency"],
+                            y=data[key],
+                            mode="markers+lines",
+                            name=f"{strategy} {suffix}",
+                            marker={"color": color},
+                            showlegend=in_legend,
+                        ),
+                        row=row,
+                        col=col,
+                    )
+
+                # Combined latency comparison
+                for key, suffix, dash in overlays:
+                    fig.add_trace(
+                        go.Scatter(
+                            x=data["concurrency"],
+                            y=data[key],
+                            mode="lines",
+                            name=f"{strategy} {suffix}",
+                            line={"color": color, "dash": dash},
+                            showlegend=False,
+                        ),
+                        row=2,
+                        col=2,
+                    )
+
+            fig.update_layout(title="GuideLLM Latency Analysis", height=800, showlegend=True)
+
+            # Update axes labels
+            fig.update_xaxes(title_text="Concurrency", row=2, col=1)
+            fig.update_xaxes(title_text="Concurrency", row=2, col=2)
+            for row, col in ((1, 1), (1, 2), (2, 1), (2, 2)):
+                fig.update_yaxes(title_text="Latency (ms)", row=row, col=col)
+
+            output_file = output_dir / "latency_breakdown.html"
+            fig.write_html(output_file, include_plotlyjs="cdn")
+            return str(output_file)
+        except Exception as e:  # noqa: BLE001 - a failed chart must not abort the run
+            logging.warning(f"Failed to generate latency breakdown chart: {e}")
+            return None
+
+    def kpi_catalog(self) -> list[dict[str, Any]]:
+        """Return the GuideLLM KPI catalog."""
+        return self.kpi_handler.get_catalog()
+
+    def compute_kpis(self, model: UnifiedRunModel) -> list[dict[str, Any]]:
+        """Compute KPI values from the unified model."""
+        return self.kpi_handler.compute_kpis(model)
+
+    def build_ai_eval_payload(self, model: UnifiedRunModel) -> dict[str, Any]:
+        """Build AI evaluation payload from the unified model.
+
+        Returns:
+            Dict with ``schema_version``, ``run_id`` (``model.base_directory``),
+            aggregate ``metrics`` and one ``benchmarks`` entry per GuideLLM
+            record that has benchmark data. ``min_ttft_median`` ignores values
+            that are not greater than zero; aggregates default to ``0.0`` when
+            there is nothing to aggregate.
+        """
+        benchmarks = [
+            {
+                "strategy": r.distinguishing_labels.get("strategy", "unknown"),
+                "concurrency": r.distinguishing_labels.get("concurrency", 1.0),
+                "request_rate": r.metrics.get("request_rate", 0.0),
+                "tokens_per_second": r.metrics.get("tokens_per_second", 0.0),
+                "ttft_median": r.metrics.get("ttft_median", 0.0),
+                "itl_median": r.metrics.get("itl_median", 0.0),
+                "request_latency_p95": r.metrics.get("request_latency_p95", 0.0),
+            }
+            for r in model.unified_result_records
+            if r.run_identity.get("guidellm") and not r.metrics.get("no_benchmarks_found")
+        ]
+
+        return {
+            "schema_version": "1",
+            "run_id": model.base_directory,
+            "metrics": {
+                "record_count": len(model.unified_result_records),
+                "benchmark_count": len(benchmarks),
+                "strategies": [b["strategy"] for b in benchmarks],
+                "max_request_rate": max((b["request_rate"] for b in benchmarks), default=0.0),
+                "max_tokens_per_second": max(
+                    (b["tokens_per_second"] for b in benchmarks), default=0.0
+                ),
+                "min_ttft_median": min(
+                    (b["ttft_median"] for b in benchmarks if b["ttft_median"] > 0), default=0.0
+                ),
+            },
+            "benchmarks": benchmarks,
+        }
+
+
+def get_plugin() -> PostProcessingPlugin:
+    """Create a new GuideLLM plugin object and hand it to the caller."""
+    plugin = GuideLLMPlugin()
+    return plugin
diff --git a/projects/core/library/ci.py b/projects/core/library/ci.py
index 6425a806..fcf0707b 100644
--- a/projects/core/library/ci.py
+++ b/projects/core/library/ci.py
@@ -11,6 +11,8 @@
import sys
import traceback
+import click
+
from projects.core.dsl import toolbox as dsl_toolbox
from projects.core.dsl.runtime import TaskExecutionError
from projects.core.library import env
@@ -18,6 +20,22 @@
logger = logging.getLogger(__name__)
+class HelpfulGroup(click.Group):
+    """
+    A Click group that automatically shows help when a command is not found.
+
+    For an unknown subcommand the error and the group's help text are written
+    to stderr and the process exits with status 2.
+    """
+
+    def get_command(self, ctx, cmd_name):
+        """Return the subcommand named ``cmd_name``, or show help and exit.
+
+        Args:
+            ctx: Click context of this group.
+            cmd_name: Subcommand name as typed by the user.
+
+        Returns:
+            The matching command. ``None`` is returned only while
+            ``ctx.resilient_parsing`` is set; otherwise an unknown name ends in
+            ``ctx.exit(2)``.
+        """
+        command = super().get_command(ctx, cmd_name)
+        if command is not None:
+            return command
+
+        # Click sets resilient_parsing for shell completion: a failed lookup
+        # must stay silent there and must not terminate the process.
+        if ctx.resilient_parsing:
+            return None
+
+        # Command not found - show help
+        click.echo(f"Error: No such command '{cmd_name}'.\n", err=True)
+        click.echo(ctx.get_help(), err=True)
+        ctx.exit(2)
+
+
def handle_ci_exception(e: Exception) -> None:
"""
Handle CI exceptions with comprehensive logging and failure file creation.
diff --git a/projects/core/library/replot.py b/projects/core/library/replot.py
new file mode 100644
index 00000000..ecd1e869
--- /dev/null
+++ b/projects/core/library/replot.py
@@ -0,0 +1,103 @@
+"""
+Shared "replot" CLI for FORGE project orchestration.
+
+Registers a click subcommand for replotting artifacts and visualizations.
+"""
+
+from __future__ import annotations
+
+import logging
+from pathlib import Path
+
+import click
+
+from projects.caliper.orchestration.replot import run_replot_from_orchestration_config
+from projects.core.library import ci as ci_lib
+from projects.core.library import config, env
+
+logger = logging.getLogger(__name__)
+
+
+def run_replot(*, artifact_directory: Path | None):
+    """Gather the replot settings from the project configuration and run the replot.
+
+    Args:
+        artifact_directory: Artifact directory handed to the replot operation;
+            ``env.ARTIFACT_DIR`` is used when ``None``.
+
+    Returns:
+        Whatever ``run_replot_from_orchestration_config`` returns.
+
+    Raises:
+        ValueError: If ``caliper.replot.url`` is not set, MLflow export is not
+            enabled, or the vault name / MLflow secret is missing.
+    """
+
+    def read_setting(key, default):
+        """Read one configuration key without printing or warning."""
+        return config.project.get_config(key, default, print=False, warn=False)
+
+    target_directory = env.ARTIFACT_DIR if artifact_directory is None else artifact_directory
+
+    # The URL is mandatory; it normally comes from the /replot.url directive.
+    url = read_setting("caliper.replot.url", None)
+    if not url:
+        raise ValueError(
+            "caliper.replot.url is not configured. Use /replot.url directive or set in config."
+        )
+
+    keep_download = read_setting("caliper.replot.keep", False)
+
+    # Authentication settings are shared with the MLflow export backend.
+    mlflow_settings = read_setting("caliper.export", {}).get("backend", {}).get("mlflow", {})
+    if not mlflow_settings.get("enabled", False):
+        raise ValueError(
+            "MLflow export is not enabled in configuration. Cannot determine authentication settings."
+        )
+
+    vault_settings = mlflow_settings.get("secrets", {}).get("vault", {})
+    name = vault_settings.get("name")
+    mlflow_secret = vault_settings.get("mlflow_secret")
+    if not (name and mlflow_secret):
+        raise ValueError(
+            "MLflow vault configuration missing. Check caliper.export.backend.mlflow.secrets.vault settings."
+        )
+
+    # The AWS secret is optional, so a missing entry is passed on as None.
+    aws_secret = vault_settings.get("aws_secret")
+    postprocess_settings = read_setting("caliper.postprocess", {})
+
+    return run_replot_from_orchestration_config(
+        replot_url=url,
+        artifact_directory=target_directory,
+        vault_name=name,
+        vault_mlflow_secret=mlflow_secret,
+        vault_aws_secret=aws_secret,
+        keep_replot_dir=keep_download,
+        postprocess_config=postprocess_settings,
+    )
+
+
+@click.command("replot")
+@click.option(
+    "--artifact-directory",
+    "artifact_directory",
+    type=click.Path(path_type=Path, exists=False, file_okay=True, dir_okay=True),
+    default=None,
+    help="Artifact root directory to replot from (defaults to ARTIFACT_DIR).",
+)
+@click.pass_context
+@ci_lib.safe_ci_command
+def replot_command(_ctx, artifact_directory: Path | None):
+    """Replot artifacts and visualizations from a remote URL."""
+    # The docstring above doubles as the Click help text, so it is kept short.
+    # Returns 0 when the reported replot status is "success" and 1 otherwise.
+
+    status = run_replot(artifact_directory=artifact_directory)
+
+    # yaml is only needed to render the status, so it is imported at this point.
+    import yaml
+
+    logger.info("Replot status:")
+    logger.info(yaml.dump(status, indent=2))
+
+    outcome = status.get("replot", {}).get("status", "unknown")
+    return 0 if outcome == "success" else 1
diff --git a/projects/foreign_testing/orchestration/ci.py b/projects/foreign_testing/orchestration/ci.py
index a8aa1353..a399f630 100755
--- a/projects/foreign_testing/orchestration/ci.py
+++ b/projects/foreign_testing/orchestration/ci.py
@@ -15,7 +15,7 @@
from projects.core.library import ci as ci_lib
-@click.group()
+@click.group(cls=ci_lib.HelpfulGroup)
@click.pass_context
@ci_lib.safe_ci_function
def main(ctx):
diff --git a/projects/fournos_launcher/orchestration/ci.py b/projects/fournos_launcher/orchestration/ci.py
index 86e833ee..a765bc77 100755
--- a/projects/fournos_launcher/orchestration/ci.py
+++ b/projects/fournos_launcher/orchestration/ci.py
@@ -51,7 +51,7 @@ def _set_job_owner_from_pull_request():
logger.info(f"Set job owner from pull request: {user_login}")
-@click.group()
+@click.group(cls=ci_lib.HelpfulGroup)
@click.pass_context
@ci_lib.safe_ci_function
def main(ctx):
diff --git a/projects/fournos_launcher/orchestration/pr_args.py b/projects/fournos_launcher/orchestration/pr_args.py
index a4485c55..b0003ace 100644
--- a/projects/fournos_launcher/orchestration/pr_args.py
+++ b/projects/fournos_launcher/orchestration/pr_args.py
@@ -54,6 +54,10 @@ def get_supported_fournos_directives() -> dict[str, str]:
Example: /parallel 1 gpu-basic cpu-intensive
/parallel 2 memory-heavy network-test
Effect: Sets fournos_launcher.parallel_jobs[idx] = [preset1, preset2, ...]""",
+ "/replot.url": """Set URL for downloading artifacts to replot.
+ Format: /replot.url URL
+ Example: /replot.url s3://bucket/path/to/artifacts
+ Effect: Sets caliper.replot.url in configuration.""",
"/help": """Show all supported FOURNOS directives.
Format: /help
Effect: Logs available directive information.""",
@@ -185,6 +189,29 @@ def handle_gpu_directive(line: str) -> dict[str, str]:
return {"fournos.job.hardware.gpu_type": gpu_type, "fournos.job.hardware.gpu_count": gpu_count}
+def handle_replot_url_directive(line: str) -> dict[str, str]:
+    """
+    Turn a ``/replot.url URL`` directive line into a configuration update.
+
+    A leading ``/replot.url`` is removed and the remainder is stripped of
+    surrounding whitespace to obtain the URL.
+
+    Args:
+        line: The directive line
+
+    Returns:
+        Dictionary mapping ``caliper.replot.url`` to the URL
+
+    Raises:
+        ValueError: If nothing but whitespace follows the directive
+    """
+    directive = "/replot.url"
+    remainder = line[len(directive):] if line.startswith(directive) else line
+    url = remainder.strip()
+
+    if url:
+        return {"caliper.replot.url": url}
+
+    raise ValueError(f"Invalid /replot.url directive: URL cannot be empty in '{line}'")
+
+
def handle_help_directive(line: str) -> dict[str, str]:
"""Handle /help directive for FOURNOS directives."""
# Create help directive handler using the factory
@@ -267,6 +294,7 @@ def get_fournos_directive_handlers() -> dict[str, callable]:
"/pipeline": handle_pipeline_directive,
"/gpu": handle_gpu_directive,
"/parallel": handle_parallel_directive,
+ "/replot.url": handle_replot_url_directive,
"/help": handle_help_directive,
}
diff --git a/projects/fournos_launcher/orchestration/submit.py b/projects/fournos_launcher/orchestration/submit.py
index 2611f343..4a1bc671 100644
--- a/projects/fournos_launcher/orchestration/submit.py
+++ b/projects/fournos_launcher/orchestration/submit.py
@@ -7,6 +7,7 @@
import traceback
from datetime import datetime
+from projects.core.dsl.utils.k8s import sanitize_k8s_name
from projects.core.library import config, env, run, vault
from projects.core.library.run_parallel import Parallel
from projects.fournos_launcher.orchestration import job_management, pr_args
@@ -159,6 +160,7 @@ def submit_job():
# Generate timestamp for parallel job names (shared across all parallel jobs)
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
raw_name = f"forge-{project_name}-{timestamp}"
+ raw_name = sanitize_k8s_name(raw_name)
# Track failures and job names across parallel jobs
failure_lock = threading.Lock()
@@ -179,7 +181,7 @@ def submit_parallel_job(job_index, job_args_list):
parallel_display_name = f"{project_name} {args_str} (job {job_index})".strip()
# Create unique job name with timestamp and index
- unique_job_name = f"{raw_name}-{job_index}"
+ unique_job_name = sanitize_k8s_name(f"{raw_name}-{job_index}")
try:
# Track the job name for cleanup (job gets submitted even if waiting fails)
@@ -240,7 +242,7 @@ def submit_parallel_job(job_index, job_args_list):
# Generate unique job name for single job
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
- single_job_name = f"forge-{project_name}-{timestamp}"
+ single_job_name = sanitize_k8s_name(f"forge-{project_name}-{timestamp}")
try:
submit_and_wait(
diff --git a/projects/fournos_launcher/toolbox/submit_and_wait/main.py b/projects/fournos_launcher/toolbox/submit_and_wait/main.py
index 6693ef42..818fa126 100755
--- a/projects/fournos_launcher/toolbox/submit_and_wait/main.py
+++ b/projects/fournos_launcher/toolbox/submit_and_wait/main.py
@@ -18,7 +18,7 @@
task,
template,
)
-from projects.core.dsl.utils.k8s import sanitize_k8s_name
+from projects.core.dsl.utils.k8s import is_valid_k8s_name, sanitize_k8s_name
from projects.core.library import env as env_mod
logger = logging.getLogger(__name__)
@@ -133,14 +133,19 @@ def generate_job_name(args, ctx):
"""Generate job name if not provided and ensure K8s compatibility"""
if args.job_name:
- # Sanitize user-provided job name
- raw_name = args.job_name
+ # Validate that user-provided job name is already normalized
+ if not is_valid_k8s_name(args.job_name):
+ normalized_name = sanitize_k8s_name(args.job_name)
+ raise ValueError(
+ f"Job name '{args.job_name}' is not valid for Kubernetes. "
+ f"Please use the normalized name: '{normalized_name}'"
+ )
+ ctx.final_job_name = args.job_name
else:
# Generate and sanitize auto job name
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
raw_name = f"forge-{args.project}-{timestamp}"
-
- ctx.final_job_name = sanitize_k8s_name(raw_name)
+ ctx.final_job_name = sanitize_k8s_name(raw_name)
return f"Job name: {ctx.final_job_name}"
diff --git a/projects/jump_ci/orchestration/ci.py b/projects/jump_ci/orchestration/ci.py
index a158a764..0a6b8f88 100755
--- a/projects/jump_ci/orchestration/ci.py
+++ b/projects/jump_ci/orchestration/ci.py
@@ -10,6 +10,8 @@
import click
+from projects.core.library import ci as ci_lib
+
# Add the testing directory to path for imports
testing_dir = Path(__file__).parent.parent / "testing"
if str(testing_dir) not in sys.path:
@@ -39,7 +41,7 @@ def run_and_catch(phase, fct, *args, **kwargs):
sys.exit(1)
-@click.group()
+@click.group(cls=ci_lib.HelpfulGroup)
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
@click.pass_context
def main(ctx, verbose):
diff --git a/projects/llm_d/orchestration/ci.py b/projects/llm_d/orchestration/ci.py
index 7623510f..0dd4c6bd 100755
--- a/projects/llm_d/orchestration/ci.py
+++ b/projects/llm_d/orchestration/ci.py
@@ -13,7 +13,7 @@
from projects.core.library import ci as ci_lib
-@click.group()
+@click.group(cls=ci_lib.HelpfulGroup)
@click.pass_context
@ci_lib.safe_ci_function
def main(ctx):
diff --git a/projects/llm_d_legacy/orchestration/ci.py b/projects/llm_d_legacy/orchestration/ci.py
index cbd73f8b..a9f3501f 100755
--- a/projects/llm_d_legacy/orchestration/ci.py
+++ b/projects/llm_d_legacy/orchestration/ci.py
@@ -16,6 +16,7 @@
from projects.core.library import config as forge_config
from projects.core.library import ci as ci_lib
from projects.core.library.export import caliper_export_command
+from projects.core.library.replot import replot_command
from projects.legacy.library import config
from projects.legacy.library import env as legacy_env
from projects.caliper.orchestration.export import run_from_orchestration_config
@@ -44,12 +45,13 @@ def log(message: str, level: str = "info"):
click.echo(f"{icon} [{project_name}] {message}")
-@click.group()
+@click.group(cls=ci_lib.HelpfulGroup)
@click.pass_context
@ci_lib.safe_ci_function
def main(ctx):
"""Jump CI Project CI Operations for TOPSAIL-NG."""
ctx.ensure_object(dict)
+ init()
inited = False
def init():
@@ -80,13 +82,13 @@ def test(ctx):
"""Test phase - Trigger the project's test method."""
log("Starting test phase...")
- init()
failed = test_llmd.test()
sys.exit(1 if failed else 0)
main.add_command(caliper_export_command)
+main.add_command(replot_command)
def resolve_hardware_request(hardware_spec: dict):
"""
@@ -108,9 +110,6 @@ def resolve_hardware_request(hardware_spec: dict):
# - Handle different hardware profiles (GPU, CPU, memory requirements)
# - Example: return {"gpu": {"type": "nvidia-tesla-v100", "count": 1}, "memory": "32Gi"}
- hardware_spec["gpuType"] = "h200"
- hardware_spec["gpuCount"] = 4
-
return hardware_spec
def list_vaults():
diff --git a/projects/llm_d_legacy/testing/config.yaml b/projects/llm_d_legacy/testing/config.yaml
index 1bb657ad..2a408cb7 100644
--- a/projects/llm_d_legacy/testing/config.yaml
+++ b/projects/llm_d_legacy/testing/config.yaml
@@ -343,3 +343,34 @@ caliper:
tags: {}
parameters: {}
metrics: {}
+
+ replot:
+ url: null # Set via /replot.url directive
+ keep: false # Set to true to keep download directory after replotting
+
+ postprocess:
+ enabled: true
+ artifacts_dir: null
+ plugin_module: projects.caliper.postprocess.guidellm.plugin
+ postprocess_config: null
+ parse:
+ enabled: true
+ no_cache: false
+ visualize:
+ enabled: true
+ output_dir: null
+ reports: [benchmark_summary, latency_breakdown]
+ report_group: null
+
+ kpi:
+ enabled: false
+ generate:
+ enabled: false
+ output: kpis.jsonl
+ export:
+ enabled: false
+
+ analyze:
+ enabled: false
+ baseline: null
+ output: kpi_analyze.json
diff --git a/projects/skeleton/orchestration/ci.py b/projects/skeleton/orchestration/ci.py
index 9c152242..f4a6d04d 100755
--- a/projects/skeleton/orchestration/ci.py
+++ b/projects/skeleton/orchestration/ci.py
@@ -19,7 +19,7 @@
from projects.core.library.export import caliper_export_command
-@click.group()
+@click.group(cls=ci_lib.HelpfulGroup)
@click.pass_context
@ci_lib.safe_ci_function
def main(ctx):
diff --git a/projects/skeleton/orchestration/config.yaml b/projects/skeleton/orchestration/config.yaml
index fc99b7c9..1a8230c4 100644
--- a/projects/skeleton/orchestration/config.yaml
+++ b/projects/skeleton/orchestration/config.yaml
@@ -39,6 +39,10 @@ caliper:
parameters: {}
metrics: {}
+ replot:
+ url: null # Set via /replot.url directive
+ keep: false # Set to true to keep download directory after replotting
+
postprocess:
enabled: true
artifacts_dir: null
diff --git a/projects/skeleton/postprocess/default/parsing/__init__.py b/projects/skeleton/postprocess/default/parsing/__init__.py
new file mode 100644
index 00000000..790f3752
--- /dev/null
+++ b/projects/skeleton/postprocess/default/parsing/__init__.py
@@ -0,0 +1,6 @@
+"""Parsing package for Skeleton Caliper plugin."""
+
+from .kpis import SkeletonKpiHandler
+from .parsers import SkeletonParser
+
+__all__ = ["SkeletonParser", "SkeletonKpiHandler"]
diff --git a/projects/skeleton/postprocess/default/parsing/kpis.py b/projects/skeleton/postprocess/default/parsing/kpis.py
new file mode 100644
index 00000000..9585f3b1
--- /dev/null
+++ b/projects/skeleton/postprocess/default/parsing/kpis.py
@@ -0,0 +1,103 @@
+"""KPI definitions and computation for Skeleton Caliper plugin."""
+
+from __future__ import annotations
+
+from datetime import UTC, datetime
+from typing import Any
+
+from projects.caliper.engine.model import UnifiedRunModel
+
+
+class SkeletonKpiHandler:
+ """Handles KPI catalog and computation for skeleton project."""
+
+ @staticmethod
+ def get_catalog() -> list[dict[str, Any]]:
+ """
+ Return the KPI catalog for skeleton metrics.
+
+ Returns:
+ List of KPI definitions
+ """
+ return [
+ {
+ "kpi_id": "skeleton_throughput_rps",
+ "name": "Skeleton throughput",
+ "unit": "req/s",
+ "higher_is_better": True,
+ },
+ {
+ "kpi_id": "skeleton_latency_ms",
+ "name": "Skeleton latency",
+ "unit": "ms",
+ "higher_is_better": False,
+ },
+ ]
+
+ @staticmethod
+ def compute_kpis(model: UnifiedRunModel) -> list[dict[str, Any]]:
+ """
+ Compute KPI values from the unified model.
+
+ Args:
+ model: Unified model containing parsed test results
+
+ Returns:
+ List of KPI records
+ """
+ ts = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
+ out: list[dict[str, Any]] = []
+
+ for r in model.unified_result_records:
+ tp_raw = r.metrics.get("throughput", 0)
+ lat_raw = r.metrics.get("latency_ms", 0)
+
+ # Convert throughput to float
+ try:
+ tp = float(tp_raw)
+ except (TypeError, ValueError):
+ tp = 0.0
+
+ # Convert latency to float
+ try:
+ lat = float(lat_raw)
+ except (TypeError, ValueError):
+ lat = 0.0
+
+ base_labels = {**r.distinguishing_labels}
+
+ # Add throughput KPI
+ out.append(
+ {
+ "schema_version": "1",
+ "kpi_id": "skeleton_throughput_rps",
+ "value": tp,
+ "unit": "req/s",
+ "run_id": r.test_base_path,
+ "timestamp": ts,
+ "labels": {**base_labels, "higher_is_better": True},
+ "source": {
+ "test_base_path": r.test_base_path,
+ "plugin_module": model.plugin_module,
+ },
+ }
+ )
+
+ # Add latency KPI
+ out.append(
+ {
+ "schema_version": "1",
+ "kpi_id": "skeleton_latency_ms",
+ "value": lat,
+ "unit": "ms",
+ "run_id": r.test_base_path,
+ "timestamp": ts,
+ "labels": {**base_labels, "higher_is_better": False},
+ "source": {
+ "test_base_path": r.test_base_path,
+ "plugin_module": model.plugin_module,
+ },
+ }
+ )
+
+ return out
diff --git a/projects/skeleton/postprocess/default/parsing/parsers.py b/projects/skeleton/postprocess/default/parsing/parsers.py
new file mode 100644
index 00000000..2be3df21
--- /dev/null
+++ b/projects/skeleton/postprocess/default/parsing/parsers.py
@@ -0,0 +1,85 @@
+"""File parsers for Skeleton Caliper plugin."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any
+
+from projects.caliper.engine.model import (
+ ParseResult,
+ TestBaseNode,
+ UnifiedResultRecord,
+)
+
+
+def _labels_from_node(node: TestBaseNode) -> dict[str, Any]:
+    """Return a copy of the node's labels (a nested ``labels`` dict is preferred)."""
+    raw = node.labels
+    # Guard the lookup so a non-mapping ``labels`` reaches the fallback below
+    # instead of raising AttributeError on ``.get``.
+    inner = raw.get("labels") if hasattr(raw, "get") else None
+    if isinstance(inner, dict):
+        return dict(inner)
+    return dict(raw) if isinstance(raw, dict) else {"facet": "default"}
+
+
+class SkeletonParser:
+ """Parser for skeleton project test artifacts."""
+
+    def parse_metrics_json(self, file_path: Path) -> tuple[dict[str, Any], list[str]]:
+        """
+        Parse a metrics.json file, turning unusable content into warnings.
+
+        Returns:
+            Tuple of (metrics dict, warnings list)
+        """
+        warnings: list[str] = []
+        try:
+            metrics = json.loads(file_path.read_text(encoding="utf-8"))
+        except (UnicodeDecodeError, json.JSONDecodeError) as e:
+            # Bytes that are not valid UTF-8 are as unparseable as bad JSON.
+            warnings.append(f"Malformed JSON {file_path}: {e}")
+            return {"_parse_error": True}, warnings
+        if not isinstance(metrics, dict):
+            warnings.append(f"{file_path}: metrics.json must be a JSON object")
+            return {}, warnings
+        return metrics, warnings
+
+ def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult:
+ """
+ Parse test nodes containing metrics.json files.
+
+ Args:
+ base_dir: Base directory for the test run
+ nodes: List of test nodes to parse
+
+ Returns:
+ ParseResult with unified records and warnings
+ """
+ records: list[UnifiedResultRecord] = []
+ warnings: list[str] = []
+
+ for node in nodes:
+ metrics: dict[str, Any] = {}
+ for p in node.artifact_paths:
+ if p.name != "metrics.json":
+ continue
+
+ node_metrics, node_warnings = self.parse_metrics_json(p)
+ metrics = node_metrics
+ warnings.extend(node_warnings)
+ break
+
+ labels = _labels_from_node(node)
+ records.append(
+ UnifiedResultRecord(
+ test_base_path=str(node.directory.relative_to(base_dir.resolve())),
+ distinguishing_labels=labels,
+ metrics=dict(metrics) if metrics else {"throughput": 0.0},
+ run_identity={"skeleton_sample": True},
+ parse_notes=[],
+ )
+ )
+
+ return ParseResult(records=records, warnings=warnings)
diff --git a/projects/skeleton/postprocess/default/plotting/__init__.py b/projects/skeleton/postprocess/default/plotting/__init__.py
new file mode 100644
index 00000000..4a16f3bd
--- /dev/null
+++ b/projects/skeleton/postprocess/default/plotting/__init__.py
@@ -0,0 +1,6 @@
+"""Plotting package for Skeleton Caliper plugin."""
+
+from .summary_table import SummaryTablePlot
+from .throughput_chart import ThroughputChartPlot
+
+__all__ = ["SummaryTablePlot", "ThroughputChartPlot"]
diff --git a/projects/skeleton/postprocess/default/plotting/summary_table.py b/projects/skeleton/postprocess/default/plotting/summary_table.py
new file mode 100644
index 00000000..8e19ef2a
--- /dev/null
+++ b/projects/skeleton/postprocess/default/plotting/summary_table.py
@@ -0,0 +1,54 @@
+"""Summary table plot for Skeleton Caliper plugin."""
+
+from __future__ import annotations
+
+import html as html_lib
+from pathlib import Path
+
+from projects.caliper.engine.model import UnifiedRunModel
+
+
+class SummaryTablePlot:
+ """Generates an HTML summary table of test results."""
+
+ @staticmethod
+ def generate(model: UnifiedRunModel, output_dir: Path) -> str:
+ """
+ Generate a summary table HTML report.
+
+ Args:
+ model: Unified model containing parsed test results
+ output_dir: Directory to write the output file
+
+ Returns:
+ Path to the generated HTML file
+ """
+ rows = []
+ for r in model.unified_result_records:
+ scenario = r.distinguishing_labels.get("scenario", "")
+ tp = r.metrics.get("throughput", "")
+ lat = r.metrics.get("latency_ms", "")
+ rows.append(
+ "
"
+ f"| {html_lib.escape(str(r.test_base_path))} | "
+ f"{html_lib.escape(str(scenario))} | "
+ f"{html_lib.escape(str(tp))} | "
+ f"{html_lib.escape(str(lat))} | "
+ "
"
+ )
+
+ table_html = (
+ ""
+ "Skeleton sample — summary"
+ ""
+ "Skeleton default plugin — summary
"
+ ""
+ "| test_base_path | scenario | throughput | latency_ms | "
+ "
" + "".join(rows) + "
"
+ )
+
+ output_file = output_dir / "summary_table.html"
+ output_file.write_text(table_html, encoding="utf-8")
+ return str(output_file)
diff --git a/projects/skeleton/postprocess/default/plotting/throughput_chart.py b/projects/skeleton/postprocess/default/plotting/throughput_chart.py
new file mode 100644
index 00000000..26acb524
--- /dev/null
+++ b/projects/skeleton/postprocess/default/plotting/throughput_chart.py
@@ -0,0 +1,51 @@
+"""Throughput chart plot for Skeleton Caliper plugin."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import plotly.graph_objects as go
+
+from projects.caliper.engine.model import UnifiedRunModel
+
+
+class ThroughputChartPlot:
+ """Generates a Plotly bar chart of throughput by scenario."""
+
+ @staticmethod
+ def generate(model: UnifiedRunModel, output_dir: Path) -> str:
+ """
+ Generate a throughput chart HTML report.
+
+ Args:
+ model: Unified model containing parsed test results
+ output_dir: Directory to write the output file
+
+ Returns:
+ Path to the generated HTML file
+ """
+ xs: list[str] = []
+ ys: list[float] = []
+
+ for r in model.unified_result_records:
+ label = str(r.distinguishing_labels.get("scenario") or r.test_base_path)
+ raw = r.metrics.get("throughput", 0)
+
+ try:
+ y = float(raw)
+ except (TypeError, ValueError):
+ y = 0.0
+
+ xs.append(label)
+ ys.append(y)
+
+ fig = go.Figure(data=[go.Bar(x=xs, y=ys)])
+ fig.update_layout(
+ title="Throughput by scenario",
+ xaxis_title="Scenario",
+ yaxis_title="Throughput",
+ )
+
+ output_file = output_dir / "throughput_chart.html"
+ fig.write_html(output_file, include_plotlyjs="cdn")
+ return str(output_file)
diff --git a/projects/skeleton/postprocess/default/plugin.py b/projects/skeleton/postprocess/default/plugin.py
index 254c05c2..d83473ab 100644
--- a/projects/skeleton/postprocess/default/plugin.py
+++ b/projects/skeleton/postprocess/default/plugin.py
@@ -2,8 +2,6 @@
from __future__ import annotations
-import json
-from datetime import UTC, datetime
from pathlib import Path
from typing import Any
@@ -11,19 +9,11 @@
ParseResult,
PostProcessingPlugin,
TestBaseNode,
- UnifiedResultRecord,
UnifiedRunModel,
)
-
-def _labels_from_node(node: TestBaseNode) -> dict[str, Any]:
- raw = node.labels
- inner = raw.get("labels")
- if isinstance(inner, dict):
- return dict(inner)
- if isinstance(raw, dict):
- return dict(raw)
- return {"facet": "default"}
+from .parsing import SkeletonKpiHandler, SkeletonParser
+from .plotting import SummaryTablePlot, ThroughputChartPlot
class SkeletonDefaultPlugin(PostProcessingPlugin):
@@ -36,34 +26,17 @@ class SkeletonDefaultPlugin(PostProcessingPlugin):
* ``throughput_chart`` — bar chart of ``throughput`` when present.
"""
+    def __init__(self) -> None:
+        super().__init__()  # do not bypass PostProcessingPlugin's own initializer
+        self.parser, self.kpi_handler = SkeletonParser(), SkeletonKpiHandler()
+        self.plots = {  # report id -> class exposing generate(model, output_dir)
+            "summary_table": SummaryTablePlot,
+            "throughput_chart": ThroughputChartPlot,
+        }
+
def parse(self, base_dir: Path, nodes: list[TestBaseNode]) -> ParseResult:
- records: list[UnifiedResultRecord] = []
- warnings: list[str] = []
- for node in nodes:
- metrics: dict[str, Any] = {}
- for p in node.artifact_paths:
- if p.name != "metrics.json":
- continue
- try:
- metrics = json.loads(p.read_text(encoding="utf-8"))
- if not isinstance(metrics, dict):
- warnings.append(f"{p}: metrics.json must be a JSON object")
- metrics = {}
- except json.JSONDecodeError as e:
- warnings.append(f"Malformed JSON {p}: {e}")
- metrics = {"_parse_error": True}
- break
- labels = _labels_from_node(node)
- records.append(
- UnifiedResultRecord(
- test_base_path=str(node.directory.relative_to(base_dir.resolve())),
- distinguishing_labels=labels,
- metrics=dict(metrics) if metrics else {"throughput": 0.0},
- run_identity={"skeleton_sample": True},
- parse_notes=[],
- )
- )
- return ParseResult(records=records, warnings=warnings)
+ """Parse test nodes using the skeleton parser."""
+ return self.parser.parse(base_dir, nodes)
def visualize(
self,
@@ -73,133 +46,29 @@ def visualize(
group_id: str | None,
visualize_config: dict[str, Any] | None,
) -> list[str]:
- import html as html_lib
-
- import plotly.graph_objects as go
-
+ """Generate visualization reports."""
output_dir.mkdir(parents=True, exist_ok=True)
paths: list[str] = []
wanted = frozenset(report_ids or ())
- if "summary_table" in wanted:
- rows = []
- for r in model.unified_result_records:
- scenario = r.distinguishing_labels.get("scenario", "")
- tp = r.metrics.get("throughput", "")
- lat = r.metrics.get("latency_ms", "")
- rows.append(
- ""
- f"| {html_lib.escape(str(r.test_base_path))} | "
- f"{html_lib.escape(str(scenario))} | "
- f"{html_lib.escape(str(tp))} | "
- f"{html_lib.escape(str(lat))} | "
- "
"
- )
- table_html = (
- ""
- "Skeleton sample — summary"
- ""
- "Skeleton default plugin — summary
"
- ""
- "| test_base_path | scenario | throughput | latency_ms | "
- "
" + "".join(rows) + "
"
- )
- out = output_dir / "summary_table.html"
- out.write_text(table_html, encoding="utf-8")
- paths.append(str(out))
-
- if "throughput_chart" in wanted:
- xs: list[str] = []
- ys: list[float] = []
- for r in model.unified_result_records:
- label = str(r.distinguishing_labels.get("scenario") or r.test_base_path)
- raw = r.metrics.get("throughput", 0)
- try:
- y = float(raw)
- except (TypeError, ValueError):
- y = 0.0
- xs.append(label)
- ys.append(y)
- fig = go.Figure(data=[go.Bar(x=xs, y=ys)])
- fig.update_layout(
- title="Throughput by scenario",
- xaxis_title="Scenario",
- yaxis_title="Throughput",
- )
- out = output_dir / "throughput_chart.html"
- fig.write_html(out, include_plotlyjs="cdn")
- paths.append(str(out))
+        # Iterate the registry, not the frozenset: set order follows string hashing,
+        # so the returned paths would otherwise change order between runs.
+        for report_id, plot_class in self.plots.items():
+            if report_id in wanted:
+                paths.append(plot_class.generate(model, output_dir))
return paths
def kpi_catalog(self) -> list[dict[str, Any]]:
- return [
- {
- "kpi_id": "skeleton_throughput_rps",
- "name": "Skeleton throughput",
- "unit": "req/s",
- "higher_is_better": True,
- },
- {
- "kpi_id": "skeleton_latency_ms",
- "name": "Skeleton latency",
- "unit": "ms",
- "higher_is_better": False,
- },
- ]
+ """Return the KPI catalog."""
+ return self.kpi_handler.get_catalog()
def compute_kpis(self, model: UnifiedRunModel) -> list[dict[str, Any]]:
- ts = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
- out: list[dict[str, Any]] = []
- for r in model.unified_result_records:
- tp_raw = r.metrics.get("throughput", 0)
- lat_raw = r.metrics.get("latency_ms", 0)
- try:
- tp = float(tp_raw)
- except (TypeError, ValueError):
- tp = 0.0
- try:
- lat = float(lat_raw)
- except (TypeError, ValueError):
- lat = 0.0
- base_labels = {
- **r.distinguishing_labels,
- }
- out.append(
- {
- "schema_version": "1",
- "kpi_id": "skeleton_throughput_rps",
- "value": tp,
- "unit": "req/s",
- "run_id": r.test_base_path,
- "timestamp": ts,
- "labels": {**base_labels, "higher_is_better": True},
- "source": {
- "test_base_path": r.test_base_path,
- "plugin_module": model.plugin_module,
- },
- }
- )
- out.append(
- {
- "schema_version": "1",
- "kpi_id": "skeleton_latency_ms",
- "value": lat,
- "unit": "ms",
- "run_id": r.test_base_path,
- "timestamp": ts,
- "labels": {**base_labels, "higher_is_better": False},
- "source": {
- "test_base_path": r.test_base_path,
- "plugin_module": model.plugin_module,
- },
- }
- )
- return out
+ """Compute KPI values from the unified model."""
+ return self.kpi_handler.compute_kpis(model)
def build_ai_eval_payload(self, model: UnifiedRunModel) -> dict[str, Any]:
+ """Build AI evaluation payload from the unified model."""
return {
"schema_version": "1",
"run_id": model.base_directory,
@@ -215,4 +84,5 @@ def build_ai_eval_payload(self, model: UnifiedRunModel) -> dict[str, Any]:
def get_plugin() -> PostProcessingPlugin:
+ """Return the skeleton plugin instance."""
return SkeletonDefaultPlugin()