diff --git a/cmd/sentinel/main.go b/cmd/sentinel/main.go index c7291e0..c6e8713 100755 --- a/cmd/sentinel/main.go +++ b/cmd/sentinel/main.go @@ -6,13 +6,16 @@ import ( "net/http" "os" "os/signal" + "strconv" "strings" "syscall" "time" + "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/telemetry" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" + "go.opentelemetry.io/otel/sdk/trace" "github.com/openshift-hyperfleet/hyperfleet-broker/broker" "github.com/openshift-hyperfleet/hyperfleet-sentinel/internal/client" @@ -143,6 +146,15 @@ func initLogging(flagLevel, flagFormat, flagOutput string) (*logger.LogConfig, e cfg.Output = output } + // TRACING_ENABLED=true enables tracing + if otelEnabled := os.Getenv("TRACING_ENABLED"); otelEnabled != "" { + enabled, err := strconv.ParseBool(otelEnabled) + if err != nil { + return nil, fmt.Errorf("invalid TRACING_ENABLED value %q: %w", otelEnabled, err) + } + cfg.OTel.Enabled = enabled + } + // Set global config so all loggers use the same configuration logger.SetGlobalConfig(cfg) @@ -154,6 +166,31 @@ func runServe(cfg *config.SentinelConfig, logCfg *logger.LogConfig, healthBindAd ctx := context.Background() log := logger.NewHyperFleetLoggerWithConfig(logCfg) + serviceName := "hyperfleet-sentinel" + // Use OTEL_SERVICE_NAME if set, otherwise default + if envServiceName := os.Getenv("OTEL_SERVICE_NAME"); envServiceName != "" { + serviceName = envServiceName + } + + var tp *trace.TracerProvider + if logCfg.OTel.Enabled { + traceProvider, err := telemetry.InitTraceProvider(ctx, serviceName, version) + if err != nil { + log.Extra("error", err).Warn(ctx, "Failed to initialize OpenTelemetry") + } else { + tp = traceProvider + defer func() { + otelShutdownCtx, otelShutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer otelShutdownCancel() + if err := telemetry.Shutdown(otelShutdownCtx, tp); err != nil { + 
log.Extra("error", err).Error(otelShutdownCtx, "Failed to shutdown OpenTelemetry") + } + }() + } + } else { + log.Extra("tracing_enabled", false).Info(ctx, "OpenTelemetry disabled") + } + log.Extra("commit", commit). Extra("log_level", logCfg.Level.String()). Extra("log_format", logCfg.Format.String()). diff --git a/configs/dev-example.yaml b/configs/dev-example.yaml index 5111b3d..cdccca0 100644 --- a/configs/dev-example.yaml +++ b/configs/dev-example.yaml @@ -26,6 +26,12 @@ poll_interval: 2s max_age_not_ready: 5s max_age_ready: 2m +# Messaging system type for OpenTelemetry tracing attributes. +# Should match the actual message broker being used for accurate observability. +# Valid values: rabbitmq, gcp_pubsub, kafka, activemq, aws_sqs, etc. +# Can be overridden via MESSAGING_SYSTEM environment variable. +messaging_system: "rabbitmq" + # No resource selector - watch all resources in development. # resource_selector: [] diff --git a/configs/gcp-pubsub-example.yaml b/configs/gcp-pubsub-example.yaml index 2ebf6fc..6f957a3 100644 --- a/configs/gcp-pubsub-example.yaml +++ b/configs/gcp-pubsub-example.yaml @@ -26,6 +26,12 @@ max_age_not_ready: 10s # Stable resources are checked less frequently to reduce API load. max_age_ready: 30m +# Messaging system type for OpenTelemetry tracing attributes. +# Should match the actual message broker being used for accurate observability. +# Valid values: rabbitmq, gcp_pubsub, kafka, activemq, aws_sqs, etc. +# Can be overridden via MESSAGING_SYSTEM environment variable. +messaging_system: "gcp_pubsub" + # Resource selector (optional) - filter resources by labels. # If empty or omitted, all resources of the specified type are watched. # This enables horizontal scaling by having multiple Sentinels watch different resource subsets. 
diff --git a/configs/rabbitmq-example.yaml b/configs/rabbitmq-example.yaml index d194330..ff50c3a 100644 --- a/configs/rabbitmq-example.yaml +++ b/configs/rabbitmq-example.yaml @@ -26,6 +26,12 @@ max_age_not_ready: 10s # Stable resources are checked less frequently to reduce API load. max_age_ready: 30m +# Messaging system type for OpenTelemetry tracing attributes. +# Should match the actual message broker being used for accurate observability. +# Valid values: rabbitmq, gcp_pubsub, kafka, activemq, aws_sqs, etc. +# Can be overridden via MESSAGING_SYSTEM environment variable. +messaging_system: "rabbitmq" + # Resource selector (optional) - filter resources by labels. # If empty or omitted, all resources of the specified type are watched. # This enables horizontal scaling by having multiple Sentinels watch different resource subsets. diff --git a/go.mod b/go.mod index 2d89e20..c6d164d 100755 --- a/go.mod +++ b/go.mod @@ -14,6 +14,12 @@ require ( github.com/spf13/viper v1.21.0 github.com/testcontainers/testcontainers-go v0.40.0 github.com/testcontainers/testcontainers-go/modules/rabbitmq v0.40.0 + go.opentelemetry.io/otel v1.42.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.42.0 + go.opentelemetry.io/otel/sdk v1.42.0 + go.opentelemetry.io/otel/trace v1.42.0 ) require ( @@ -61,6 +67,7 @@ require ( github.com/google/s2a-go v0.1.9 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.12 // indirect github.com/googleapis/gax-go/v2 v2.17.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -107,16 +114,16 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect 
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.65.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 // indirect - go.opentelemetry.io/otel v1.40.0 // indirect - go.opentelemetry.io/otel/metric v1.40.0 // indirect - go.opentelemetry.io/otel/trace v1.40.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 // indirect + go.opentelemetry.io/otel/metric v1.42.0 // indirect + go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.1 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.48.0 // indirect - golang.org/x/net v0.50.0 // indirect + golang.org/x/net v0.51.0 // indirect golang.org/x/oauth2 v0.35.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect @@ -126,7 +133,7 @@ require ( google.golang.org/genproto v0.0.0-20260209200024-4cfbd4190f57 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect - google.golang.org/grpc v1.78.0 // indirect + google.golang.org/grpc v1.79.2 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 14d525a..5532d02 100755 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cloudevents/sdk-go/v2 v2.16.2 h1:ZYDFrYke4FD+jM8TZTJJO6JhKHzOQl2oqpFK1D+NnQM= github.com/cloudevents/sdk-go/v2 v2.16.2/go.mod h1:laOcGImm4nVJEU+PHnUrKL56CKmRL65RlQF0kRmW/kg= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f 
h1:Y8xYupdHxryycyPlc9Y+bSQAYZnetRJ70VMVKm5CKI0= -github.com/cncf/xds/go v0.0.0-20251022180443-0feb69152e9f/go.mod h1:HlzOvOjVBOfTGSRXRyY0OiCS/3J1akRGQQpRO/7zyF4= +github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5 h1:6xNmx7iTtyBRev0+D/Tv1FZd4SCg8axKApyNyRsAt/w= +github.com/cncf/xds/go v0.0.0-20251210132809-ee656c7534f5/go.mod h1:KdCmV+x/BuvyMxRnYBlmVaq4OLiKW6iRQfvC62cvdkI= github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= @@ -80,12 +80,12 @@ github.com/ebitengine/purego v0.9.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= -github.com/envoyproxy/go-control-plane v0.13.5-0.20251024222203-75eaa193e329 h1:K+fnvUM0VZ7ZFJf0n4L/BRlnsb9pL/GuDG6FqaH+PwM= -github.com/envoyproxy/go-control-plane/envoy v1.35.0 h1:ixjkELDE+ru6idPxcHLj8LBVc2bFP7iBytj353BoHUo= -github.com/envoyproxy/go-control-plane/envoy v1.35.0/go.mod h1:09qwbGVuSWWAyN5t/b3iyVfz5+z8QWGrzkoqm/8SbEs= +github.com/envoyproxy/go-control-plane v0.14.0 h1:hbG2kr4RuFj222B6+7T83thSPqLjwBIfQawTkC++2HA= +github.com/envoyproxy/go-control-plane/envoy v1.36.0 h1:yg/JjO5E7ubRyKX3m07GF3reDNEnfOboJ0QySbH736g= +github.com/envoyproxy/go-control-plane/envoy v1.36.0/go.mod h1:ty89S1YCCVruQAm9OtKeEkQLTb+Lkz0k8v9W0Oxsv98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8= -github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod 
h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU= +github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4= +github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -139,8 +139,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.12 h1:Fg+zsqzYEs1Znvmczt github.com/googleapis/enterprise-certificate-proxy v0.3.12/go.mod h1:vqVt9yG9480NtzREnTlmGSBmFrA+bzb0yl0TxoBQXOg= github.com/googleapis/gax-go/v2 v2.17.0 h1:RksgfBpxqff0EZkDWYuz9q/uWsTVz+kf43LsZ1J6SMc= github.com/googleapis/gax-go/v2 v2.17.0/go.mod h1:mzaqghpQp4JDh3HvADwrat+6M3MOIDp5YKHhb9PAgDY= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.8 h1:NpbJl/eVbvrGE0MJ6X16X9SAifesl6Fwxg/YmCvubRI= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.8/go.mod h1:mi7YA+gCzVem12exXy46ZespvGtX/lZmD/RLnQhVW7U= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -283,22 +283,26 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.6 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.65.0/go.mod h1:KDgtbWKTQs4bM+VPUr6WlL9m/WXcmkCcBlIzqxPGzmI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 h1:7iP2uCb7sGddAr30RRS6xjKy7AZ2JtTOPA3oolgVSw8= 
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0/go.mod h1:c7hN3ddxs/z6q9xwvfLPk+UHlWRQyaeR1LdgfL/66l0= -go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= -go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= -go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= -go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= -go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= -go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= -go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= -go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= -go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= -go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= -go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= -go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= +go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= +go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0 h1:THuZiwpQZuHPul65w4WcwEnkX2QIuMT+UFoOrygtoJw= 
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.42.0/go.mod h1:J2pvYM5NGHofZ2/Ru6zw/TNWnEQp5crgyDeSrYpXkAw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0 h1:zWWrB1U6nqhS/k6zYB74CjRpuiitRtLLi68VcgmOEto= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.42.0/go.mod h1:2qXPNBX1OVRC0IwOnfo1ljoid+RD0QK3443EaqVlsOU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0 h1:uLXP+3mghfMf7XmV4PkGfFhFKuNWoCvvx5wP/wOXo0o= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.42.0/go.mod h1:v0Tj04armyT59mnURNUJf7RCKcKzq+lgJs6QSjHjaTc= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.42.0 h1:s/1iRkCKDfhlh1JF26knRneorus8aOwVIDhvYx9WoDw= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.42.0/go.mod h1:UI3wi0FXg1Pofb8ZBiBLhtMzgoTm1TYkMvn71fAqDzs= +go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= +go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= +go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo= +go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts= +go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA= +go.opentelemetry.io/otel/sdk/metric v1.42.0/go.mod h1:Ua6AAlDKdZ7tdvaQKfSmnFTdHx37+J4ba8MwVCYM5hc= +go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= +go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ 
-325,8 +329,8 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= -golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= +golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= +golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ= golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= @@ -379,8 +383,8 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= -google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU= +google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod 
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/internal/client/client.go b/internal/client/client.go index 511462f..39a1d6a 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -13,6 +13,7 @@ import ( "github.com/cenkalti/backoff/v5" "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/api/openapi" "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/logger" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) // Retry configuration constants @@ -49,7 +50,8 @@ type HyperFleetClient struct { // NewHyperFleetClient creates a new HyperFleet API client using OpenAPI-generated client func NewHyperFleetClient(endpoint string, timeout time.Duration) (*HyperFleetClient, error) { httpClient := &http.Client{ - Timeout: timeout, + Timeout: timeout, + Transport: otelhttp.NewTransport(http.DefaultTransport), } client, err := openapi.NewClientWithResponses(endpoint, openapi.WithHTTPClient(httpClient)) diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 6d52e4b..5af9b8e 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -9,6 +9,10 @@ import ( "strings" "testing" "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" ) // createMockCluster creates a mock cluster response with all required fields @@ -752,3 +756,180 @@ func newTestClient(t *testing.T, url string, timeout time.Duration) *HyperFleetC } return client } + +func TestNewHyperFleetClient_HTTPInstrumentation(t *testing.T) { + // Capture the previous global tracer provider + previousProvider := otel.GetTracerProvider() + + // Setup in-memory trace exporter + exporter := tracetest.NewInMemoryExporter() + tp := trace.NewTracerProvider( + trace.WithSampler(trace.AlwaysSample()), + trace.WithBatcher(exporter), + ) + otel.SetTracerProvider(tp) + defer func(tp *trace.TracerProvider, ctx context.Context) { + err := tp.Shutdown(ctx) + if err 
!= nil { + t.Errorf("Error shutting down tracer: %v", err) + } + // Restore the previous global tracer provider + otel.SetTracerProvider(previousProvider) + }(tp, context.Background()) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + t.Errorf("Expected GET request, got %s", r.Method) + } + if r.URL.Path != "/api/hyperfleet/v1/clusters" { + t.Errorf("Expected path /api/hyperfleet/v1/clusters, got %s", r.URL.Path) + } + + cluster := createMockCluster("test-cluster-1") + response := createMockClusterList([]map[string]interface{}{cluster}) + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + t.Errorf("Failed to encode response: %v", err) + } + })) + defer server.Close() + + client, err := NewHyperFleetClient(server.URL, 10*time.Second) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + // Verify client was created + if client == nil { + t.Fatal("Expected client to be created") + } + + // Make actual HTTP call to test instrumentation + ctx := context.Background() + resources, err := client.FetchResources(ctx, ResourceTypeClusters, nil) + + // Verify functional behavior + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + if len(resources) != 1 { + t.Fatalf("Expected 1 resource, got %d", len(resources)) + } + if resources[0].ID != "test-cluster-1" { + t.Errorf("Expected ID test-cluster-1, got %s", resources[0].ID) + } + + // Force flush to get spans + err = tp.ForceFlush(ctx) + if err != nil { + t.Fatalf("Error force flush: %v", err) + } + + // Verify HTTP spans were created by instrumentation + spans := exporter.GetSpans() + if len(spans) == 0 { + t.Fatal("Expected HTTP spans to be created, got none") + } + + // Look for HTTP GET span + var httpSpan *tracetest.SpanStub + for i := range spans { + if strings.Contains(spans[i].Name, "GET") { + httpSpan = &spans[i] + break + } + } + + if 
httpSpan == nil { + spanNames := make([]string, len(spans)) + for i, span := range spans { + spanNames[i] = span.Name + } + t.Fatalf("Expected HTTP GET span, got spans: %v", spanNames) + } + + // Verify HTTP span attributes (following OpenTelemetry HTTP conventions) + foundMethodAttr := false + foundURLAttr := false + for _, attr := range httpSpan.Attributes { + switch string(attr.Key) { + case "http.request.method", "http.method": // Different versions of OTel use different names + if attr.Value.AsString() == "GET" { + foundMethodAttr = true + } + case "url.full", "http.url": // Different versions of OTel use different names + if strings.Contains(attr.Value.AsString(), "/api/hyperfleet/v1/clusters") { + foundURLAttr = true + } + } + } + + if !foundMethodAttr { + t.Error("Expected HTTP method attribute in span") + } + if !foundURLAttr { + t.Error("Expected URL attribute in span") + } + + // Verify span completed successfully (codes.Error stringifies as "Error", not "ERROR") + if httpSpan.Status.Code.String() == "Error" { + t.Errorf("Expected successful HTTP span, got error status: %s", httpSpan.Status.Description) + } +} + +func TestNewHyperFleetClient_HTTPInstrumentation_ErrorCase(t *testing.T) { + // Capture the previous global tracer provider + previousProvider := otel.GetTracerProvider() + + // Setup tracing + exporter := tracetest.NewInMemoryExporter() + tp := trace.NewTracerProvider( + trace.WithSampler(trace.AlwaysSample()), + trace.WithBatcher(exporter), + ) + otel.SetTracerProvider(tp) + defer func(tp *trace.TracerProvider, ctx context.Context) { + err := tp.Shutdown(ctx) + if err != nil { + t.Errorf("Error shutting down tracer: %v", err) + } + // Restore the previous global tracer provider + otel.SetTracerProvider(previousProvider) + }(tp, context.Background()) + + // Server returns 500 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, err := w.Write([]byte(`{"error": "internal server 
error"}`)) + if err != nil { + t.Errorf("Failed to write response: %v", err) + return + } + })) + defer server.Close() + + client, err := NewHyperFleetClient(server.URL, 10*time.Second) + if err != nil { + t.Fatalf("Failed to create client: %v", err) + } + + ctx := context.Background() + _, err = client.FetchResources(ctx, ResourceTypeClusters, nil) + + // Verify error behavior (like existing tests) + if err == nil { + t.Fatal("Expected error, got nil") + } + + // Verify spans still created on error + err = tp.ForceFlush(ctx) + if err != nil { + t.Fatalf("Error force flush: %v", err) + } + spans := exporter.GetSpans() + + if len(spans) == 0 { + t.Fatal("Expected HTTP spans even on error") + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 3aae158..eac15c8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -11,6 +11,10 @@ import ( "github.com/spf13/viper" ) +const ( + defaultMessagingSystem = "gcp_pubsub" +) + // LabelSelector represents a label key-value pair for resource filtering type LabelSelector struct { Label string `mapstructure:"label"` @@ -30,6 +34,7 @@ type SentinelConfig struct { HyperFleetAPI *HyperFleetAPIConfig `mapstructure:"hyperfleet_api"` MessageData map[string]interface{} `mapstructure:"message_data"` Topic string `mapstructure:"topic"` + MessagingSystem string `mapstructure:"messaging_system"` } // HyperFleetAPIConfig defines the HyperFleet API client configuration @@ -65,6 +70,7 @@ func NewSentinelConfig() *SentinelConfig { // Endpoint is required and must be set in config file Timeout: 5 * time.Second, }, + MessagingSystem: defaultMessagingSystem, } } @@ -108,6 +114,13 @@ func LoadConfig(configFile string) (*SentinelConfig, error) { cfg.Topic = topic } + if messagingSystem, ok := os.LookupEnv("MESSAGING_SYSTEM"); ok { + messagingSystem = strings.TrimSpace(messagingSystem) + if messagingSystem != "" { + cfg.MessagingSystem = messagingSystem + } + } + // Validate configuration if err := 
cfg.Validate(); err != nil { return nil, fmt.Errorf("invalid config: %w", err) diff --git a/internal/sentinel/sentinel.go b/internal/sentinel/sentinel.go index f0dcf60..67e10ae 100644 --- a/internal/sentinel/sentinel.go +++ b/internal/sentinel/sentinel.go @@ -15,6 +15,9 @@ import ( "github.com/openshift-hyperfleet/hyperfleet-sentinel/internal/metrics" "github.com/openshift-hyperfleet/hyperfleet-sentinel/internal/payload" "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/logger" + "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/telemetry" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) // Sentinel polls the HyperFleet API and triggers reconciliation events @@ -27,7 +30,7 @@ type Sentinel struct { mu sync.RWMutex lastSuccessfulPoll time.Time - payloadBuilder *payload.Builder + payloadBuilder *payload.Builder } // NewSentinel creates a new sentinel @@ -93,6 +96,11 @@ func (s *Sentinel) Start(ctx context.Context) error { func (s *Sentinel) trigger(ctx context.Context) error { startTime := time.Now() + // span: sentinel.poll + ctx, pollSpan := telemetry.StartSpan(ctx, "sentinel.poll", + attribute.String("hyperfleet.resource_type", s.config.ResourceType)) + defer pollSpan.End() + // Get metric labels resourceType := s.config.ResourceType resourceSelector := metrics.GetResourceSelectorLabel(s.config.ResourceSelector) @@ -111,6 +119,8 @@ func (s *Sentinel) trigger(ctx context.Context) error { resources, err := s.client.FetchResources(ctx, client.ResourceType(s.config.ResourceType), labelSelector) if err != nil { // Record API error + pollSpan.RecordError(err) + pollSpan.SetStatus(codes.Error, "fetch resources failed") metrics.UpdateAPIErrorsMetric(resourceType, resourceSelector, "fetch_error") return fmt.Errorf("failed to fetch resources: %w", err) } @@ -125,14 +135,20 @@ func (s *Sentinel) trigger(ctx context.Context) error { // Evaluate each resource for i := range resources { resource := &resources[i] + // span: 
sentinel.evaluate + evalCtx, evalSpan := telemetry.StartSpan(ctx, "sentinel.evaluate", + attribute.String("hyperfleet.resource_type", s.config.ResourceType), + attribute.String("hyperfleet.resource_id", resource.ID), + ) decision := s.decisionEngine.Evaluate(resource, now) + evalSpan.SetAttributes(attribute.String("hyperfleet.decision_reason", decision.Reason)) if decision.ShouldPublish { pending++ // Add decision reason to context for structured logging - eventCtx := logger.WithDecisionReason(ctx, decision.Reason) + eventCtx := logger.WithDecisionReason(evalCtx, decision.Reason) eventData := s.buildEventData(eventCtx, resource, decision) @@ -144,17 +160,38 @@ func (s *Sentinel) trigger(ctx context.Context) error { event.SetID(uuid.New().String()) if err := event.SetData(cloudevents.ApplicationJSON, eventData); err != nil { s.logger.Errorf(eventCtx, "Failed to set event data resource_id=%s error=%v", resource.ID, err) + evalSpan.RecordError(err) + evalSpan.SetStatus(codes.Error, "set event data failed") + evalSpan.End() continue } + // span: publish (child of sentinel.evaluate) + publishCtx, publishSpan := telemetry.StartSpan(eventCtx, fmt.Sprintf("%s publish", topic), + attribute.String("messaging.system", s.config.MessagingSystem), + attribute.String("messaging.operation.type", "publish"), + attribute.String("messaging.destination.name", topic), + attribute.String("messaging.message.id", event.ID()), + ) + + if publishSpan.SpanContext().IsValid() { + telemetry.SetTraceContext(&event, publishSpan) + } + // Publish to broker using configured topic - if err := s.publisher.Publish(eventCtx, topic, &event); err != nil { + if err := s.publisher.Publish(publishCtx, topic, &event); err != nil { + publishSpan.RecordError(err) + publishSpan.SetStatus(codes.Error, "publish failed") // Record broker error metrics.UpdateBrokerErrorsMetric(resourceType, resourceSelector, "publish_error") - s.logger.Errorf(eventCtx, "Failed to publish event resource_id=%s error=%v", 
resource.ID, err) + s.logger.Errorf(publishCtx, "Failed to publish event resource_id=%s error=%v", resource.ID, err) + publishSpan.End() + evalSpan.End() continue } + publishSpan.End() + // Record successful event publication metrics.UpdateEventsPublishedMetric(resourceType, resourceSelector, decision.Reason) @@ -163,7 +200,7 @@ func (s *Sentinel) trigger(ctx context.Context) error { published++ } else { // Add decision reason to context for structured logging - skipCtx := logger.WithDecisionReason(ctx, decision.Reason) + skipCtx := logger.WithDecisionReason(evalCtx, decision.Reason) // Record skipped resource metrics.UpdateResourcesSkippedMetric(resourceType, resourceSelector, decision.Reason) @@ -172,6 +209,8 @@ func (s *Sentinel) trigger(ctx context.Context) error { resource.ID, resource.Status.Ready) skipped++ } + + evalSpan.End() } // Record pending resources count diff --git a/internal/sentinel/sentinel_test.go b/internal/sentinel/sentinel_test.go index ba11af9..321e404 100644 --- a/internal/sentinel/sentinel_test.go +++ b/internal/sentinel/sentinel_test.go @@ -17,6 +17,9 @@ import ( "github.com/openshift-hyperfleet/hyperfleet-sentinel/internal/metrics" "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/logger" "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" ) // createMockCluster creates a mock cluster response matching the new API spec. 
@@ -634,3 +637,146 @@ func TestTrigger_ContextPropagationToBroker(t *testing.T) { t.Errorf("span_id not propagated: got %v", spanID) } } + +func TestTrigger_CreatesRequiredSpans(t *testing.T) { + ctx := context.Background() + + // Setup in-memory trace exporter for span verification + exporter := tracetest.NewInMemoryExporter() + tp := trace.NewTracerProvider( + trace.WithSampler(trace.AlwaysSample()), + trace.WithBatcher(exporter), + ) + previousProvider := otel.GetTracerProvider() + otel.SetTracerProvider(tp) + defer func() { + if err := tp.Shutdown(ctx); err != nil { + t.Errorf("shutdown of tracer provider: %v", err) + } + otel.SetTracerProvider(previousProvider) + }() + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Cluster exceeds max age (31 minutes ago) - should trigger publishing + cluster := createMockCluster("cluster-1", 2, 2, true, time.Now().Add(-31*time.Minute)) + response := createMockClusterList([]map[string]interface{}{cluster}) + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + t.Logf("Error encoding response: %v", err) + } + })) + defer server.Close() + + hyperfleetClient, err := client.NewHyperFleetClient(server.URL, 10*time.Second) + if err != nil { + t.Fatalf("failed to create HyperFleet client: %v", err) + } + decisionEngine := engine.NewDecisionEngine(10*time.Second, 30*time.Minute) + + mockPublisher := &MockPublisher{} + log := logger.NewHyperFleetLogger() + + // Create metrics with a test registry + registry := prometheus.NewRegistry() + metrics.NewSentinelMetrics(registry, "test") + + cfg := &config.SentinelConfig{ + ResourceType: "clusters", + Topic: "hyperfleet-clusters", + MaxAgeNotReady: 10 * time.Second, + MaxAgeReady: 30 * time.Minute, + MessagingSystem: "rabbitmq", + } + + s, err := NewSentinel(cfg, hyperfleetClient, decisionEngine, mockPublisher, log) + if err != nil { + t.Fatalf("NewSentinel failed: %v", err) + } 
+ + // Execute trigger + err = s.trigger(ctx) + if err != nil { + t.Fatalf("trigger failed: %v", err) + } + + // Force flush spans to exporter + err = tp.ForceFlush(ctx) + if err != nil { + t.Fatalf("force flush error: %v", err) + } + + // Verify spans were created + spans := exporter.GetSpans() + + // Build map of span names for easy checking + spanNames := make(map[string]bool) + for _, span := range spans { + spanNames[span.Name] = true + } + + // Verify required spans exist + requiredSpans := []string{ + "sentinel.poll", + "sentinel.evaluate", + "hyperfleet-clusters publish", + } + + for _, requiredSpan := range requiredSpans { + if !spanNames[requiredSpan] { + t.Errorf("Required span '%s' not found. Found spans: %v", requiredSpan, getSpanNames(spans)) + } + } + + // Verify we got the expected number of spans + // Should have: 1 poll + 1 evaluate + 1 publish = 3 spans minimum + if len(spans) < 3 { + t.Errorf("Expected at least 3 spans, got %d. Spans: %v", len(spans), getSpanNames(spans)) + } + + validateSpanAttribute(t, spans, "hyperfleet-clusters publish", "messaging.system", cfg.MessagingSystem) + validateSpanAttribute(t, spans, "hyperfleet-clusters publish", "messaging.operation.type", "publish") + validateSpanAttribute(t, spans, "hyperfleet-clusters publish", "messaging.destination.name", cfg.Topic) + + // Verify the CloudEvent was published (basic functional verification) + if len(mockPublisher.publishedEvents) != 1 { + t.Errorf("Expected 1 published event, got %d", len(mockPublisher.publishedEvents)) + } + + // Verify CloudEvent has traceparent extension + if len(mockPublisher.publishedEvents) > 0 { + event := mockPublisher.publishedEvents[0] + extensions := event.Extensions() + if traceparent, exists := extensions["traceparent"]; !exists { + t.Error("Expected CloudEvent to contain traceparent extension for trace propagation") + } else if traceparentStr, ok := traceparent.(string); !ok || len(traceparentStr) != 55 { + t.Errorf("Expected valid W3C 
traceparent format, got: %v", traceparent) + } + } +} + +func validateSpanAttribute(t *testing.T, spans []tracetest.SpanStub, spanName, attrKey, expectedValue string) { + for _, span := range spans { + if span.Name == spanName { + for _, attr := range span.Attributes { + if string(attr.Key) == attrKey { + if attr.Value.AsString() != expectedValue { + t.Errorf("Span '%s': expected %s=%s, got %s", spanName, attrKey, expectedValue, attr.Value.AsString()) + } + return + } + } + t.Errorf("Span '%s': attribute '%s' not found", spanName, attrKey) + return + } + } + t.Errorf("Span '%s' not found", spanName) +} + +// Helper function for span name extraction +func getSpanNames(spans []tracetest.SpanStub) []string { + names := make([]string, len(spans)) + for i, span := range spans { + names[i] = span.Name + } + return names +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index a5531d5..a764cac 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -43,6 +43,11 @@ const ( FormatJSON ) +// OTelConfig holds OpenTelemetry configuration +type OTelConfig struct { + Enabled bool `json:"enabled"` +} + // LogConfig holds the logging configuration type LogConfig struct { Level LogLevel @@ -51,6 +56,7 @@ type LogConfig struct { Component string Version string Hostname string + OTel OTelConfig } // HyperFleetLogger interface for structured logging @@ -98,6 +104,10 @@ func DefaultConfig() *LogConfig { Component: "sentinel", Version: "dev", Hostname: hostname, + OTel: OTelConfig{ + // TODO: Enable OTelConfig to true as a default standard post the rollout phase + Enabled: false, + }, } } @@ -253,11 +263,13 @@ func (l *logger) buildEntry(ctx context.Context, level LogLevel, message string) if txid, ok := ctx.Value(TxIDKey).(int64); ok { entry.TxID = txid } - if traceID, ok := ctx.Value(TraceIDCtxKey).(string); ok { - entry.TraceID = traceID - } - if spanID, ok := ctx.Value(SpanIDCtxKey).(string); ok { - entry.SpanID = spanID + if l.config.OTel.Enabled { + if traceID, ok 
:= ctx.Value(TraceIDCtxKey).(string); ok { + entry.TraceID = traceID + } + if spanID, ok := ctx.Value(SpanIDCtxKey).(string); ok { + entry.SpanID = spanID + } } // Sentinel-specific fields @@ -548,7 +560,7 @@ type MockLoggerWithContext struct { func NewMockLogger() *MockLoggerWithContext { return &MockLoggerWithContext{ - CapturedLogs: &[]string{}, + CapturedLogs: &[]string{}, CapturedContexts: &[]context.Context{}, } } diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index 7f4a682..ff6485a 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -520,6 +520,7 @@ func TestLoggerCorrelationFields(t *testing.T) { Component: "test", Version: "1.0.0", Hostname: "testhost", + OTel: OTelConfig{Enabled: true}, } log := NewHyperFleetLoggerWithConfig(cfg) @@ -553,6 +554,7 @@ func TestLoggerSentinelFieldsTextFormat(t *testing.T) { Component: "sentinel", Version: "1.0.0", Hostname: "testhost", + OTel: OTelConfig{Enabled: true}, } log := NewHyperFleetLoggerWithConfig(cfg) diff --git a/pkg/telemetry/otel.go b/pkg/telemetry/otel.go new file mode 100644 index 0000000..8066f70 --- /dev/null +++ b/pkg/telemetry/otel.go @@ -0,0 +1,182 @@ +package telemetry + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/logger" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + oteltrace "go.opentelemetry.io/otel/trace" +) + +const ( + samplerAlwaysOn = "always_on" + samplerAlwaysOff = "always_off" + samplerTraceIDRatio = "traceidratio" + envOtelTracesSampler = 
"OTEL_TRACES_SAMPLER" + envOtelTracesSamplerArg = "OTEL_TRACES_SAMPLER_ARG" + envOtelExporterOtlpEndpoint = "OTEL_EXPORTER_OTLP_ENDPOINT" + envOtelExporterOtlpProtocol = "OTEL_EXPORTER_OTLP_PROTOCOL" + parentBasedTraceIDRatio = "parentbased_traceidratio" + parentBasedAlwaysOn = "parentbased_always_on" + parentBasedAlwaysOff = "parentbased_always_off" + defaultSamplingRate = 1.0 +) + +// InitTraceProvider initializes OpenTelemetry trace provider +func InitTraceProvider(ctx context.Context, serviceName, serviceVersion string) (*trace.TracerProvider, error) { + + var exporter trace.SpanExporter + var err error + + log := logger.NewHyperFleetLogger() + + if otlpEndpoint := os.Getenv(envOtelExporterOtlpEndpoint); otlpEndpoint != "" { + protocol := os.Getenv(envOtelExporterOtlpProtocol) + switch strings.ToLower(protocol) { + case "http", "http/protobuf": + exporter, err = otlptracehttp.New(ctx) + case "grpc", "": // Default to gRPC per standard + exporter, err = otlptracegrpc.New(ctx) + // Uses gRPC exporter (port 4317) following OpenTelemetry standards + // This is compatible with standard OTEL Collector configurations + default: + log.Warnf(ctx, "Unrecognized OTEL_EXPORTER_OTLP_PROTOCOL %q, using default grpc", protocol) + exporter, err = otlptracegrpc.New(ctx) + } + if err != nil { + log.Errorf(ctx, "Failed to create OTLP exporter (protocol=%s): %v", protocol, err) + return nil, fmt.Errorf("failed to create OTLP exporter (protocol=%s): %w", protocol, err) + } + } else { + // Create stdout exporter + exporter, err = stdouttrace.New( + stdouttrace.WithPrettyPrint(), // Formatted output + ) + if err != nil { + log.Errorf(ctx, "Failed to create OpenTelemetry stdout exporter: %v", err) + return nil, fmt.Errorf("failed to create OpenTelemetry stdout exporter: %w", err) + } + } + + // Create resource (service information) + res, err := resource.New(ctx, + resource.WithFromEnv(), // parse OTEL_RESOURCE_ATTRIBUTES + resource.WithAttributes( + 
semconv.ServiceNameKey.String(serviceName), + semconv.ServiceVersionKey.String(serviceVersion), + ), + ) + if err != nil { + if shutdownErr := exporter.Shutdown(ctx); shutdownErr != nil { + log.Warnf(ctx, "Failed to shutdown exporter: %v", shutdownErr) + } + log.Extra("service_name", serviceName).Extra("service_version", serviceVersion).Errorf(ctx, "Failed to create OpenTelemetry resource: %v", err) + return nil, fmt.Errorf("failed to create OTel resource: %w", err) + } + + var sampler trace.Sampler + samplerType := strings.ToLower(os.Getenv(envOtelTracesSampler)) + + switch samplerType { + case samplerAlwaysOn: + sampler = trace.AlwaysSample() + case samplerAlwaysOff: + sampler = trace.NeverSample() + case samplerTraceIDRatio: + sampler = trace.ParentBased(trace.TraceIDRatioBased(parseSamplingRate(ctx, log))) + case parentBasedTraceIDRatio, "": + // Default per tracing standard + sampler = trace.ParentBased(trace.TraceIDRatioBased(parseSamplingRate(ctx, log))) + case parentBasedAlwaysOn: + sampler = trace.ParentBased(trace.AlwaysSample()) + case parentBasedAlwaysOff: + sampler = trace.ParentBased(trace.NeverSample()) + default: + log.Warnf(ctx, "Unrecognized sampler %q, using default", samplerType) + sampler = trace.ParentBased(trace.TraceIDRatioBased(parseSamplingRate(ctx, log))) + } + + // Create trace provider + tp := trace.NewTracerProvider( + trace.WithBatcher(exporter), + trace.WithResource(res), + trace.WithSampler(sampler), + ) + + // Set global trace provider + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + return tp, nil +} + +// Shutdown gracefully shuts down the trace provider +func Shutdown(ctx context.Context, tp *trace.TracerProvider) error { + if tp == nil { + return nil + } + return tp.Shutdown(ctx) +} + +// StartSpan starts a span and enriches context with trace/span IDs for logging +func StartSpan(ctx context.Context, spanName string, 
attrs ...attribute.KeyValue) (context.Context, oteltrace.Span) { + tracer := otel.Tracer("hyperfleet-sentinel") + ctx, span := tracer.Start(ctx, spanName) + + // Add attributes if provided + if len(attrs) > 0 { + span.SetAttributes(attrs...) + } + + // Enrich context with trace/span IDs for logging + if span.SpanContext().IsValid() { + traceID := span.SpanContext().TraceID().String() + spanID := span.SpanContext().SpanID().String() + ctx = logger.WithTraceID(ctx, traceID) + ctx = logger.WithSpanID(ctx, spanID) + } + + return ctx, span +} + +// SetTraceContext adds W3C traceParent extension to CloudEvent for distributed tracing +func SetTraceContext(event *cloudevents.Event, span oteltrace.Span) { + if event == nil || span == nil { + return + } + if span.SpanContext().IsValid() { + traceParent := fmt.Sprintf("00-%s-%s-%02x", + span.SpanContext().TraceID().String(), + span.SpanContext().SpanID().String(), + uint8(span.SpanContext().TraceFlags())) + event.SetExtension("traceparent", traceParent) + } +} + +// Helper to parse sampling rate from env +func parseSamplingRate(ctx context.Context, log logger.HyperFleetLogger) float64 { + rate := defaultSamplingRate + if arg := os.Getenv(envOtelTracesSamplerArg); arg != "" { + if parsedRate, err := strconv.ParseFloat(arg, 64); err == nil && parsedRate >= 0.0 && parsedRate <= 1.0 { + rate = parsedRate + } else { + log.Warnf(ctx, "Invalid %s value=%q, using default: %v", envOtelTracesSamplerArg, arg, rate) + } + } + return rate +} diff --git a/pkg/telemetry/otel_test.go b/pkg/telemetry/otel_test.go new file mode 100644 index 0000000..a61c8ad --- /dev/null +++ b/pkg/telemetry/otel_test.go @@ -0,0 +1,333 @@ +package telemetry + +import ( + "context" + "os" + "testing" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/logger" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" + 
"go.opentelemetry.io/otel/sdk/trace/tracetest" + oteltrace "go.opentelemetry.io/otel/trace" +) + +func TestInitTraceProvider_StdoutExporter(t *testing.T) { + ctx := context.Background() + + // Test stdout exporter (default) + tp, err := InitTraceProvider(ctx, "test-service", "v1.0.0") + if err != nil { + t.Fatalf("Failed to initialize trace provider: %v", err) + } + if tp == nil { + t.Fatal("Expected trace provider, got nil") + } + + // Cleanup + defer func() { + if err := Shutdown(ctx, tp); err != nil { + t.Errorf("Failed to shutdown trace provider: %v", err) + } + }() + + // Verify tracer is available + tracer := otel.Tracer("test") + if tracer == nil { + t.Error("Expected tracer to be available") + } +} + +func TestInitTraceProvider_OTLPExporter(t *testing.T) { + ctx := context.Background() + + // Set OTLP endpoint (can be fake - we're just testing initialization) + err := os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://fake-otel-collector:4317") + if err != nil { + t.Fatalf("Failed to set OTEL_EXPORTER_OTLP_ENDPOINT: %v", err) + } + defer func() { + err := os.Unsetenv("OTEL_EXPORTER_OTLP_ENDPOINT") + if err != nil { + t.Errorf("Failed to unset OTEL_EXPORTER_OTLP_ENDPOINT: %v", err) + } + }() + + // Test that trace provider initializes correctly with OTLP exporter + tp, err := InitTraceProvider(ctx, "test-service", "v1.0.0") + if err != nil { + t.Fatalf("Failed to initialize trace provider with OTLP: %v", err) + } + if tp == nil { + t.Fatal("Expected trace provider, got nil") + } + + // Verify tracer is available + tracer := otel.Tracer("test") + if tracer == nil { + t.Error("Expected tracer to be available") + } + + // Test shutdown + err = Shutdown(ctx, tp) + if err != nil { + t.Errorf("Failed to shutdown trace provider: %v", err) + } +} + +func TestInitTraceProvider_SamplerEnvironmentVariables(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + samplerType string + samplerArg string + expectedSample bool + }{ + { + name: 
"always_on", + samplerType: "always_on", + expectedSample: true, + }, + { + name: "always_off", + samplerType: "always_off", + expectedSample: false, + }, + { + name: "traceidratio_high", + samplerType: "traceidratio", + samplerArg: "1.0", + expectedSample: true, + }, + { + name: "traceidratio_zero", + samplerType: "traceidratio", + samplerArg: "0.0", + expectedSample: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Set environment variables + if tt.samplerType != "" { + err := os.Setenv("OTEL_TRACES_SAMPLER", tt.samplerType) + if err != nil { + t.Fatalf("Failed to set OTEL_TRACES_SAMPLER: %v", err) + } + defer func() { + err := os.Unsetenv("OTEL_TRACES_SAMPLER") + if err != nil { + t.Errorf("Failed to unset OTEL_TRACES_SAMPLER: %v", err) + } + }() + } + if tt.samplerArg != "" { + err := os.Setenv("OTEL_TRACES_SAMPLER_ARG", tt.samplerArg) + if err != nil { + t.Fatalf("Failed to set OTEL_TRACES_SAMPLER_ARG: %v", err) + } + defer func() { + err := os.Unsetenv("OTEL_TRACES_SAMPLER_ARG") + if err != nil { + t.Errorf("Failed to unset OTEL_TRACES_SAMPLER_ARG: %v", err) + } + }() + } + + tp, err := InitTraceProvider(ctx, "test-service", "v1.0.0") + if err != nil { + t.Fatalf("Failed to initialize trace provider: %v", err) + } + defer func(ctx context.Context, tp *trace.TracerProvider) { + err := Shutdown(ctx, tp) + if err != nil { + t.Errorf("Failed to shutdown trace provider") + } + }(ctx, tp) + + // Test sampling behavior by checking if spans are created + tracer := otel.Tracer("test") + _, span := tracer.Start(ctx, "test-span") + + if tt.expectedSample { + if !span.SpanContext().IsValid() { + t.Error("Expected valid span context for sampling=true") + } + } else { + // Add missing validation for expectedSample=false + if span.SpanContext().IsValid() && span.SpanContext().TraceFlags().IsSampled() { + t.Error("Expected span to NOT be sampled for sampling=false") + } + } + span.End() + }) + } +} + +func TestStartSpan(t *testing.T) 
{ + ctx := context.Background() + + // Initialize trace provider with in-memory exporter for testing + exporter := tracetest.NewInMemoryExporter() + tp := trace.NewTracerProvider( + trace.WithSampler(trace.AlwaysSample()), + trace.WithBatcher(exporter), + ) + otel.SetTracerProvider(tp) + defer func(ctx context.Context, tp *trace.TracerProvider) { + err := Shutdown(ctx, tp) + if err != nil { + t.Errorf("Failed to shutdown trace provider : %v", err) + } + }(ctx, tp) + + // Test span creation + attrs := []attribute.KeyValue{ + attribute.String("test.key", "test.value"), + } + + newCtx, span := StartSpan(ctx, "test-span", attrs...) + span.End() + + // Force flush to ensure span reaches exporter + err := tp.ForceFlush(ctx) + if err != nil { + t.Fatalf("Failed to force flush: %v", err) + } + + // Verify span was created + if !span.SpanContext().IsValid() { + t.Error("Expected valid span context") + } + + // Verify span name + spans := exporter.GetSpans() + if len(spans) != 1 { + t.Fatalf("Expected 1 span, got %d", len(spans)) + } + + if spans[0].Name != "test-span" { + t.Errorf("Expected span name 'test-span', got %s", spans[0].Name) + } + + // Verify attributes + found := false + for _, attr := range spans[0].Attributes { + if attr.Key == "test.key" && attr.Value.AsString() == "test.value" { + found = true + break + } + } + if !found { + t.Error("Expected attribute 'test.key=test.value' not found") + } + + // Verify context enrichment (trace/span IDs should be in context) + traceID, hasTraceID := newCtx.Value(logger.TraceIDCtxKey).(string) + spanID, hasSpanID := newCtx.Value(logger.SpanIDCtxKey).(string) + + if !hasTraceID || traceID == "" { + t.Error("Expected context to contain trace ID") + } + if !hasSpanID || spanID == "" { + t.Error("Expected context to contain span ID") + } + + // Verify the IDs match the actual span + expectedTraceID := span.SpanContext().TraceID().String() + expectedSpanID := span.SpanContext().SpanID().String() + + if traceID != expectedTraceID 
{ + t.Errorf("Expected trace ID %s, got %s", expectedTraceID, traceID) + } + if spanID != expectedSpanID { + t.Errorf("Expected span ID %s, got %s", expectedSpanID, spanID) + } +} + +func TestSetTraceContext(t *testing.T) { + ctx := context.Background() + + // Initialize trace provider + exporter := tracetest.NewInMemoryExporter() + tp := trace.NewTracerProvider( + trace.WithSampler(trace.AlwaysSample()), + trace.WithBatcher(exporter), + ) + otel.SetTracerProvider(tp) + defer func(ctx context.Context, tp *trace.TracerProvider) { + err := Shutdown(ctx, tp) + if err != nil { + t.Error("Failed to shutdown trace provider") + } + }(ctx, tp) + + // Create a span + tracer := otel.Tracer("test") + _, span := tracer.Start(ctx, "test-span") + + // Create CloudEvent + event := cloudevents.NewEvent() + event.SetType("test.event") + event.SetSource("test") + event.SetID("test-123") + + // Set trace context + SetTraceContext(&event, span) + span.End() + + // Verify traceparent extension was added + extensions := event.Extensions() + traceparent, exists := extensions["traceparent"] + if !exists { + t.Fatal("Expected traceparent extension to be set") + } + + // Verify traceparent format: 00-{trace_id}-{span_id}-01 + traceParentStr, ok := traceparent.(string) + if !ok { + t.Fatal("Expected traceparent to be a string") + } + + if len(traceParentStr) != 55 { // 00-{32 chars}-{16 chars}-01 + t.Errorf("Expected traceparent length 55, got %d", len(traceParentStr)) + } + + if traceParentStr[:3] != "00-" { + t.Errorf("Expected traceparent to start with '00-', got %s", traceParentStr[:3]) + } + + if traceParentStr[len(traceParentStr)-3:] != "-01" { + t.Errorf("Expected traceparent to end with '-01', got %s", traceParentStr[len(traceParentStr)-3:]) + } +} + +func TestSetTraceContext_InvalidSpan(t *testing.T) { + // Test with invalid span context using no-op span + event := cloudevents.NewEvent() + event.SetType("test.event") + event.SetSource("test") + event.SetID("test-123") + + // Create 
a no-op span with invalid span context + // oteltrace.SpanFromContext() with background context returns a no-op span + span := oteltrace.SpanFromContext(context.Background()) + + // Verify the span context is actually invalid + if span.SpanContext().IsValid() { + t.Fatal("Expected invalid span context, but got valid one") + } + + // This should not panic or error, and should not set traceparent + SetTraceContext(&event, span) + + // Should NOT have traceparent extension since span context is invalid + extensions := event.Extensions() + if _, exists := extensions["traceparent"]; exists { + t.Error("Expected no traceparent extension for invalid span context, but got one") + } +} diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 47c2327..bb9c6bc 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -13,6 +13,7 @@ import ( "os" "runtime" "strings" + "sync/atomic" "testing" "time" @@ -22,7 +23,11 @@ import ( "github.com/openshift-hyperfleet/hyperfleet-sentinel/internal/metrics" "github.com/openshift-hyperfleet/hyperfleet-sentinel/internal/sentinel" "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/logger" + "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/telemetry" "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" ) // TestMain provides centralized setup/teardown for integration tests @@ -424,6 +429,40 @@ func TestIntegration_TSLSyntaxMultipleLabels(t *testing.T) { t.Logf("TSL syntax validation completed - received correct format: %s", receivedSearchParam) } +// TestIntegration_BrokerLoggerContext validates that OpenTelemetry trace context and Sentinel-specific context fields are properly propagated through the logging system +// during event publishing workflows. +// +// Context Propagation Flow: +// 1.
OpenTelemetry creates trace_id and span_id for spans +// 2. telemetry.StartSpan() enriches context with trace IDs for logging +// 3. Sentinel adds decision context (decision_reason, topic, subset) to context +// 4. Context flows through to broker operations via logger.WithXXX() calls +// 5. All log entries contain both OpenTelemetry and Sentinel context fields +// +// Validated Log Fields: +// +// OpenTelemetry fields: +// - trace_id: Distributed trace identifier from active span +// - span_id: Current span identifier from active span +// +// Sentinel-specific fields: +// - decision_reason: Why the event was triggered (e.g., "max age exceeded", "generation changed") +// - topic: Message broker topic where event is published +// - subset: Resource type being monitored (e.g., "clusters") +// - component: Always "sentinel" for consistent log attribution +// +// Test Approach: +// - Captures structured JSON logs to a buffer for analysis +// - Uses real RabbitMQ broker to generate authentic broker operation logs +// - Mock server returns resources that trigger multiple event types +// - Parses JSON log entries to verify required context fields are present +// +// Success Criteria: +// - Sentinel's "Published event" logs contain all OpenTelemetry and context fields +// - Broker operation logs inherit Sentinel's component and context +// - No context fields are lost during the broker publishing workflow +// +// This test ensures distributed tracing correlation and structured logging work correctly across the Sentinel → Broker boundary for observability. 
func TestIntegration_BrokerLoggerContext(t *testing.T) { const SENTINEL_COMPONENT = "sentinel" const TEST_VERSION = "test" @@ -439,6 +478,25 @@ func TestIntegration_BrokerLoggerContext(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() + // Force the OpenTelemetry sampler to always_on so every span is sampled + err := os.Setenv("OTEL_TRACES_SAMPLER", "always_on") + if err != nil { + t.Errorf("Failed to set OTEL_TRACES_SAMPLER: %v", err) + } + defer func() { + err := os.Unsetenv("OTEL_TRACES_SAMPLER") + if err != nil { + t.Errorf("Failed to unset OTEL_TRACES_SAMPLER: %v", err) + } + }() + + // Setup OpenTelemetry for integration test + tp, err := telemetry.InitTraceProvider(ctx, "sentinel", "test") + if err != nil { + t.Fatalf("Failed to initialize OpenTelemetry: %v", err) + } + defer telemetry.Shutdown(context.Background(), tp) + // Get globalConfig and assign multiWriter to observe output logs globalConfig := logger.GetGlobalConfig() multiWriter := io.MultiWriter(globalConfig.Output, &logBuffer) @@ -451,6 +509,9 @@ func TestIntegration_BrokerLoggerContext(t *testing.T) { Component: SENTINEL_COMPONENT, Version: TEST_VERSION, Hostname: TEST_HOST, + OTel: logger.OTelConfig{ + Enabled: true, + }, } log := logger.NewHyperFleetLoggerWithConfig(cfg) @@ -562,13 +623,13 @@ func TestIntegration_BrokerLoggerContext(t *testing.T) { if entry["subset"] == nil { t.Errorf("Sentinel event log missing subset: %v", entry) } - // TODO: Remove the commented lines as part of https://issues.redhat.com/browse/HYPERFLEET-542.
We expect trace_id and span_id to be propagated once they're available - //if entry["trace_id"] == nil { - // t.Errorf("Sentinel event log missing trace_id: %v", entry) - //} - //if entry["span_id"] == nil { - // t.Errorf("Sentinel event log missing span_id: %v", entry) - //} + + if entry["trace_id"] == nil { + t.Errorf("Sentinel event log missing trace_id: %v", entry) + } + if entry["span_id"] == nil { + t.Errorf("Sentinel event log missing span_id: %v", entry) + } t.Logf("Found Sentinel event log with context: decision_reason=%v topic=%v subset=%v", entry["decision_reason"], entry["topic"], entry["subset"]) @@ -616,3 +677,318 @@ func TestIntegration_BrokerLoggerContext(t *testing.T) { t.Log("SUCCESS: Logger context inheritance verified - both Sentinel and broker operations log with component=sentinel") } } + +// TestIntegration_EndToEndSpanHierarchy validates the complete OpenTelemetry span hierarchy created during Sentinel's polling and event publishing workflow. +// +// Expected Span Hierarchy: +// +// sentinel.poll (root span - one per polling cycle) +// ├── HTTP GET (API call to fetch resources) +// ├── sentinel.evaluate (one per resource evaluated) +// │ └── {topic} publish (one per event published) +// ├── sentinel.evaluate (next resource) +// │ └── {topic} publish +// └── ... +// +// The test validates: +// 1. Required spans are created: sentinel.poll, sentinel.evaluate, {topic} publish +// 2. Parent-child relationships: evaluate and HTTP spans are children of poll spans, publish spans are children of evaluate spans +// 3. Multiple spans: One evaluate/publish span per resource that triggers an event +// 4. Trace continuity: All spans within a poll cycle belong to the same trace +// +// Test Setup: +// - Uses in-memory OpenTelemetry exporter to capture and analyze spans +// - Mock server returns 3 test resources that will trigger events +// - Real RabbitMQ broker for realistic message publishing +// +// Note: The test may capture multiple poll cycles.
Due to context cancellation timing, only 2 poll cycles are validated +func TestIntegration_EndToEndSpanHierarchy(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Setup in-memory trace exporter to capture spans + exporter := tracetest.NewInMemoryExporter() + tp := trace.NewTracerProvider( + trace.WithSampler(trace.AlwaysSample()), + trace.WithBatcher(exporter), + ) + prevTP := otel.GetTracerProvider() + otel.SetTracerProvider(tp) + defer otel.SetTracerProvider(prevTP) + defer func() { + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cleanupCancel() + if err := tp.Shutdown(cleanupCtx); err != nil { + t.Logf("Warning: shutdown of tracer provider failed: %v", err) + } + }() + + now := time.Now() + var callCount atomic.Int32 + readyChan := make(chan bool, 1) + + // Mock server that returns clusters requiring reconciliation + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount.Add(1) + if callCount.Load() > 2 { + select { + case readyChan <- true: + default: + } + } + + // Return clusters that will trigger different decision types + clusters := []map[string]interface{}{ + // Triggers max_age_ready exceeded + createMockCluster("cluster-ready-old", 2, 2, true, now.Add(-35*time.Minute)), + // Triggers max_age_not_ready exceeded + createMockCluster("cluster-not-ready-old", 1, 1, false, now.Add(-15*time.Second)), + // Triggers generation mismatch + createMockCluster("cluster-generation-mismatch", 5, 3, true, now.Add(-5*time.Minute)), + } + response := createMockClusterList(clusters) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + // Get shared RabbitMQ from helper + helper := NewHelper() + + // Setup components + hyperfleetClient, clientErr := client.NewHyperFleetClient(server.URL, 10*time.Second) + if clientErr != nil { + 
t.Fatalf("failed to create HyperFleet client: %v", clientErr) + } + decisionEngine := engine.NewDecisionEngine(10*time.Second, 30*time.Minute) + log := logger.NewHyperFleetLogger() + + registry := prometheus.NewRegistry() + metrics.NewSentinelMetrics(registry, "test") + + cfg := &config.SentinelConfig{ + ResourceType: "clusters", + Topic: "test-spans-topic", + PollInterval: 100 * time.Millisecond, + MaxAgeNotReady: 10 * time.Second, + MaxAgeReady: 30 * time.Minute, + MessagingSystem: "rabbitmq", + MessageData: map[string]interface{}{ + "id": "resource.id", + "kind": "resource.kind", + }, + } + + s, err := sentinel.NewSentinel(cfg, hyperfleetClient, decisionEngine, helper.RabbitMQ.Publisher(), log) + if err != nil { + t.Fatalf("NewSentinel failed: %v", err) + } + + // Run Sentinel to generate spans + errChan := make(chan error, 1) + go func() { + errChan <- s.Start(ctx) + }() + + // Wait for at least 2 polling cycles + select { + case <-readyChan: + t.Log("Sentinel completed required polling cycles") + case <-time.After(5 * time.Second): + t.Fatal("Timeout waiting for sentinel polling") + } + cancel() + + // Wait for Sentinel to stop + select { + case err := <-errChan: + if err != nil && !errors.Is(err, context.Canceled) { + t.Fatalf("Sentinel failed: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Sentinel did not stop within timeout") + } + + // Force flush spans to exporter + flushCtx, flushCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer flushCancel() + + err = tp.ForceFlush(flushCtx) + if err != nil { + t.Fatalf("force flush error: %v", err) + } + + // Analyze captured spans + spans := exporter.GetSpans() + t.Logf("Captured %d spans", len(spans)) + + // Build span maps for analysis + spansByName := make(map[string][]tracetest.SpanStub) + spansByID := make(map[string]tracetest.SpanStub) + spansByParentID := make(map[string][]tracetest.SpanStub) + + for _, span := range spans { + spansByName[span.Name] = 
append(spansByName[span.Name], span) + spansByID[span.SpanContext.SpanID().String()] = span + + if span.Parent.IsValid() { + parentID := span.Parent.SpanID().String() + spansByParentID[parentID] = append(spansByParentID[parentID], span) + } + } + + // Print span hierarchy for debugging + t.Log("Span hierarchy:") + for _, span := range spans { + parentInfo := "ROOT" + if span.Parent.IsValid() { + parentInfo = "child of " + span.Parent.SpanID().String() + } + t.Logf(" %s (%s) - %s", span.Name, span.SpanContext.SpanID().String()[:8], parentInfo) + } + + // Validate required spans exist + requiredSpans := []string{ + "sentinel.poll", + "sentinel.evaluate", + "test-spans-topic publish", + } + + for _, requiredSpan := range requiredSpans { + if _, exists := spansByName[requiredSpan]; !exists { + t.Errorf("Required span '%s' not found. Available spans: %v", requiredSpan, getSpanNames(spans)) + } + } + + // Validate span hierarchy structure + pollSpans := spansByName["sentinel.poll"] + if len(pollSpans) == 0 { + t.Fatal("No sentinel.poll spans found") + } + + pollSpansToValidate := pollSpans + if len(pollSpansToValidate) > 2 { + pollSpansToValidate = pollSpansToValidate[:2] + } + + // For each poll span, validate it has the expected children, evaluate only the first two poll spans + for _, pollSpan := range pollSpansToValidate { + pollSpanID := pollSpan.SpanContext.SpanID().String() + directChildren := spansByParentID[pollSpanID] + + t.Logf("Poll span %s has %d direct children", pollSpanID[:8], len(directChildren)) + + // Verify poll span has evaluate children (direct) + hasEvaluateChild := false + hasHTTPChild := false + evaluateChildCount := 0 + + for _, child := range directChildren { + switch { + case child.Name == "sentinel.evaluate": + hasEvaluateChild = true + evaluateChildCount++ + case strings.Contains(child.Name, "HTTP"): + hasHTTPChild = true + } + } + + if !hasEvaluateChild { + t.Errorf("Poll span %s missing sentinel.evaluate child", pollSpanID[:8]) + continue 
+ } + + // Verify each evaluate span has publish grandchildren + publishGrandchildCount := 0 + for _, child := range directChildren { + if child.Name == "sentinel.evaluate" { + childID := child.SpanContext.SpanID().String() + grandchildren := spansByParentID[childID] + for _, grandchild := range grandchildren { + if strings.HasSuffix(grandchild.Name, " publish") { + publishGrandchildCount++ + } + } + } + } + + t.Logf("Poll span %s: evaluate children=%d, publish grandchildren=%d, http=%t", + pollSpanID[:8], evaluateChildCount, publishGrandchildCount, hasHTTPChild) + + // Only validate publish grandchildren if this poll span actually processed events + if evaluateChildCount > 0 && publishGrandchildCount == 0 { + t.Errorf("Poll span %s has %d evaluate children but no publish grandchildren", + pollSpanID[:8], evaluateChildCount) + } + } + + // Validate we have multiple evaluation spans (one per resource) + evaluateSpans := spansByName["sentinel.evaluate"] + if len(evaluateSpans) < 3 { + t.Errorf("Expected at least 3 sentinel.evaluate spans (one per test resource), got %d", len(evaluateSpans)) + } + + // Validate we have multiple publish spans (one per event) + publishSpans := spansByName["test-spans-topic publish"] + if len(publishSpans) < 3 { + t.Errorf("Expected at least 3 publish spans (one per test event), got %d", len(publishSpans)) + } + + validateSpanAttribute(t, publishSpans, "test-spans-topic publish", "messaging.system", cfg.MessagingSystem) + validateSpanAttribute(t, publishSpans, "test-spans-topic publish", "messaging.operation.type", "publish") + validateSpanAttribute(t, publishSpans, "test-spans-topic publish", "messaging.destination.name", cfg.Topic) + + for _, publishSpan := range publishSpans { + if !publishSpan.Parent.IsValid() { + t.Errorf("Publish span %s should have a parent", publishSpan.SpanContext.SpanID().String()[:8]) + continue + } + + parentSpan, exists := spansByID[publishSpan.Parent.SpanID().String()] + if !exists { + t.Errorf("Publish 
span parent not found") + continue + } + + if parentSpan.Name != "sentinel.evaluate" { + t.Errorf("Publish span parent should be sentinel.evaluate, got %s", parentSpan.Name) + } + } + + // Verify trace continuity - spans should form coherent traces + traceIDs := make(map[string]bool) + for _, span := range spans { + traceIDs[span.SpanContext.TraceID().String()] = true + } + + if len(traceIDs) > len(pollSpans) { + t.Errorf("Expected spans to belong to %d traces (one per poll), but found %d trace IDs", len(pollSpans), len(traceIDs)) + } +} + +// Helper function for span name extraction +func getSpanNames(spans []tracetest.SpanStub) []string { + names := make([]string, len(spans)) + for i, span := range spans { + names[i] = span.Name + } + return names +} + +func validateSpanAttribute(t *testing.T, spans []tracetest.SpanStub, spanName, attrKey, expectedValue string) { + for _, span := range spans { + if span.Name == spanName { + for _, attr := range span.Attributes { + if string(attr.Key) == attrKey { + if attr.Value.AsString() != expectedValue { + t.Errorf("Span '%s': expected %s=%s, got %s", spanName, attrKey, expectedValue, attr.Value.AsString()) + } + return + } + } + t.Errorf("Span '%s': attribute '%s' not found", spanName, attrKey) + return + } + } + t.Errorf("Span '%s' not found", spanName) +}