Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runtime/orchestrator/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Metrics extraction — parse test outputs into structured gate data."""
102 changes: 102 additions & 0 deletions runtime/orchestrator/metrics/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Parse junit XML and JMeter JTL into structured metrics for gate enforcement."""

from __future__ import annotations

import csv
import statistics
import xml.etree.ElementTree as ET
from typing import Any


def _junit_int(attrs: dict[str, str], key: str) -> int:
    """Read a junit count attribute as an int; missing or malformed values count as 0."""
    try:
        return int(attrs.get(key, 0))
    except (TypeError, ValueError):
        return 0


def parse_junit(xml_text: str) -> dict[str, Any]:
    """Extract test counts and pass rate from junit XML.

    Accepts either a single ``<testsuite>`` root or a ``<testsuites>`` wrapper,
    in which case the counts of its direct ``<testsuite>`` children are summed.
    Counts come from the suite attributes (``tests``, ``failures``, ``errors``,
    ``skipped``), not from counting ``<testcase>`` elements.

    Args:
        xml_text: The junit XML document as text. Surrounding whitespace is
            ignored.

    Returns:
        ``{total, passed, failed, errors, skipped, rate}`` where ``failed`` is
        failures + errors and ``rate`` is ``passed / total`` (``0.0`` when
        there are no tests). Returns ``{}`` when the text is not well-formed
        XML.
    """
    try:
        # Strip first: a newline before the XML declaration is a parse error.
        root = ET.fromstring(xml_text.strip())
    except ET.ParseError:
        return {}

    # Some runners wrap their suites in <testsuites> and put the counts on the
    # children only; reading the wrapper's attributes would then yield zeros.
    suites = root.findall("testsuite") if root.tag == "testsuites" else []
    if not suites:
        suites = [root]

    total = sum(_junit_int(suite.attrib, "tests") for suite in suites)
    failures = sum(_junit_int(suite.attrib, "failures") for suite in suites)
    errors = sum(_junit_int(suite.attrib, "errors") for suite in suites)
    skipped = sum(_junit_int(suite.attrib, "skipped") for suite in suites)

    failed = failures + errors
    # Clamp so inconsistent attributes can never produce a negative pass rate.
    passed = max(total - failed - skipped, 0)

    return {
        "total": total,
        "passed": passed,
        "failed": failed,
        "errors": errors,
        "skipped": skipped,
        "rate": passed / total if total > 0 else 0.0,
    }


def parse_jmeter_jtl(csv_text: str) -> dict[str, Any]:
    """Extract sample counts, latency stats, and success rate from JMeter JTL.

    The text is parsed as CSV with a header row; the ``elapsed`` and
    ``success`` columns are located by name, so column order does not matter.
    Rows that are too short or whose ``elapsed`` value is not an integer are
    skipped. A sample counts as a failure unless its ``success`` field is
    ``true`` (case-insensitive).

    Args:
        csv_text: JTL content in CSV form, including the header line.

    Returns:
        ``{samples, failures, avg_ms, p95_ms, min_ms, max_ms, rate}``.
        ``p95_ms`` uses the nearest-rank method. All values are zero when
        there are no usable data rows. Returns ``{}`` when the header lacks an
        ``elapsed`` or ``success`` column or the CSV cannot be parsed.
    """
    no_samples = {"samples": 0, "failures": 0, "avg_ms": 0, "p95_ms": 0, "min_ms": 0, "max_ms": 0, "rate": 0.0}

    # Use the csv module rather than str.split: quoted fields such as
    # responseMessage may contain commas, which would shift later columns.
    try:
        rows = [
            row
            for row in csv.reader(csv_text.splitlines(keepends=True), skipinitialspace=True)
            if any(cell.strip() for cell in row)
        ]
    except csv.Error:
        return {}
    if len(rows) < 2:
        return no_samples

    header = [name.lstrip("\ufeff").strip() for name in rows[0]]
    try:
        elapsed_idx = header.index("elapsed")
        success_idx = header.index("success")
    except ValueError:
        return {}

    last_needed = max(elapsed_idx, success_idx)
    elapsed_values = []
    failures = 0
    for fields in rows[1:]:
        if len(fields) <= last_needed:
            continue
        try:
            elapsed = int(fields[elapsed_idx])
        except ValueError:
            continue
        elapsed_values.append(elapsed)
        if fields[success_idx].strip().lower() != "true":
            failures += 1

    if not elapsed_values:
        return no_samples

    elapsed_values.sort()
    n = len(elapsed_values)
    # Nearest-rank p95: rank = ceil(0.95 * n), done in integer arithmetic to
    # avoid float rounding; the index is rank - 1.
    p95_rank = (95 * n + 99) // 100

    return {
        "samples": n,
        "failures": failures,
        "avg_ms": int(statistics.mean(elapsed_values)),
        "p95_ms": elapsed_values[p95_rank - 1],
        "min_ms": elapsed_values[0],
        "max_ms": elapsed_values[-1],
        "rate": (n - failures) / n,
    }


def extract_metrics(outcome: dict[str, Any]) -> dict[str, Any]:
    """Auto-detect format and extract metrics from node execution outcome.

    An explicit ``outcome["kind"]`` of ``"junit"`` or ``"jmeter"`` selects the
    parser directly. Otherwise the format is sniffed from the output: junit
    XML (contains ``'<testsuite'``) or JMeter JTL (contains
    ``'timeStamp,elapsed'``).

    Args:
        outcome: Mapping with an optional ``stdout`` entry (the captured
            output) and an optional ``kind`` hint.

    Returns:
        The selected parser's metrics dict, or ``{}`` when ``stdout`` is
        missing, ``None``, blank, or in an unrecognized format.
    """
    raw = outcome.get("stdout")
    # Treat None as "no output" instead of stringifying it to "None".
    stdout = "" if raw is None else str(raw)
    if not stdout.strip():
        return {}

    # An explicit kind takes precedence over sniffing, so e.g. JTL output that
    # happens to contain '<testsuite' is not sent to the junit parser.
    kind = outcome.get("kind", "")
    if kind == "junit":
        return parse_junit(stdout)
    if kind == "jmeter":
        return parse_jmeter_jtl(stdout)

    if "<testsuite" in stdout:
        return parse_junit(stdout)
    if "timeStamp,elapsed" in stdout:
        return parse_jmeter_jtl(stdout)

    return {}
18 changes: 9 additions & 9 deletions runtime/orchestrator/workflows/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,21 @@ def _execute_node(self, name: str, kind: str, target: str) -> dict[str, Any]:
inputs={"target": target, "pipeline_step": name},
run_id=f"tc-{int(t0)}",
)
return {
stdout = getattr(outcome, "stdout", "")
result = {
"ok": getattr(outcome, "ok", True),
"stdout": getattr(outcome, "stdout", ""),
"stdout": stdout,
"duration_ms": (time.time() - t0) * 1000,
}
# Extract structured metrics from test outputs for gate enforcement
from runtime.orchestrator.metrics.parser import extract_metrics
result["metrics"] = extract_metrics({"stdout": str(stdout)})
return result
except Exception as exc:
return {"ok": False, "stdout": str(exc), "duration_ms": 0}
return {"ok": False, "stdout": str(exc), "duration_ms": 0, "metrics": {}}

def _check_gates(self, step_name: str, outcome: dict) -> str | None:
"""Check gate conditions after specific steps. Returns block reason or None.

TODO: extract real metrics from outcome['stdout'] (junit XML / JMeter JTL).
Currently uses stub values — gates always pass. See PR #191 review.
"""
# Try to extract structured metrics from outcome
"""Check gate conditions after specific steps. Returns block reason or None."""
metrics = outcome.get("metrics", {}) if isinstance(outcome, dict) else {}

if step_name == "smoke-test":
Expand Down
110 changes: 110 additions & 0 deletions runtime/tests/test_metrics_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""TDD: Metrics parser — extract test results from junit XML and JMeter JTL."""

from __future__ import annotations

import pytest

# junit fixture: suite attributes declare 10 tests with no failures, errors,
# or skips. Only three <testcase> elements are listed; the tests below assert
# against the attribute counts.
JUNIT_XML_PASSING = """<?xml version="1.0"?>
<testsuite name="smoke" tests="10" failures="0" errors="0" skipped="0">
<testcase classname="test_auth" name="test_login" time="0.5"/>
<testcase classname="test_auth" name="test_logout" time="0.3"/>
<testcase classname="test_auth" name="test_register" time="1.2"/>
</testsuite>"""

# junit fixture: 10 tests declared, with 3 failures, 1 error, and 1 skipped.
JUNIT_XML_FAILING = """<?xml version="1.0"?>
<testsuite name="smoke" tests="10" failures="3" errors="1" skipped="1">
<testcase classname="test_auth" name="test_login" time="0.5"/>
<testcase classname="test_auth" name="test_fail1" time="0.3">
<failure message="assert 200 == 404"/>
</testcase>
</testsuite>"""

# junit fixture: a suite that declares zero tests.
JUNIT_XML_EMPTY = """<?xml version="1.0"?>
<testsuite name="smoke" tests="0" failures="0" errors="0" skipped="0">
</testsuite>"""

# JMeter JTL (CSV) fixture: header row plus four samples with elapsed times
# 120, 350, 80, and 2500; only the last one has success=false.
JMETER_JTL_SAMPLE = """timeStamp,elapsed,label,responseCode,responseMessage,success,bytes,grpThreads,allThreads,Latency
1680000000000,120,Login,200,OK,true,1024,1,1,100
1680000001000,350,Search,200,OK,true,2048,3,3,300
1680000002000,80,Logout,200,OK,true,512,1,1,60
1680000003000,2500,HeavyQuery,500,Error,false,0,5,5,2400
"""


class TestJunitParser:
    """Checks ``parse_junit`` against the junit XML fixtures."""

    def test_parse_passing(self):
        """A suite with no failures reports all 10 tests as passed."""
        from runtime.orchestrator.metrics.parser import parse_junit

        metrics = parse_junit(JUNIT_XML_PASSING)

        assert metrics["total"] == 10
        assert metrics["passed"] == 10
        assert metrics["failed"] == 0
        assert metrics["rate"] == 1.0

    def test_parse_failing(self):
        """Failures and errors are combined into ``failed``; skips lower the rate."""
        from runtime.orchestrator.metrics.parser import parse_junit

        metrics = parse_junit(JUNIT_XML_FAILING)

        assert metrics["total"] == 10
        # 3 failures + 1 error
        assert metrics["failed"] == 4
        assert metrics["skipped"] == 1
        # 5 passed out of 10 total
        assert metrics["rate"] == 0.5

    def test_parse_empty(self):
        """A suite declaring zero tests yields zero totals."""
        from runtime.orchestrator.metrics.parser import parse_junit

        metrics = parse_junit(JUNIT_XML_EMPTY)

        assert metrics["total"] == 0
        assert metrics["passed"] == 0

    def test_parse_invalid_xml(self):
        """Text that is not XML produces an empty dict."""
        from runtime.orchestrator.metrics.parser import parse_junit

        metrics = parse_junit("not xml at all")

        assert metrics == {}


class TestJmeterParser:
    """Checks ``parse_jmeter_jtl`` against the JTL fixture with exact expected values."""

    def test_parse_jtl(self):
        """All summary fields match the four-sample fixture exactly."""
        from runtime.orchestrator.metrics.parser import parse_jmeter_jtl

        m = parse_jmeter_jtl(JMETER_JTL_SAMPLE)
        assert m["samples"] == 4
        assert m["failures"] == 1
        # (120 + 350 + 80 + 2500) / 4 = 762.5, truncated to an int
        assert m["avg_ms"] == 762
        assert m["min_ms"] == 80
        assert m["max_ms"] == 2500
        # 3 of 4 samples succeeded
        assert m["rate"] == 0.75

    def test_parse_jtl_percentile(self):
        """With only four samples, p95 is the slowest request."""
        from runtime.orchestrator.metrics.parser import parse_jmeter_jtl

        m = parse_jmeter_jtl(JMETER_JTL_SAMPLE)
        # HeavyQuery (2500 ms) is the slowest request and therefore the p95.
        assert m["p95_ms"] == 2500

    def test_parse_empty_jtl(self):
        """A header with no data rows reports zero samples."""
        from runtime.orchestrator.metrics.parser import parse_jmeter_jtl

        m = parse_jmeter_jtl("timeStamp,elapsed,label\n")
        assert m["samples"] == 0


class TestMetricsExtractor:
    """Checks ``extract_metrics`` format selection and empty-input handling."""

    def test_extract_from_outcome_junit(self):
        """An explicit ``kind`` of junit parses the junit fixture."""
        from runtime.orchestrator.metrics.parser import extract_metrics

        outcome = {"stdout": JUNIT_XML_PASSING, "kind": "junit"}
        result = extract_metrics(outcome)

        assert result["total"] == 10
        assert result["rate"] == 1.0

    def test_extract_from_outcome_auto_detect_junit(self):
        """Without ``kind``, junit XML is detected from the output."""
        from runtime.orchestrator.metrics.parser import extract_metrics

        outcome = {"stdout": JUNIT_XML_PASSING}
        result = extract_metrics(outcome)

        # auto-detected as junit
        assert result["total"] == 10

    def test_extract_from_outcome_auto_detect_jmeter(self):
        """Without ``kind``, JMeter JTL is detected from the output."""
        from runtime.orchestrator.metrics.parser import extract_metrics

        outcome = {"stdout": JMETER_JTL_SAMPLE}
        result = extract_metrics(outcome)

        # auto-detected as jmeter
        assert result["samples"] == 4

    def test_extract_empty_outcome(self):
        """An empty stdout yields an empty dict."""
        from runtime.orchestrator.metrics.parser import extract_metrics

        outcome = {"stdout": ""}
        result = extract_metrics(outcome)

        assert result == {}

    def test_extract_none_stdout(self):
        """An outcome with no stdout key yields an empty dict."""
        from runtime.orchestrator.metrics.parser import extract_metrics

        outcome = {}
        result = extract_metrics(outcome)

        assert result == {}