diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py index d3ecaa6f..cce34580 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/__init__.py @@ -4,6 +4,7 @@ # Microsoft Agent 365 Python SDK for OpenTelemetry tracing. from .agent_details import AgentDetails +from .apply_guardrail_scope import ApplyGuardrailScope from .config import ( configure, get_tracer, @@ -19,6 +20,11 @@ unregister_span_enricher, ) from .exporters.spectra_exporter_options import SpectraExporterOptions +from .guardrail_decision_type import GuardrailDecisionType +from .guardrail_details import GuardrailDetails +from .guardrail_finding import GuardrailFinding +from .guardrail_risk_severity import GuardrailRiskSeverity +from .guardrail_target_type import GuardrailTargetType from .inference_call_details import InferenceCallDetails from .models.service_endpoint import ServiceEndpoint from .inference_operation_type import InferenceOperationType @@ -80,6 +86,7 @@ # Base scope class "OpenTelemetryScope", # Specific scope classes + "ApplyGuardrailScope", "ExecuteToolScope", "InvokeAgentScope", "InferenceScope", @@ -98,7 +105,12 @@ "SpanDetails", "InferenceCallDetails", "ServiceEndpoint", + "GuardrailDetails", + "GuardrailFinding", # Enums + "GuardrailDecisionType", + "GuardrailRiskSeverity", + "GuardrailTargetType", "InferenceOperationType", "ToolType", # OTEL gen-ai message format types diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/apply_guardrail_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/apply_guardrail_scope.py new file mode 100644 index 00000000..59f092dd --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/apply_guardrail_scope.py @@ -0,0 +1,240 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from opentelemetry.trace import SpanKind + +from .agent_details import AgentDetails +from .constants import ( + APPLY_GUARDRAIL_OPERATION_NAME, + CHANNEL_LINK_KEY, + CHANNEL_NAME_KEY, + GEN_AI_CONVERSATION_ID_KEY, + GUARDIAN_ID_KEY, + GUARDIAN_NAME_KEY, + GUARDIAN_PROVIDER_NAME_KEY, + GUARDIAN_VERSION_KEY, + SECURITY_CONTENT_INPUT_HASH_KEY, + SECURITY_CONTENT_INPUT_VALUE_KEY, + SECURITY_CONTENT_MODIFIED_KEY, + SECURITY_CONTENT_OUTPUT_VALUE_KEY, + SECURITY_DECISION_CODE_KEY, + SECURITY_DECISION_REASON_KEY, + SECURITY_DECISION_TYPE_KEY, + SECURITY_EXTERNAL_EVENT_ID_KEY, + SECURITY_FINDING_EVENT_NAME, + SECURITY_POLICY_DECISION_TYPE_KEY, + SECURITY_POLICY_ID_KEY, + SECURITY_POLICY_NAME_KEY, + SECURITY_POLICY_VERSION_KEY, + SECURITY_RISK_CATEGORY_KEY, + SECURITY_RISK_METADATA_KEY, + SECURITY_RISK_SCORE_KEY, + SECURITY_RISK_SEVERITY_KEY, + SECURITY_TARGET_ID_KEY, + SECURITY_TARGET_TYPE_KEY, +) +from .guardrail_decision_type import GuardrailDecisionType +from .guardrail_details import GuardrailDetails +from .guardrail_finding import GuardrailFinding +from .models.user_details import UserDetails +from .opentelemetry_scope import OpenTelemetryScope +from .request import Request +from .span_details import SpanDetails +from .utils import safe_json_dumps + + +class ApplyGuardrailScope(OpenTelemetryScope): + """Provides OpenTelemetry tracing scope for security guardrail evaluation operations. + + Describes a security guardian evaluation. Multiple guardian spans MAY exist under a + single operation span if multiple guardians are chained. + + Guardian spans SHOULD be children of the operation span they are protecting + (e.g., inference or execute_tool spans). + """ + + @staticmethod + def start( + details: GuardrailDetails, + agent_details: AgentDetails, + request: Request | None = None, + user_details: UserDetails | None = None, + span_details: SpanDetails | None = None, + ) -> "ApplyGuardrailScope": + """Creates and starts a new scope for guardrail evaluation tracing. + + Args: + details: Details of the guardrail evaluation (target, decision, guardian + info, policy). + agent_details: Information about the agent being guarded. + request: Optional request details for conversation context. + user_details: Optional human user details. + span_details: Optional span configuration (parent context, timing, kind, + span links). + + Returns: + A new ApplyGuardrailScope instance. + """ + return ApplyGuardrailScope(details, agent_details, request, user_details, span_details) + + def __init__( + self, + details: GuardrailDetails, + agent_details: AgentDetails, + request: Request | None = None, + user_details: UserDetails | None = None, + span_details: SpanDetails | None = None, + ): + """Initialize the guardrail evaluation scope. + + Args: + details: Details of the guardrail evaluation. + agent_details: Information about the agent being guarded. + request: Optional request details for conversation context. + user_details: Optional human user details. + span_details: Optional span configuration. + """ + resolved_span_details = SpanDetails( + span_kind=( + span_details.span_kind + if span_details and span_details.span_kind + else SpanKind.INTERNAL + ), + parent_context=span_details.parent_context if span_details else None, + start_time=span_details.start_time if span_details else None, + end_time=span_details.end_time if span_details else None, + span_links=span_details.span_links if span_details else None, + ) + + activity_name = self._build_activity_name(details) + + super().__init__( + operation_name=APPLY_GUARDRAIL_OPERATION_NAME, + activity_name=activity_name, + agent_details=agent_details, + span_details=resolved_span_details, + ) + + # Required attributes + self.set_tag_maybe(SECURITY_DECISION_TYPE_KEY, details.decision_type.value) + self.set_tag_maybe(SECURITY_TARGET_TYPE_KEY, details.target_type) + + # Guardian attributes + self.set_tag_maybe(GUARDIAN_ID_KEY, details.guardian_id) + self.set_tag_maybe(GUARDIAN_NAME_KEY, details.guardian_name) + self.set_tag_maybe(GUARDIAN_PROVIDER_NAME_KEY, details.guardian_provider_name) + self.set_tag_maybe(GUARDIAN_VERSION_KEY, details.guardian_version) + + # Target attributes + self.set_tag_maybe(SECURITY_TARGET_ID_KEY, details.target_id) + + # Decision attributes + self.set_tag_maybe(SECURITY_DECISION_REASON_KEY, details.decision_reason) + self.set_tag_maybe(SECURITY_DECISION_CODE_KEY, details.decision_code) + + # Policy attributes + self.set_tag_maybe(SECURITY_POLICY_ID_KEY, details.policy_id) + self.set_tag_maybe(SECURITY_POLICY_NAME_KEY, details.policy_name) + self.set_tag_maybe(SECURITY_POLICY_VERSION_KEY, details.policy_version) + + # Content attributes + self.set_tag_maybe(SECURITY_CONTENT_INPUT_HASH_KEY, details.content_input_hash) + if details.content_modified is not None: + self.set_tag_maybe(SECURITY_CONTENT_MODIFIED_KEY, details.content_modified) + + # Correlation + self.set_tag_maybe(SECURITY_EXTERNAL_EVENT_ID_KEY, details.external_event_id) + + # Request context + if request is not None: + content_value = self._serialize_content(request.content) + self.set_tag_maybe(SECURITY_CONTENT_INPUT_VALUE_KEY, content_value) + self.set_tag_maybe(GEN_AI_CONVERSATION_ID_KEY, request.conversation_id) + if request.channel is not None: + self.set_tag_maybe(CHANNEL_NAME_KEY, request.channel.name) + self.set_tag_maybe(CHANNEL_LINK_KEY, request.channel.link) + + def record_decision( + self, decision_type: GuardrailDecisionType, reason: str | None = None + ) -> None: + """Records an updated decision on the guardrail span. + + Use this when the guardrail decision is determined after span creation. + + Args: + decision_type: The decision type made by the guardian. + reason: Optional human-readable explanation for the decision. + """ + self.set_tag_maybe(SECURITY_DECISION_TYPE_KEY, decision_type.value) + self.set_tag_maybe(SECURITY_DECISION_REASON_KEY, reason) + + def record_content_output(self, output_value: str) -> None: + """Records the output content value for the guardrail evaluation (opt-in). + + Args: + output_value: The output content after guardrail processing. + """ + self.set_tag_maybe(SECURITY_CONTENT_OUTPUT_VALUE_KEY, output_value) + + def record_finding(self, finding: GuardrailFinding) -> None: + """Records a security finding event on the current span. + + Multiple findings may be recorded per guardrail evaluation. + + Args: + finding: The security finding to record. + + Raises: + ValueError: When finding is None. + """ + if finding is None: + raise ValueError("finding must not be None") + + if self._span is None or not self._is_telemetry_enabled(): + return + + attributes: dict[str, str | float | list[str]] = { + SECURITY_RISK_CATEGORY_KEY: finding.risk_category, + SECURITY_RISK_SEVERITY_KEY: finding.risk_severity, + } + + if finding.policy_decision_type is not None: + attributes[SECURITY_POLICY_DECISION_TYPE_KEY] = finding.policy_decision_type + + if finding.policy_id is not None: + attributes[SECURITY_POLICY_ID_KEY] = finding.policy_id + + if finding.policy_name is not None: + attributes[SECURITY_POLICY_NAME_KEY] = finding.policy_name + + if finding.policy_version is not None: + attributes[SECURITY_POLICY_VERSION_KEY] = finding.policy_version + + if finding.risk_score is not None: + attributes[SECURITY_RISK_SCORE_KEY] = finding.risk_score + + if finding.risk_metadata is not None: + attributes[SECURITY_RISK_METADATA_KEY] = finding.risk_metadata + + self._span.add_event(SECURITY_FINDING_EVENT_NAME, attributes=attributes) + + @staticmethod + def _build_activity_name(details: GuardrailDetails) -> str: + """Build the display name for the span.""" + if details.guardian_name: + return f"{APPLY_GUARDRAIL_OPERATION_NAME} {details.guardian_name} {details.target_type}" + return f"{APPLY_GUARDRAIL_OPERATION_NAME} {details.target_type}" + + @staticmethod + def _serialize_content(content: object) -> str | None: + """Serialize request content to a string suitable for an OTel attribute. + + OTel attributes only accept primitive values or sequences of primitives. + If content is already a string it is returned as-is; otherwise it is + JSON-serialized so that structured InputMessages objects are captured. + """ + if content is None: + return None + if isinstance(content, str): + return content + return safe_json_dumps(content) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/constants.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/constants.py index a1afbe7b..85871ba8 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/constants.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/constants.py @@ -7,6 +7,7 @@ INVOKE_AGENT_OPERATION_NAME = "invoke_agent" EXECUTE_TOOL_OPERATION_NAME = "execute_tool" OUTPUT_MESSAGES_OPERATION_NAME = "output_messages" +APPLY_GUARDRAIL_OPERATION_NAME = "apply_guardrail" CHAT_OPERATION_NAME = "chat" # OpenTelemetry semantic conventions @@ -112,3 +113,32 @@ TELEMETRY_SDK_VERSION_KEY = "telemetry.sdk.version" TELEMETRY_SDK_NAME_VALUE = "A365ObservabilitySDK" TELEMETRY_SDK_LANGUAGE_VALUE = "python" + +# Guardian attributes +GUARDIAN_ID_KEY = "microsoft.guardian.id" +GUARDIAN_NAME_KEY = "microsoft.guardian.name" +GUARDIAN_PROVIDER_NAME_KEY = "microsoft.guardian.provider.name" +GUARDIAN_VERSION_KEY = "microsoft.guardian.version" + +# Security attributes +SECURITY_DECISION_TYPE_KEY = "microsoft.security.decision.type" +SECURITY_DECISION_REASON_KEY = "microsoft.security.decision.reason" +SECURITY_DECISION_CODE_KEY = "microsoft.security.decision.code" +SECURITY_TARGET_TYPE_KEY = "microsoft.security.target.type" +SECURITY_TARGET_ID_KEY = "microsoft.security.target.id" +SECURITY_POLICY_ID_KEY = "microsoft.security.policy.id" +SECURITY_POLICY_NAME_KEY = "microsoft.security.policy.name" +SECURITY_POLICY_VERSION_KEY = "microsoft.security.policy.version" +SECURITY_CONTENT_INPUT_HASH_KEY = "microsoft.security.content.input.hash" +SECURITY_CONTENT_MODIFIED_KEY = "microsoft.security.content.modified" +SECURITY_EXTERNAL_EVENT_ID_KEY = "microsoft.security.external_event_id" +SECURITY_CONTENT_INPUT_VALUE_KEY = "microsoft.security.content.input.value" +SECURITY_CONTENT_OUTPUT_VALUE_KEY = "microsoft.security.content.output.value" + +# Security finding event +SECURITY_FINDING_EVENT_NAME = "microsoft.security.finding" +SECURITY_RISK_CATEGORY_KEY = "microsoft.security.risk.category" +SECURITY_RISK_SEVERITY_KEY = "microsoft.security.risk.severity" +SECURITY_RISK_SCORE_KEY = "microsoft.security.risk.score" +SECURITY_RISK_METADATA_KEY = "microsoft.security.risk.metadata" +SECURITY_POLICY_DECISION_TYPE_KEY = "microsoft.security.policy.decision.type" diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py index 2d84b074..1e357176 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py @@ -14,6 +14,7 @@ from opentelemetry.trace import SpanKind, StatusCode from ..constants import ( + APPLY_GUARDRAIL_OPERATION_NAME, CHAT_OPERATION_NAME, ENABLE_A365_OBSERVABILITY_EXPORTER, EXECUTE_TOOL_OPERATION_NAME, @@ -38,6 +39,7 @@ INVOKE_AGENT_OPERATION_NAME, EXECUTE_TOOL_OPERATION_NAME, OUTPUT_MESSAGES_OPERATION_NAME, + APPLY_GUARDRAIL_OPERATION_NAME, CHAT_OPERATION_NAME, InferenceOperationType.CHAT.value, } diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_decision_type.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_decision_type.py new file mode 100644 index 00000000..e955afd3 --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_decision_type.py @@ -0,0 +1,23 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from enum import Enum + + +class GuardrailDecisionType(Enum): + """The decision made by a security guardian during guardrail evaluation.""" + + ALLOW = "allow" + """Content or action is allowed to proceed.""" + + AUDIT = "audit" + """Content or action is logged for review but allowed to proceed.""" + + DENY = "deny" + """Content or action is denied/blocked.""" + + MODIFY = "modify" + """Content was modified (e.g., redacted, sanitized, rewritten).""" + + WARN = "warn" + """Content or action triggered a warning but is allowed to proceed.""" diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_details.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_details.py new file mode 100644 index 00000000..f4fe12bd --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_details.py @@ -0,0 +1,47 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from dataclasses import dataclass + +from .guardrail_decision_type import GuardrailDecisionType + + +@dataclass(frozen=True) +class GuardrailDetails: + """Details of a guardrail evaluation for security operations tracing. + + Args: + target_type: The type of content or action the guardrail is applied to (required). + See GuardrailTargetType for well-known values. + decision_type: The decision made by the guardian (required). + guardian_name: Human-readable name of the guardian. + guardian_id: Unique identifier of the guardian. + guardian_provider_name: Provider of the guardian service + (e.g., azure.ai.content_safety). + guardian_version: Version of the guardian. + target_id: Identifier of the target being guarded. + decision_reason: Human-readable explanation for the decision. + decision_code: Machine-readable decision code. + policy_id: Identifier of the policy that triggered the decision. + policy_name: Human-readable name of the policy. + policy_version: Version of the policy. + content_input_hash: Hash of the input content for forensic correlation. + content_modified: Whether content was modified by the guardrail. + external_event_id: External correlation identifier for SIEM systems. + """ + + target_type: str + decision_type: GuardrailDecisionType + guardian_name: str | None = None + guardian_id: str | None = None + guardian_provider_name: str | None = None + guardian_version: str | None = None + target_id: str | None = None + decision_reason: str | None = None + decision_code: str | None = None + policy_id: str | None = None + policy_name: str | None = None + policy_version: str | None = None + content_input_hash: str | None = None + content_modified: bool | None = None + external_event_id: str | None = None diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_finding.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_finding.py new file mode 100644 index 00000000..1210da1c --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_finding.py @@ -0,0 +1,35 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class GuardrailFinding: + """Represents a single security finding detected during guardian evaluation. + + Multiple findings may be emitted for a single guardrail span. + + Args: + risk_category: The category of security risk detected (required). + Common values: prompt_injection, sensitive_info_disclosure, jailbreak, + toxicity, pii. + risk_severity: The severity level of the detected risk (required). + See GuardrailRiskSeverity for well-known values. + policy_decision_type: The decision type for this specific policy finding. + policy_id: Identifier of the policy that triggered the finding. + policy_name: Human-readable name of the triggered policy. + policy_version: Version of the policy. + risk_score: Numeric risk/confidence score (0.0 to 1.0). + risk_metadata: Non-content metadata about the detected risk + (MUST NOT contain PII). + """ + + risk_category: str + risk_severity: str + policy_decision_type: str | None = None + policy_id: str | None = None + policy_name: str | None = None + policy_version: str | None = None + risk_score: float | None = None + risk_metadata: list[str] | None = None diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_risk_severity.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_risk_severity.py new file mode 100644 index 00000000..650823bf --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_risk_severity.py @@ -0,0 +1,21 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +class GuardrailRiskSeverity: + """Well-known severity levels for security risks detected by guardrails.""" + + NONE = "none" + """No risk detected.""" + + LOW = "low" + """Low severity risk.""" + + MEDIUM = "medium" + """Medium severity risk.""" + + HIGH = "high" + """High severity risk.""" + + CRITICAL = "critical" + """Critical severity risk requiring immediate action.""" diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_target_type.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_target_type.py new file mode 100644 index 00000000..c1b471c4 --- /dev/null +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/guardrail_target_type.py @@ -0,0 +1,38 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +class GuardrailTargetType: + """Well-known values for the type of content or action a guardrail is applied to. + + This is a free-form field per the OpenTelemetry semantic conventions. + These class attributes provide discoverability for common values, but custom + strings are also accepted when constructing GuardrailDetails. + """ + + LLM_INPUT = "llm_input" + """Input to a language model.""" + + LLM_OUTPUT = "llm_output" + """Output from a language model.""" + + TOOL_CALL = "tool_call" + """A tool call action.""" + + TOOL_DEFINITION = "tool_definition" + """A tool definition.""" + + MEMORY_STORE = "memory_store" + """A memory store operation.""" + + MEMORY_RETRIEVE = "memory_retrieve" + """A memory retrieval operation.""" + + KNOWLEDGE_QUERY = "knowledge_query" + """A knowledge query.""" + + KNOWLEDGE_RESULT = "knowledge_result" + """A knowledge retrieval result.""" + + MESSAGE = "message" + """A message.""" diff --git a/tests/observability/core/test_apply_guardrail_scope.py b/tests/observability/core/test_apply_guardrail_scope.py new file mode 100644 index 00000000..61acd60a --- /dev/null +++ b/tests/observability/core/test_apply_guardrail_scope.py @@ -0,0 +1,365 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import os +import unittest + +from microsoft_agents_a365.observability.core import ( + AgentDetails, + ApplyGuardrailScope, + Channel, + Request, + configure, + get_tracer_provider, +) +from microsoft_agents_a365.observability.core.config import _telemetry_manager +from microsoft_agents_a365.observability.core.constants import ( + APPLY_GUARDRAIL_OPERATION_NAME, + CHANNEL_LINK_KEY, + CHANNEL_NAME_KEY, + GEN_AI_CONVERSATION_ID_KEY, + GEN_AI_OPERATION_NAME_KEY, + GUARDIAN_ID_KEY, + GUARDIAN_NAME_KEY, + GUARDIAN_PROVIDER_NAME_KEY, + GUARDIAN_VERSION_KEY, + SECURITY_CONTENT_INPUT_HASH_KEY, + SECURITY_CONTENT_INPUT_VALUE_KEY, + SECURITY_CONTENT_MODIFIED_KEY, + SECURITY_CONTENT_OUTPUT_VALUE_KEY, + SECURITY_DECISION_REASON_KEY, + SECURITY_DECISION_TYPE_KEY, + SECURITY_EXTERNAL_EVENT_ID_KEY, + SECURITY_FINDING_EVENT_NAME, + SECURITY_POLICY_DECISION_TYPE_KEY, + SECURITY_POLICY_ID_KEY, + SECURITY_POLICY_NAME_KEY, + SECURITY_POLICY_VERSION_KEY, + SECURITY_RISK_CATEGORY_KEY, + SECURITY_RISK_SCORE_KEY, + SECURITY_RISK_SEVERITY_KEY, + SECURITY_TARGET_TYPE_KEY, +) +from microsoft_agents_a365.observability.core.guardrail_decision_type import ( + GuardrailDecisionType, +) +from microsoft_agents_a365.observability.core.guardrail_details import GuardrailDetails +from microsoft_agents_a365.observability.core.guardrail_finding import GuardrailFinding +from microsoft_agents_a365.observability.core.guardrail_risk_severity import ( + GuardrailRiskSeverity, +) +from microsoft_agents_a365.observability.core.guardrail_target_type import ( + GuardrailTargetType, +) +from microsoft_agents_a365.observability.core.opentelemetry_scope import ( + OpenTelemetryScope, +) +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import SpanKind + + +class TestApplyGuardrailScope(unittest.TestCase): + """Unit tests for ApplyGuardrailScope.""" + + @classmethod + def setUpClass(cls): + """Set up test environment once for all tests.""" + os.environ["ENABLE_A365_OBSERVABILITY"] = "true" + configure( + service_name="test-guardrail-service", + service_namespace="test-namespace", + ) + cls.agent_details = AgentDetails( + agent_id="test-agent-123", + agent_name="Test Agent", + agent_description="A test agent", + ) + + def setUp(self): + super().setUp() + _telemetry_manager._tracer_provider = None + _telemetry_manager._span_processors = {} + OpenTelemetryScope._tracer = None + + configure( + service_name="test-guardrail-service", + service_namespace="test-namespace", + ) + + self.span_exporter = InMemorySpanExporter() + tracer_provider = get_tracer_provider() + tracer_provider.add_span_processor(SimpleSpanProcessor(self.span_exporter)) + + def tearDown(self): + super().tearDown() + self.span_exporter.clear() + + def _get_attributes(self, span): + """Helper to get span attributes as a dict.""" + return dict(span.attributes) if span.attributes else {} + + def test_start_sets_required_attributes(self): + """Test that Start sets required attributes.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.DENY, + ) + + with ApplyGuardrailScope.start(details, self.agent_details): + pass + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + attrs = self._get_attributes(spans[0]) + self.assertEqual(attrs[GEN_AI_OPERATION_NAME_KEY], APPLY_GUARDRAIL_OPERATION_NAME) + self.assertEqual(attrs[SECURITY_DECISION_TYPE_KEY], "deny") + self.assertEqual(attrs[SECURITY_TARGET_TYPE_KEY], GuardrailTargetType.LLM_INPUT) + + def test_start_sets_guardian_attributes(self): + """Test that guardian attributes are set when provided.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_OUTPUT, + decision_type=GuardrailDecisionType.ALLOW, + guardian_name="PII Filter", + guardian_id="guard_abc123", + guardian_provider_name="azure.ai.content_safety", + guardian_version="2.1.0", + ) + + with ApplyGuardrailScope.start(details, self.agent_details): + pass + + spans = self.span_exporter.get_finished_spans() + attrs = self._get_attributes(spans[0]) + self.assertEqual(attrs[GUARDIAN_NAME_KEY], "PII Filter") + self.assertEqual(attrs[GUARDIAN_ID_KEY], "guard_abc123") + self.assertEqual(attrs[GUARDIAN_PROVIDER_NAME_KEY], "azure.ai.content_safety") + self.assertEqual(attrs[GUARDIAN_VERSION_KEY], "2.1.0") + + def test_start_sets_policy_attributes(self): + """Test that policy attributes are set when provided.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.TOOL_CALL, + decision_type=GuardrailDecisionType.MODIFY, + policy_id="policy_pii_v2", + policy_name="PII Protection Policy", + policy_version="1.0", + ) + + with ApplyGuardrailScope.start(details, self.agent_details): + pass + + spans = self.span_exporter.get_finished_spans() + attrs = self._get_attributes(spans[0]) + self.assertEqual(attrs[SECURITY_POLICY_ID_KEY], "policy_pii_v2") + self.assertEqual(attrs[SECURITY_POLICY_NAME_KEY], "PII Protection Policy") + self.assertEqual(attrs[SECURITY_POLICY_VERSION_KEY], "1.0") + + def test_start_sets_content_attributes(self): + """Test that content attributes are set when provided.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.ALLOW, + content_input_hash="sha256:abc123", + content_modified=True, + external_event_id="ext-001", + ) + + with ApplyGuardrailScope.start(details, self.agent_details): + pass + + spans = self.span_exporter.get_finished_spans() + attrs = self._get_attributes(spans[0]) + self.assertEqual(attrs[SECURITY_CONTENT_INPUT_HASH_KEY], "sha256:abc123") + self.assertEqual(attrs[SECURITY_CONTENT_MODIFIED_KEY], True) + self.assertEqual(attrs[SECURITY_EXTERNAL_EVENT_ID_KEY], "ext-001") + + def test_start_builds_activity_name_with_guardian_name(self): + """Test activity name includes guardian name when provided.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.DENY, + guardian_name="Azure Content Safety", + ) + + with ApplyGuardrailScope.start(details, self.agent_details): + pass + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(spans[0].name, "apply_guardrail Azure Content Safety llm_input") + + def test_start_builds_activity_name_without_guardian_name(self): + """Test activity name without guardian name.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.TOOL_CALL, + decision_type=GuardrailDecisionType.ALLOW, + ) + + with ApplyGuardrailScope.start(details, self.agent_details): + pass + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(spans[0].name, "apply_guardrail tool_call") + + def test_record_decision_updates_decision_type(self): + """Test that RecordDecision updates the decision type.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.ALLOW, + ) + + with ApplyGuardrailScope.start(details, self.agent_details) as scope: + scope.record_decision(GuardrailDecisionType.DENY, "Content blocked") + + spans = self.span_exporter.get_finished_spans() + attrs = self._get_attributes(spans[0]) + self.assertEqual(attrs[SECURITY_DECISION_TYPE_KEY], "deny") + self.assertEqual(attrs[SECURITY_DECISION_REASON_KEY], "Content blocked") + + def test_record_content_output_sets_output_value(self): + """Test that RecordContentOutput sets the output value.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_OUTPUT, + decision_type=GuardrailDecisionType.MODIFY, + ) + + with ApplyGuardrailScope.start(details, self.agent_details) as scope: + scope.record_content_output("sanitized content") + + spans = self.span_exporter.get_finished_spans() + attrs = self._get_attributes(spans[0]) + self.assertEqual(attrs[SECURITY_CONTENT_OUTPUT_VALUE_KEY], "sanitized content") + + def test_record_finding_adds_event_with_attributes(self): + """Test that RecordFinding adds an event with correct attributes.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.DENY, + ) + + finding = GuardrailFinding( + risk_category="hate_speech", + risk_severity=GuardrailRiskSeverity.HIGH, + policy_decision_type="deny", + policy_id="policy-abc", + risk_score=0.95, + risk_metadata=['{"category":"hate","confidence":0.95}'], + ) + + with ApplyGuardrailScope.start(details, self.agent_details) as scope: + scope.record_finding(finding) + + spans = self.span_exporter.get_finished_spans() + events = spans[0].events + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, SECURITY_FINDING_EVENT_NAME) + + event_attrs = dict(event.attributes) + self.assertEqual(event_attrs[SECURITY_RISK_CATEGORY_KEY], "hate_speech") + self.assertEqual(event_attrs[SECURITY_RISK_SEVERITY_KEY], GuardrailRiskSeverity.HIGH) + self.assertEqual(event_attrs[SECURITY_POLICY_DECISION_TYPE_KEY], "deny") + self.assertEqual(event_attrs[SECURITY_POLICY_ID_KEY], "policy-abc") + self.assertEqual(event_attrs[SECURITY_RISK_SCORE_KEY], 0.95) + + def test_record_finding_multiple_findings(self): + """Test that multiple findings can be recorded.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.DENY, + ) + + with ApplyGuardrailScope.start(details, self.agent_details) as scope: + scope.record_finding(GuardrailFinding("hate_speech", GuardrailRiskSeverity.HIGH)) + scope.record_finding(GuardrailFinding("pii", GuardrailRiskSeverity.MEDIUM)) + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(len(spans[0].events), 2) + + def test_record_finding_raises_on_none(self): + """Test that RecordFinding raises on None.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.ALLOW, + ) + + with self.assertRaises(ValueError): + with ApplyGuardrailScope.start(details, self.agent_details) as scope: + scope.record_finding(None) + + def test_start_sets_request_context(self): + """Test that request context attributes are set when provided.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.ALLOW, + ) + + request = Request( + content="test input", + conversation_id="conv-123", + channel=Channel(name="msteams", link="https://test.link"), + ) + + with ApplyGuardrailScope.start(details, self.agent_details, request=request): + pass + + spans = self.span_exporter.get_finished_spans() + attrs = self._get_attributes(spans[0]) + self.assertEqual(attrs[SECURITY_CONTENT_INPUT_VALUE_KEY], "test input") + self.assertEqual(attrs[GEN_AI_CONVERSATION_ID_KEY], "conv-123") + self.assertEqual(attrs[CHANNEL_NAME_KEY], "msteams") + self.assertEqual(attrs[CHANNEL_LINK_KEY], "https://test.link") + + def test_span_kind_is_internal(self): + """Test that the span kind defaults to INTERNAL.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.ALLOW, + ) + + with ApplyGuardrailScope.start(details, self.agent_details): + pass + + spans = self.span_exporter.get_finished_spans() + self.assertEqual(spans[0].kind, SpanKind.INTERNAL) + + def test_all_decision_types(self): + """Test all guardrail decision types are serialized correctly.""" + for decision_type in GuardrailDecisionType: + self.span_exporter.clear() + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=decision_type, + ) + with ApplyGuardrailScope.start(details, self.agent_details): + pass + + spans = self.span_exporter.get_finished_spans() + attrs = self._get_attributes(spans[0]) + self.assertEqual(attrs[SECURITY_DECISION_TYPE_KEY], decision_type.value) + + def test_request_content_serializes_structured_input(self): + """Test that non-string request content is JSON-serialized.""" + details = GuardrailDetails( + target_type=GuardrailTargetType.LLM_INPUT, + decision_type=GuardrailDecisionType.ALLOW, + ) + + # Use a list of strings (InputMessagesParam allows list[str]) + request = Request( + content=["hello", "world"], + conversation_id="conv-456", + ) + + with ApplyGuardrailScope.start(details, self.agent_details, request=request): + pass + + spans = self.span_exporter.get_finished_spans() + attrs = self._get_attributes(spans[0]) + # Should be JSON-serialized since it's not a plain string + self.assertEqual(attrs[SECURITY_CONTENT_INPUT_VALUE_KEY], '["hello", "world"]') + + +if __name__ == "__main__": + unittest.main()