Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,44 @@ jobs:
if-no-files-found: ignore


patch-coverage:
  # Gate pull requests on test coverage of the lines they change, using
  # diff-cover against the PR's base branch.
  name: Patch coverage (changed lines ≥ 80%)
  runs-on: ubuntu-latest
  if: github.event_name == 'pull_request'

  steps:
    - name: Checkout repository
      uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
      with:
        # Full history so diff-cover can diff against the base branch.
        fetch-depth: 0

    - name: Set up Python 3.11
      uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0
      with:
        python-version: "3.11"

    - name: Install uv
      uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5.4.2
      with:
        enable-cache: true
        cache-dependency-glob: |
          pyproject.toml
          uv.lock

    - name: Install dependencies
      run: uv sync --frozen --all-extras --dev

    - name: Run pytest with coverage
      # Writes coverage.xml, which the next step consumes.
      run: uv run pytest --cov --cov-report=xml

    - name: Enforce 80% coverage on changed lines
      env:
        # Hand the base branch name to the shell via the environment instead of
        # expanding the expression inside the script text, so the value is
        # always treated as data and never as shell syntax.
        BASE_REF: ${{ github.base_ref }}
      run: >
        uv run diff-cover coverage.xml
        --compare-branch "origin/${BASE_REF}"
        --fail-under 80
        --show-uncovered

playground-integration:
name: Playground integration tests
runs-on: ubuntu-latest
Expand Down Expand Up @@ -98,7 +136,7 @@ jobs:
GOOGLE_DRIVE_SA_JSON: ${{ secrets.GOOGLE_DRIVE_SA_JSON }}
GOOGLE_DRIVE_FOLDER_ID: ${{ secrets.GOOGLE_DRIVE_FOLDER_ID }}
GDRIVE_TEST_RECIPIENT_EMAIL: ${{ secrets.GDRIVE_TEST_RECIPIENT_EMAIL }}

# Stripe
STRIPE_API_KEY: ${{ secrets.STRIPE_API_KEY }}
STRIPE_TEST_CUSTOMER_ID: ${{ secrets.STRIPE_TEST_CUSTOMER_ID }}
Expand Down
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies = [
"opentelemetry-instrumentation-fastapi>=0.45b0",
"opentelemetry-exporter-otlp>=1.24.0",
"traceloop-sdk>=0.53.0",
"grpcio-health-checking>=1.62.0",
]

[project.urls]
Expand Down Expand Up @@ -107,6 +108,7 @@ dev = [
"bandit[toml]>=1.7.9",
"pre-commit>=4.0.0",
"pytest-playwright>=0.4.0",
"diff-cover>=9.0.0",
"cyclonedx-bom==4.6.1",
"reuse>=5.0.0",
"licenseheaders>=0.8.8",
Expand Down Expand Up @@ -142,6 +144,9 @@ fail_under = 80
omit = [
"*/__pycache__/*",
"*/node_wire.egg-info/*",
# Generated gRPC stubs — also excluded from Ruff, Mypy, and CodeQL.
"*_pb2.py",
"*_pb2_grpc.py",
]

[tool.ruff]
Expand Down
7 changes: 7 additions & 0 deletions src/bindings/grpc_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from typing import Any

import grpc
from grpc_health.v1 import health as grpc_health
from grpc_health.v1 import health_pb2, health_pb2_grpc

from bindings.factory import ConnectorFactory
from node_wire_runtime.connector_registry import auto_register
Expand Down Expand Up @@ -117,6 +119,11 @@ def serve(port: int = 50051) -> None:
host = resolve_grpc_host()
configure_grpc_server_port(server, port=port, host=host, cert_path=cert_path, key_path=key_path)

health_servicer = grpc_health.HealthServicer()
health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
health_servicer.set("", health_pb2.HealthCheckResponse.SERVING)
health_servicer.set("aot.connectors.ConnectorService", health_pb2.HealthCheckResponse.SERVING)

server.start()
server.wait_for_termination()

Expand Down
14 changes: 14 additions & 0 deletions src/bindings/rest_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ async def health() -> Dict[str, str]:
return {"status": "ok"}


@app.get("/ready", tags=["system"])
async def ready() -> Dict[str, str]:
    """Readiness probe: succeed only when the connector factory is usable.

    Returns:
        ``{"status": "ready"}`` when the factory lists at least one connector
        for the ``rest`` or ``grpc`` protocol.

    Raises:
        HTTPException: 503 with detail ``"no connectors loaded"`` when both
            protocol listings are empty; 503 with detail
            ``"factory not ready"`` when obtaining or querying the factory
            raises any other exception.
    """
    try:
        factory = get_factory()
        if not factory.list_for_protocol("rest") and not factory.list_for_protocol("grpc"):
            raise HTTPException(status_code=503, detail="no connectors loaded")
    except HTTPException:
        # Already an HTTP-level answer; let it through untouched so it is not
        # rewritten by the generic handler below.
        raise
    except Exception as exc:
        logger.warning("Readiness check failed: %s", exc)
        # Chain the original failure so it is preserved as __cause__ for
        # debugging instead of appearing as an unrelated secondary error.
        raise HTTPException(status_code=503, detail="factory not ready") from exc
    return {"status": "ready"}


def _http_status_for_category(category: ErrorCategory | None) -> int:
if category is None:
return 200
Expand Down
63 changes: 56 additions & 7 deletions src/node_wire_runtime/base_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import contextvars
import inspect
import logging
import time
import uuid
from abc import ABC
from collections import defaultdict
Expand All @@ -26,8 +27,9 @@
List,
)

from opentelemetry import trace
from opentelemetry import metrics, trace
from opentelemetry.trace import Tracer
from opentelemetry.util.types import AttributeValue
from pybreaker import CircuitBreaker
from pydantic import BaseModel, Field, RootModel, ValidationError

Expand All @@ -41,6 +43,17 @@

logger = logging.getLogger("runtime.base_connector")
tracer: Tracer = trace.get_tracer("runtime")
# Module-level meter for the "runtime" instrumentation scope; the instruments
# below are created once when this module is imported.
_meter = metrics.get_meter("runtime")
# Counter of connector invocations. `run` adds 1 to it, tagged with connector
# id/action and outcome attributes, whenever a response was produced.
_invocation_counter = _meter.create_counter(
"connector.invocations",
unit="1",
description="Total number of connector invocations",
)
# Histogram of invocation wall-clock time in milliseconds. `run` records the
# elapsed time.monotonic() delta here with the same attributes as the counter.
_invocation_duration = _meter.create_histogram(
"connector.duration_ms",
unit="ms",
description="Connector invocation wall-clock time in milliseconds",
)
ErrorMapper.register(PolicyDenied, ErrorCategory.AUTH, code="POLICY_DENIED")


Expand Down Expand Up @@ -441,6 +454,7 @@ async def run(
- Maps exceptions into the standard error taxonomy
"""
trace_id = str(uuid.uuid4())
_start = time.monotonic()

with tracer.start_as_current_span(
"connector.run",
Expand All @@ -458,9 +472,15 @@ async def run(
"trace_id": trace_id,
"connector_id": self.connector_id,
"action": self.action,
"principal": principal,
"tenant_id": tenant_id,
"scopes": list(scopes) if scopes else [],
"audit": True,
"audit_event": "invocation_start",
},
)

_response: Optional[ConnectorResponse] = None
token = _caller_execution_ctx.set((principal, tenant_id, scopes))
try:
try:
Expand All @@ -474,19 +494,22 @@ async def run(
"action": self.action,
"error_type": type(exc).__name__,
"error_message": str(exc),
"audit": True,
"audit_event": "invocation_validation_failure",
},
)
details = [
{"loc": e["loc"], "msg": e["msg"], "type": e["type"]} for e in exc.errors()
]
return ConnectorResponse(
_response = ConnectorResponse(
success=False,
error_code="VALIDATION_ERROR",
error_category=ErrorCategory.BUSINESS,
message="Input validation failed; please check the request payload.",
trace_id=trace_id,
details=details,
)
return _response

# Policy hook
if self._policy_hook is not None:
Expand Down Expand Up @@ -518,15 +541,20 @@ async def run(
},
)
mapped = ErrorMapper.resolve(exc)
return ConnectorResponse(
_response = ConnectorResponse(
success=False,
error_code=mapped.code,
error_category=mapped.category,
message=str(exc),
trace_id=trace_id,
)
return _response

execute_with_resilience = with_resilience(self._breaker_for_tenant(tenant_id))
execute_with_resilience = with_resilience(
self._breaker_for_tenant(tenant_id),
connector_id=self.connector_id,
action=self.action,
)

@execute_with_resilience
async def _do_execute(*, trace_id: str) -> Any:
Expand All @@ -540,14 +568,18 @@ async def _do_execute(*, trace_id: str) -> Any:
"trace_id": trace_id,
"connector_id": self.connector_id,
"action": self.action,
"duration_ms": round((time.monotonic() - _start) * 1000, 2),
"audit": True,
"audit_event": "invocation_success",
},
)

return ConnectorResponse(
_response = ConnectorResponse(
success=True,
data=output_model.model_dump(),
trace_id=trace_id,
)
return _response
except NestedConnectorActionError as exc:
nested = exc.response
logger.warning(
Expand All @@ -559,14 +591,15 @@ async def _do_execute(*, trace_id: str) -> Any:
"nested_trace_id": nested.trace_id,
},
)
return ConnectorResponse(
_response = ConnectorResponse(
success=False,
error_code=nested.error_code,
error_category=nested.error_category,
message=nested.message,
trace_id=trace_id,
details=_merge_nested_failure_details(nested),
)
return _response
except Exception as exc: # noqa: BLE001
mapped = ErrorMapper.resolve(exc)
logger.error(
Expand All @@ -579,17 +612,33 @@ async def _do_execute(*, trace_id: str) -> Any:
"error_category": mapped.category.value,
"error_type": type(exc).__name__,
"error_message": str(exc),
"duration_ms": round((time.monotonic() - _start) * 1000, 2),
"audit": True,
"audit_event": "invocation_failure",
},
)
return ConnectorResponse(
_response = ConnectorResponse(
success=False,
error_code=mapped.code,
error_category=mapped.category,
message=str(exc),
trace_id=trace_id,
)
return _response
finally:
_caller_execution_ctx.reset(token)
if _response is not None:
_duration_ms = (time.monotonic() - _start) * 1000
_metric_attrs: Dict[str, AttributeValue] = {
"connector.id": self.connector_id,
"connector.action": self.action,
"success": _response.success,
"error_category": (
_response.error_category.value if _response.error_category else "none"
),
}
_invocation_counter.add(1, attributes=_metric_attrs)
_invocation_duration.record(_duration_ms, attributes=_metric_attrs)

@classmethod
def get_registry(cls) -> Dict[str, Type[BaseConnector]]:
Expand Down
59 changes: 58 additions & 1 deletion src/node_wire_runtime/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
from typing import Optional, cast

from opentelemetry._logs import set_logger_provider
from opentelemetry import trace
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, LogExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import MetricExporter, PeriodicExportingMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased
Expand Down Expand Up @@ -85,6 +88,36 @@ def force_flush(self, timeout_millis: int = 30000):
return True


class SanitizingMetricExporter(MetricExporter):
    """Wrap another ``MetricExporter`` and sanitize attributes before export.

    Every data point that carries a non-empty ``dict`` of attributes is passed
    to ``_sanitize_otlp_attributes`` before the batch is handed to the wrapped
    exporter. All other exporter operations are forwarded unchanged.
    """

    def __init__(self, delegate: MetricExporter):
        # The base-class initializer is not called; temporality and
        # aggregation preferences are served from the delegate via the
        # properties below.
        self._delegate = delegate

    @property
    def _preferred_temporality(self):  # type: ignore[override]
        """Temporality preference, read through from the wrapped exporter."""
        return self._delegate._preferred_temporality

    @property
    def _preferred_aggregation(self):  # type: ignore[override]
        """Aggregation preference, read through from the wrapped exporter."""
        return self._delegate._preferred_aggregation

    @staticmethod
    def _iter_data_points(metrics_data):
        """Lazily yield each data point found in *metrics_data*.

        Metrics whose ``data`` has no ``data_points`` attribute are skipped.
        """
        for resource_metrics in metrics_data.resource_metrics:
            for scope_metrics in resource_metrics.scope_metrics:
                for metric in scope_metrics.metrics:
                    if hasattr(metric.data, "data_points"):
                        yield from metric.data.data_points

    def export(self, metrics_data, timeout_millis=10_000, **kwargs):
        """Sanitize data-point attributes, then export via the delegate.

        Returns whatever the wrapped exporter's ``export`` returns.
        """
        for point in self._iter_data_points(metrics_data):
            point_attrs = getattr(point, "attributes", None)
            if point_attrs and isinstance(point_attrs, dict):
                # Return value is discarded, so the helper presumably scrubs
                # the dict in place — confirm against its definition.
                _sanitize_otlp_attributes(point_attrs)
        return self._delegate.export(metrics_data, timeout_millis=timeout_millis, **kwargs)

    def shutdown(self, timeout_millis=30_000, **kwargs):
        """Forward shutdown to the wrapped exporter."""
        return self._delegate.shutdown(timeout_millis=timeout_millis, **kwargs)

    def force_flush(self, timeout_millis: float = 10_000):
        """Forward force_flush to the wrapped exporter."""
        return self._delegate.force_flush(timeout_millis)


def init_observability(app_name: str = "node_wire") -> None:
"""
Initialize OpenTelemetry + OpenLLMetry/Traceloop for the process.
Expand Down Expand Up @@ -153,6 +186,30 @@ def init_observability(app_name: str = "node_wire") -> None:
root_logger.addFilter(_OtelContextFilter())
root_logger.addHandler(LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider))

# Metrics: export to the local OTLP collector alongside traces and logs.
metric_interval_str: str = os.getenv("AOT_METRIC_EXPORT_INTERVAL_MS", "60000")
try:
metric_interval_ms = int(metric_interval_str)
except ValueError:
logger.warning(
"Invalid AOT_METRIC_EXPORT_INTERVAL_MS %r, falling back to 60000", metric_interval_str
)
metric_interval_ms = 60000

metric_exporter = SanitizingMetricExporter(
OTLPMetricExporter(
headers=dict(header.split("=", 1) for header in otlp_headers.split(","))
if otlp_headers
else None,
)
)
metric_reader = PeriodicExportingMetricReader(
metric_exporter,
export_interval_millis=metric_interval_ms,
)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

# Initialize Traceloop/OpenLLMetry in metadata-only mode. Advanced AI features
# (prompt logging, workflows, tools) are intentionally deferred.
# Skip silently when no API key is configured — Traceloop is optional.
Expand Down
Loading
Loading