Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/microsoft/opentelemetry/_distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ def _initialize_sdkstats(enable_azure_monitor: bool) -> None:
manager.initialize(config)

# Since azure monitor is disabled, the cikey is not applicable.
_StatsbeatMetrics._COMMON_ATTRIBUTES["cikey"] = "n/a" # pylint: disable=protected-access
_StatsbeatMetrics._COMMON_ATTRIBUTES["cikey"] = "N/A" # pylint: disable=protected-access

# Register distro-owned network gauge on the manager's MeterProvider.
from microsoft.opentelemetry._sdkstats._network_metrics import (
Expand Down
97 changes: 18 additions & 79 deletions src/microsoft/opentelemetry/_sdkstats/_network_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
# license information.
# --------------------------------------------------------------------------

"""Distro-owned network SDKStats gauges.
"""Distro-owned network SDKStats observations.

The upstream statsbeat metrics only count requests sent to the Breeze
endpoint (the Azure Monitor exporter's destination). This distro also
ships OTLP and Agent365 exporters, whose per-export success counters
live in the distro's own ``_REQUESTS_MAP``. This module registers an
observable gauge on the upstream ``StatsbeatManager``'s ``MeterProvider``
so those counters are exported on the same statsbeat pipeline.
endpoint. This distro also ships OTLP and Agent365 exporters whose
per-export counters live in the distro's own ``_REQUESTS_MAP``. We
contribute extra rows to the upstream ``Request_Success_Count`` metric
via ``add_metric_callback`` so the backend treats them as part of the
same metric stream (same name + InstrumentationScope).
"""

from __future__ import annotations

import logging
import threading
from typing import Iterable, List

from opentelemetry.metrics import CallbackOptions, Observation
Expand All @@ -28,6 +27,7 @@
)

try:
from azure.monitor.opentelemetry.exporter._constants import _REQ_SUCCESS_NAME
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat_metrics import (
_StatsbeatMetrics,
)
Expand All @@ -36,9 +36,6 @@

logger = logging.getLogger(__name__)

_REGISTER_LOCK = threading.Lock()
_registered = False


def _get_common_attributes() -> dict:
if _StatsbeatMetrics is None:
Expand All @@ -47,9 +44,8 @@ def _get_common_attributes() -> dict:


def _observe_request_success_count(options: CallbackOptions) -> Iterable[Observation]:
"""Drain the per-endpoint success counts and emit one observation each."""
"""Drain per-endpoint success counts and emit one observation each."""
common = _get_common_attributes()

observations: List[Observation] = []
for key, value in drain(REQUEST_SUCCESS_NAME).items():
attributes = dict(common)
Expand All @@ -60,70 +56,13 @@ def _observe_request_success_count(options: CallbackOptions) -> Iterable[Observa
return observations


def register_network_gauges() -> bool:
"""Attach distro network-stats callbacks to upstream's gauges.

The distro emits per-endpoint ``Request_Success_Count`` observation
via the upstream statsbeat pipeline. We cannot create separate gauges
with the same names because the stats backend identifies metric streams by
InstrumentationScope, and rows from an unknown scope are silently
dropped. Instead we append our callbacks to the already-registered
upstream ``_success_count`` observable gauges
so our observations are emitted on the exact same instrument/scope
as upstream's breeze rows.

Idempotent — subsequent calls are no-ops. Returns ``True`` on the
call that performs registration, ``False`` if registration was
skipped (already registered, upstream unavailable, or upstream
hasn't created the gauges yet).
"""
global _registered # pylint: disable=global-statement

with _REGISTER_LOCK:
if _registered:
return False

try:
from azure.monitor.opentelemetry.exporter.statsbeat._manager import (
StatsbeatManager,
)
except ImportError:
logger.debug("Upstream statsbeat unavailable; skipping network gauges.")
return False

manager = StatsbeatManager()
meter_provider = manager._meter_provider # pylint: disable=protected-access
metrics = manager._metrics # pylint: disable=protected-access
if meter_provider is None or metrics is None:
logger.debug("StatsbeatManager not initialised; skipping network gauges.")
return False

attached: List[str] = []
for gauge_attr, callback in (
("_success_count", _observe_request_success_count),
):
gauge = getattr(metrics, gauge_attr, None)
if gauge is None:
logger.debug("Upstream %s gauge not yet created; skipping.", gauge_attr)
continue
try:
gauge._callbacks.append(callback) # pylint: disable=protected-access
except AttributeError:
logger.debug(
"Upstream %s gauge has no _callbacks list; cannot attach.", gauge_attr,
)
continue
attached.append(gauge_attr)

if not attached:
return False

_registered = True
return True


def _reset_for_tests() -> None:
"""Reset the module-level registration guard. Test-only."""
global _registered # pylint: disable=global-statement
with _REGISTER_LOCK:
_registered = False
def register_network_gauges():
try:
from azure.monitor.opentelemetry.exporter.statsbeat._manager import StatsbeatManager # type: ignore[import-not-found]
except ImportError:
logger.debug("Upstream statsbeat unavailable; skipping network gauges.")
Comment thread
rads-1996 marked this conversation as resolved.
return
manager = StatsbeatManager()
return manager.add_metric_callback(_REQ_SUCCESS_NAME[0], _observe_request_success_count)


53 changes: 40 additions & 13 deletions tests/test_sdkstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,21 @@ def _reset_upstream_singleton():


def _reset_network_metrics_guard():
from microsoft.opentelemetry._sdkstats import _network_metrics
"""Clear upstream's additional-callbacks store so tests can re-register."""
try:
from azure.monitor.opentelemetry.exporter.statsbeat import _manager as _upstream_manager
except ImportError:
return

_network_metrics._reset_for_tests()
callbacks = getattr(_upstream_manager, "_ADDITIONAL_CALLBACKS", None)
lock = getattr(_upstream_manager, "_ADDITIONAL_CALLBACKS_LOCK", None)
if callbacks is None:
return
if lock is not None:
with lock:
callbacks.clear()
else:
callbacks.clear()


class TestSdkStatsEnabled(unittest.TestCase):
Expand Down Expand Up @@ -307,29 +319,44 @@ def tearDown(self):
_reset_upstream_singleton()
_reset_network_metrics_guard()

def test_returns_false_when_manager_has_no_meter_provider(self):
def test_returns_true_then_false_on_repeat(self):
from microsoft.opentelemetry._sdkstats._network_metrics import (
register_network_gauges,
)

self.assertTrue(register_network_gauges())
self.assertFalse(register_network_gauges())

def test_returns_true_then_false_on_repeat(self):
from azure.monitor.opentelemetry.exporter.statsbeat._manager import (
StatsbeatManager,
)
from microsoft.opentelemetry._sdkstats._config import (
_build_default_sdkstats_config,
def test_registers_callback_under_request_success_metric_name(self):
from azure.monitor.opentelemetry.exporter._constants import _REQ_SUCCESS_NAME
from azure.monitor.opentelemetry.exporter.statsbeat import _manager as _upstream_manager
from microsoft.opentelemetry._sdkstats._network_metrics import (
_observe_request_success_count,
register_network_gauges,
)

register_network_gauges()
callbacks = _upstream_manager._ADDITIONAL_CALLBACKS.get(_REQ_SUCCESS_NAME[0], [])
self.assertIn(_observe_request_success_count, callbacks)

def test_returns_none_when_upstream_unavailable(self):
# Simulate upstream import failure by patching the import in
# ``register_network_gauges``'s try block.
import builtins

from microsoft.opentelemetry._sdkstats._network_metrics import (
register_network_gauges,
)

config = _build_default_sdkstats_config()
self.assertTrue(StatsbeatManager().initialize(config))
real_import = builtins.__import__

self.assertTrue(register_network_gauges())
self.assertFalse(register_network_gauges())
def fake_import(name, *args, **kwargs):
if name == "azure.monitor.opentelemetry.exporter.statsbeat._manager":
raise ImportError("simulated")
return real_import(name, *args, **kwargs)

with patch("builtins.__import__", side_effect=fake_import):
self.assertIsNone(register_network_gauges())


class TestObserveRequestSuccessCount(unittest.TestCase):
Expand Down
Loading