From 7638cdbd4aa248769f862b9d8c8e99368a77877a Mon Sep 17 00:00:00 2001 From: Rafael Benevides Date: Thu, 12 Mar 2026 14:39:46 -0300 Subject: [PATCH] HYPERFLEET-536 - feat: implement selective querying with condition-based filtering Replace single fetch-all query with two targeted API queries per poll cycle: 1. Not-ready resources: status.conditions.Ready='False' 2. Stale ready resources: Ready='True' with last_updated_time older than max_age_ready This reduces network traffic by leveraging server-side condition filtering. Includes graceful degradation when the stale query fails and resource deduplication via mergeResources. --- internal/client/client.go | 38 ++- internal/client/client_test.go | 125 +++++++++ internal/sentinel/sentinel.go | 81 +++++- internal/sentinel/sentinel_test.go | 389 +++++++++++++++++++++++---- test/integration/integration_test.go | 283 +++++++++++-------- 5 files changed, 732 insertions(+), 184 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 05e4272..7b73da1 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -115,8 +115,12 @@ type Condition struct { // - This maintains service availability during resource provisioning/deletion // - Only resources with valid status are returned // +// The additionalFilters parameter accepts optional TSL condition expressions +// (e.g., "status.conditions.Ready='False'") that are combined with label +// selectors using "and" to form the final search query. +// // Returns a slice of resources and an error if the fetch operation fails. -func (c *HyperFleetClient) FetchResources(ctx context.Context, resourceType ResourceType, labelSelector map[string]string) ([]Resource, error) { +func (c *HyperFleetClient) FetchResources(ctx context.Context, resourceType ResourceType, labelSelector map[string]string, additionalFilters ...string) ([]Resource, error) { // Validate inputs if ctx == nil { return nil, fmt.Errorf("context cannot be nil") @@ -140,15 +144,15 @@ func (c *HyperFleetClient) FetchResources(ctx context.Context, resourceType Reso // Retry operation with backoff (v5 API) operation := func() ([]Resource, error) { - resources, err := c.fetchResourcesOnce(ctx, resourceType, labelSelector) + resources, err := c.fetchResourcesOnce(ctx, resourceType, labelSelector, additionalFilters) if err != nil { // Check if error is retriable if isRetriable(err) { - c.log.V(2).Infof(ctx, "Retriable error fetching %s: %v (will retry)", resourceType, err) + c.log.Debugf(ctx, "Retriable error fetching %s: %v (will retry)", resourceType, err) return nil, err // Retry } // Non-retriable error - stop retrying - c.log.V(2).Infof(ctx, "Non-retriable error fetching %s: %v (will not retry)", resourceType, err) + c.log.Debugf(ctx, "Non-retriable error fetching %s: %v (will not retry)", resourceType, err) return nil, backoff.Permanent(err) } return resources, nil @@ -191,10 +195,30 @@ func labelSelectorToSearchString(labelSelector map[string]string) string { return strings.Join(parts, " and ") } +// buildSearchString combines label selectors and additional TSL filters into a +// single search query string. Label selectors are converted to TSL format +// (e.g., "labels.key='value'") and joined with additional filters using "and". +func buildSearchString(labelSelector map[string]string, additionalFilters []string) string { + parts := make([]string, 0, len(additionalFilters)+1) + + labelSearch := labelSelectorToSearchString(labelSelector) + if labelSearch != "" { + parts = append(parts, labelSearch) + } + + for _, f := range additionalFilters { + if f != "" { + parts = append(parts, f) + } + } + + return strings.Join(parts, " and ") +} + // fetchResourcesOnce performs a single fetch operation without retry logic -func (c *HyperFleetClient) fetchResourcesOnce(ctx context.Context, resourceType ResourceType, labelSelector map[string]string) ([]Resource, error) { - // Build search parameter from label selector - searchParam := labelSelectorToSearchString(labelSelector) +func (c *HyperFleetClient) fetchResourcesOnce(ctx context.Context, resourceType ResourceType, labelSelector map[string]string, additionalFilters []string) ([]Resource, error) { + // Build search parameter from label selector and additional filters + searchParam := buildSearchString(labelSelector, additionalFilters) // Call appropriate endpoint based on resource type switch resourceType { diff --git a/internal/client/client_test.go b/internal/client/client_test.go index eb06dea..db78c43 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -664,3 +664,128 @@ func TestFetchResources_WithLabelSelector(t *testing.T) { t.Errorf("Expected search parameter %q, got %q", expectedSearch, receivedSearchParam) } } + +// TestBuildSearchString tests combining label selectors and additional filters +func TestBuildSearchString(t *testing.T) { + tests := []struct { + name string + labelSelector map[string]string + additionalFilters []string + want string + }{ + { + name: "labels only", + labelSelector: map[string]string{"shard": "1"}, + additionalFilters: nil, + want: "labels.shard='1'", + }, + { + name: "filter only", + labelSelector: nil, + additionalFilters: []string{"status.conditions.Ready='False'"}, + want: "status.conditions.Ready='False'", + }, + { + name: "both labels and filter", + labelSelector: map[string]string{"shard": "1"}, + additionalFilters: []string{"status.conditions.Ready='False'"}, + want: "labels.shard='1' and status.conditions.Ready='False'", + }, + { + name: "multiple filters", + labelSelector: map[string]string{"shard": "1"}, + additionalFilters: []string{"status.conditions.Ready='True'", "status.conditions.Ready.last_updated_time<='2025-01-01T00:00:00Z'"}, + want: "labels.shard='1' and status.conditions.Ready='True' and status.conditions.Ready.last_updated_time<='2025-01-01T00:00:00Z'", + }, + { + name: "both empty", + labelSelector: nil, + additionalFilters: nil, + want: "", + }, + { + name: "empty string filter ignored", + labelSelector: nil, + additionalFilters: []string{"", "status.conditions.Ready='False'", ""}, + want: "status.conditions.Ready='False'", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := buildSearchString(tt.labelSelector, tt.additionalFilters) + if got != tt.want { + t.Errorf("buildSearchString() = %q, want %q", got, tt.want) + } + }) + } +} + +// TestFetchResources_WithAdditionalFilters tests FetchResources with combined label selectors and condition filters +func TestFetchResources_WithAdditionalFilters(t *testing.T) { + var receivedSearchParam string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedSearchParam = r.URL.Query().Get("search") + + response := map[string]interface{}{ + "kind": "ClusterList", + "page": 1, + "size": 0, + "total": 0, + "items": []map[string]interface{}{}, + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + t.Logf("Error encoding response: %v", err) + } + })) + defer server.Close() + + client, _ := NewHyperFleetClient(server.URL, 10*time.Second) + labelSelector := map[string]string{"shard": "1"} + + _, err := client.FetchResources(context.Background(), ResourceTypeClusters, labelSelector, "status.conditions.Ready='False'") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + expectedSearch := "labels.shard='1' and status.conditions.Ready='False'" + if receivedSearchParam != expectedSearch { + t.Errorf("Expected search parameter %q, got %q", expectedSearch, receivedSearchParam) + } +} + +// TestFetchResources_WithConditionFilterOnly tests FetchResources with only condition filters (no labels) +func TestFetchResources_WithConditionFilterOnly(t *testing.T) { + var receivedSearchParam string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedSearchParam = r.URL.Query().Get("search") + + response := map[string]interface{}{ + "kind": "ClusterList", + "page": 1, + "size": 0, + "total": 0, + "items": []map[string]interface{}{}, + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + t.Logf("Error encoding response: %v", err) + } + })) + defer server.Close() + + client, _ := NewHyperFleetClient(server.URL, 10*time.Second) + + _, err := client.FetchResources(context.Background(), ResourceTypeClusters, nil, "status.conditions.Ready='True'") + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + expectedSearch := "status.conditions.Ready='True'" + if receivedSearchParam != expectedSearch { + t.Errorf("Expected search parameter %q, got %q", expectedSearch, receivedSearchParam) + } +} diff --git a/internal/sentinel/sentinel.go b/internal/sentinel/sentinel.go index f0dcf60..13aade1 100644 --- a/internal/sentinel/sentinel.go +++ b/internal/sentinel/sentinel.go @@ -2,6 +2,7 @@ package sentinel import ( "context" + "errors" "fmt" "sync" "time" @@ -17,6 +18,15 @@ import ( "github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/logger" ) +const ( + // notReadyFilter selects resources whose Ready condition is False. + notReadyFilter = "status.conditions.Ready='False'" + + // staleReadyFilterFmt selects resources whose Ready condition is True but + // last_updated_time is older than the given cutoff (RFC 3339 timestamp). + staleReadyFilterFmt = "status.conditions.Ready='True' and status.conditions.Ready.last_updated_time<='%s'" +) + // Sentinel polls the HyperFleet API and triggers reconciliation events type Sentinel struct { config *config.SentinelConfig @@ -102,15 +112,16 @@ func (s *Sentinel) trigger(ctx context.Context) error { ctx = logger.WithSubset(ctx, resourceType) ctx = logger.WithTopic(ctx, topic) - s.logger.V(2).Info(ctx, "Starting trigger cycle") + s.logger.Debug(ctx, "Starting trigger cycle") // Convert label selectors to map for filtering labelSelector := s.config.ResourceSelector.ToMap() - // Fetch resources from HyperFleet API - resources, err := s.client.FetchResources(ctx, client.ResourceType(s.config.ResourceType), labelSelector) + // Fetch resources using condition-based server-side filtering: + // Query 1: Not-ready resources (need frequent reconciliation) + // Query 2: Stale ready resources (exceeded max age) + resources, err := s.fetchFilteredResources(ctx, labelSelector) if err != nil { - // Record API error metrics.UpdateAPIErrorsMetric(resourceType, resourceSelector, "fetch_error") return fmt.Errorf("failed to fetch resources: %w", err) } @@ -126,6 +137,11 @@ func (s *Sentinel) trigger(ctx context.Context) error { for i := range resources { resource := &resources[i] + if resource.ID == "" { + s.logger.Warnf(ctx, "Skipping resource with empty ID kind=%s", resource.Kind) + continue + } + decision := s.decisionEngine.Evaluate(resource, now) if decision.ShouldPublish { @@ -168,7 +184,7 @@ func (s *Sentinel) trigger(ctx context.Context) error { // Record skipped resource metrics.UpdateResourcesSkippedMetric(resourceType, resourceSelector, decision.Reason) - s.logger.V(2).Infof(skipCtx, "Skipped resource resource_id=%s ready=%t", + s.logger.Debugf(skipCtx, "Skipped resource resource_id=%s ready=%t", resource.ID, resource.Status.Ready) skipped++ } @@ -192,6 +208,61 @@ func (s *Sentinel) trigger(ctx context.Context) error { return nil } +// fetchFilteredResources makes two targeted API queries to fetch only resources +// that likely need reconciliation, reducing network traffic compared to fetching +// all resources: +// +// 1. Not-ready resources: status.conditions.Ready='False' +// 2. Stale ready resources: Ready='True' with last_updated_time older than max_age_ready +// +// Results are merged and deduplicated by resource ID. The DecisionEngine still +// evaluates the filtered set in memory (e.g., for generation-based checks). +func (s *Sentinel) fetchFilteredResources(ctx context.Context, labelSelector map[string]string) ([]client.Resource, error) { + rt := client.ResourceType(s.config.ResourceType) + + // Query 1: Not-ready resources + notReadyResources, err := s.client.FetchResources(ctx, rt, labelSelector, notReadyFilter) + if err != nil { + return nil, fmt.Errorf("failed to fetch not-ready resources: %w", err) + } + s.logger.Debugf(ctx, "Fetched not-ready resources count=%d", len(notReadyResources)) + + // Query 2: Stale ready resources (last_updated_time exceeded max_age_ready) + cutoff := time.Now().Add(-s.config.MaxAgeReady) + staleFilter := fmt.Sprintf(staleReadyFilterFmt, cutoff.Format(time.RFC3339)) + staleResources, err := s.client.FetchResources(ctx, rt, labelSelector, staleFilter) + if err != nil { + // Propagate context cancellation/timeout — the caller must see these + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, err + } + // Graceful degradation: for transient/API errors, continue with not-ready results + s.logger.Errorf(ctx, "Failed to fetch stale resources, continuing with not-ready only: %v", err) + return notReadyResources, nil + } + s.logger.Debugf(ctx, "Fetched stale ready resources count=%d", len(staleResources)) + + return mergeResources(notReadyResources, staleResources), nil +} + +// mergeResources combines two resource slices, deduplicating by resource ID. +// Resources from the first slice take precedence when duplicates are found. +func mergeResources(a, b []client.Resource) []client.Resource { + seen := make(map[string]struct{}, len(a)) + result := make([]client.Resource, 0, len(a)+len(b)) + + for i := range a { + seen[a[i].ID] = struct{}{} + result = append(result, a[i]) + } + for i := range b { + if _, exists := seen[b[i].ID]; !exists { + result = append(result, b[i]) + } + } + return result +} + // buildEventData builds the CloudEvent data payload for a resource using the // configured payload builder. func (s *Sentinel) buildEventData(ctx context.Context, resource *client.Resource, decision engine.Decision) map[string]interface{} { diff --git a/internal/sentinel/sentinel_test.go b/internal/sentinel/sentinel_test.go index ba11af9..1238888 100644 --- a/internal/sentinel/sentinel_test.go +++ b/internal/sentinel/sentinel_test.go @@ -7,6 +7,9 @@ import ( "fmt" "net/http" "net/http/httptest" + "strings" + "sync" + "sync/atomic" "testing" "time" @@ -111,20 +114,46 @@ func (m *MockPublisherWithLogger) Close() error { return nil } func (m *MockPublisherWithLogger) Health(ctx context.Context) error { return nil } -// TestTrigger_Success tests successful event publishing -func TestTrigger_Success(t *testing.T) { - ctx := context.Background() +// mockServerForConditionQueries creates a mock HTTP server that routes responses +// based on condition-based search parameters. Each poll cycle makes two queries: +// one for not-ready resources (Ready='False') and one for stale-ready resources (Ready='True'). +func mockServerForConditionQueries(t *testing.T, notReadyClusters, staleClusters []map[string]interface{}) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + search := r.URL.Query().Get("search") + + var clusters []map[string]interface{} + switch { + case strings.Contains(search, "Ready='False'"): + clusters = notReadyClusters + case strings.Contains(search, "Ready='True'"): + clusters = staleClusters + default: + t.Errorf("unexpected search query: %q", search) + w.WriteHeader(http.StatusBadRequest) + return + } - // Create mock server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Cluster exceeds max age (31 minutes ago) - cluster := createMockCluster("cluster-1", 2, 2, true, time.Now().Add(-31*time.Minute)) - response := createMockClusterList([]map[string]interface{}{cluster}) + response := createMockClusterList(clusters) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(response); err != nil { t.Logf("Error encoding response: %v", err) } })) +} + +// TestTrigger_Success tests successful event publishing +func TestTrigger_Success(t *testing.T) { + ctx := context.Background() + now := time.Now() + + // Stale ready cluster exceeds max age (31 minutes ago) + server := mockServerForConditionQueries(t, + nil, // no not-ready clusters + []map[string]interface{}{ + createMockCluster("cluster-1", 2, 2, true, now.Add(-31*time.Minute)), + }, + ) defer server.Close() // Setup components @@ -189,16 +218,8 @@ func TestTrigger_Success(t *testing.T) { func TestTrigger_NoEventsPublished(t *testing.T) { ctx := context.Background() - // Create mock server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Cluster within max age and generation in sync - should NOT publish - cluster := createMockCluster("cluster-1", 1, 1, true, time.Now().Add(-5*time.Minute)) - response := createMockClusterList([]map[string]interface{}{cluster}) - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(response); err != nil { - t.Logf("Error encoding response: %v", err) - } - })) + // Both queries return empty results + server := mockServerForConditionQueries(t, nil, nil) defer server.Close() // Setup components @@ -243,7 +264,7 @@ func TestTrigger_NoEventsPublished(t *testing.T) { func TestTrigger_FetchError(t *testing.T) { ctx := context.Background() - // Create mock server that returns 500 errors + // Create mock server that returns 500 errors for all queries server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) if _, err := w.Write([]byte(`{"error": "internal server error"}`)); err != nil { @@ -294,16 +315,14 @@ func TestTrigger_FetchError(t *testing.T) { // TestTrigger_PublishError tests handling of publish errors (graceful degradation) func TestTrigger_PublishError(t *testing.T) { ctx := context.Background() + now := time.Now() - // Create mock server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - cluster := createMockCluster("cluster-1", 2, 2, true, time.Now().Add(-31*time.Minute)) - response := createMockClusterList([]map[string]interface{}{cluster}) - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(response); err != nil { - t.Logf("Error encoding response: %v", err) - } - })) + server := mockServerForConditionQueries(t, + nil, // no not-ready clusters + []map[string]interface{}{ + createMockCluster("cluster-1", 2, 2, true, now.Add(-31*time.Minute)), + }, + ) defer server.Close() // Setup components @@ -347,20 +366,18 @@ func TestTrigger_MixedResources(t *testing.T) { ctx := context.Background() now := time.Now() - // Create mock server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - clusters := []map[string]interface{}{ - createMockCluster("cluster-1", 2, 2, true, now.Add(-31*time.Minute)), // Should publish (max age exceeded) - createMockCluster("cluster-2", 1, 1, true, now.Add(-5*time.Minute)), // Should skip (within max age) + // Split resources between not-ready and stale queries + server := mockServerForConditionQueries(t, + // Not-ready query + []map[string]interface{}{ createMockCluster("cluster-3", 3, 3, false, now.Add(-1*time.Minute)), // Should publish (not ready max age exceeded: 1min > 10s) + }, + // Stale query + []map[string]interface{}{ + createMockCluster("cluster-1", 2, 2, true, now.Add(-31*time.Minute)), // Should publish (max age exceeded) createMockCluster("cluster-4", 5, 4, true, now.Add(-5*time.Minute)), // Should publish (generation changed) - } - response := createMockClusterList(clusters) - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(response); err != nil { - t.Logf("Error encoding response: %v", err) - } - })) + }, + ) defer server.Close() // Setup components @@ -397,8 +414,8 @@ func TestTrigger_MixedResources(t *testing.T) { } // Should publish for: - // - cluster-1 (ready max age exceeded: 31min > 30min) // - cluster-3 (not ready max age exceeded: 1min > 10s) + // - cluster-1 (ready max age exceeded: 31min > 30min) // - cluster-4 (generation changed: 5 > 4) if len(mockPublisher.publishedEvents) != 3 { t.Errorf("Expected 3 published events, got %d", len(mockPublisher.publishedEvents)) @@ -416,15 +433,14 @@ func TestTrigger_MixedResources(t *testing.T) { // used in place of the hardcoded payload when MessageData is set. func TestTrigger_WithMessageDataConfig(t *testing.T) { ctx := context.Background() + now := time.Now() - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - cluster := createMockCluster("cluster-xyz", 2, 2, true, time.Now().Add(-31*time.Minute)) - response := createMockClusterList([]map[string]interface{}{cluster}) - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(response); err != nil { - t.Logf("Error encoding response: %v", err) - } - })) + server := mockServerForConditionQueries(t, + nil, + []map[string]interface{}{ + createMockCluster("cluster-xyz", 2, 2, true, now.Add(-31*time.Minute)), + }, + ) defer server.Close() hyperfleetClient, _ := client.NewHyperFleetClient(server.URL, 10*time.Second) @@ -482,15 +498,14 @@ func TestTrigger_WithMessageDataConfig(t *testing.T) { // TestTrigger_WithNestedMessageData verifies that nested objects are correctly built. func TestTrigger_WithNestedMessageData(t *testing.T) { ctx := context.Background() + now := time.Now() - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - cluster := createMockCluster("cluster-nest", 1, 1, true, time.Now().Add(-31*time.Minute)) - response := createMockClusterList([]map[string]interface{}{cluster}) - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(response); err != nil { - t.Logf("Error encoding response: %v", err) - } - })) + server := mockServerForConditionQueries(t, + nil, + []map[string]interface{}{ + createMockCluster("cluster-nest", 1, 1, true, now.Add(-31*time.Minute)), + }, + ) defer server.Close() hyperfleetClient, _ := client.NewHyperFleetClient(server.URL, 10*time.Second) @@ -634,3 +649,263 @@ func TestTrigger_ContextPropagationToBroker(t *testing.T) { t.Errorf("span_id not propagated: got %v", spanID) } } + +// TestMergeResources tests the mergeResources deduplication function +func TestMergeResources(t *testing.T) { + tests := []struct { + name string + a []client.Resource + b []client.Resource + wantIDs []string + wantLen int + }{ + { + name: "no overlap", + a: []client.Resource{{ID: "a1"}, {ID: "a2"}}, + b: []client.Resource{{ID: "b1"}}, + wantIDs: []string{"a1", "a2", "b1"}, + wantLen: 3, + }, + { + name: "duplicate in b removed", + a: []client.Resource{{ID: "x"}}, + b: []client.Resource{{ID: "x"}, {ID: "y"}}, + wantIDs: []string{"x", "y"}, + wantLen: 2, + }, + { + name: "both empty", + a: nil, + b: nil, + wantIDs: nil, + wantLen: 0, + }, + { + name: "a empty", + a: nil, + b: []client.Resource{{ID: "b1"}}, + wantIDs: []string{"b1"}, + wantLen: 1, + }, + { + name: "b empty", + a: []client.Resource{{ID: "a1"}}, + b: nil, + wantIDs: []string{"a1"}, + wantLen: 1, + }, + { + name: "all duplicates", + a: []client.Resource{{ID: "x"}, {ID: "y"}}, + b: []client.Resource{{ID: "x"}, {ID: "y"}}, + wantIDs: []string{"x", "y"}, + wantLen: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := mergeResources(tt.a, tt.b) + if len(result) != tt.wantLen { + t.Errorf("mergeResources() returned %d resources, want %d", len(result), tt.wantLen) + } + for i, id := range tt.wantIDs { + if i < len(result) && result[i].ID != id { + t.Errorf("mergeResources()[%d].ID = %q, want %q", i, result[i].ID, id) + } + } + }) + } +} + +// TestMergeResources_FirstSlicePrecedence verifies that resources from the first +// slice take precedence when duplicates are found. +func TestMergeResources_FirstSlicePrecedence(t *testing.T) { + a := []client.Resource{{ID: "dup", Kind: "from-a"}} + b := []client.Resource{{ID: "dup", Kind: "from-b"}} + + result := mergeResources(a, b) + if len(result) != 1 { + t.Fatalf("Expected 1 resource, got %d", len(result)) + } + if result[0].Kind != "from-a" { + t.Errorf("Expected resource from first slice (Kind='from-a'), got Kind=%q", result[0].Kind) + } +} + +// TestTrigger_ConditionBasedQueries verifies that trigger sends two condition-based +// queries (not-ready + stale) and correctly processes results from both. +func TestTrigger_ConditionBasedQueries(t *testing.T) { + ctx := context.Background() + now := time.Now() + + var capturedSearchParams []string + var mu sync.Mutex + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + search := r.URL.Query().Get("search") + mu.Lock() + capturedSearchParams = append(capturedSearchParams, search) + mu.Unlock() + + var clusters []map[string]interface{} + switch { + case strings.Contains(search, "Ready='False'"): + clusters = []map[string]interface{}{ + createMockCluster("not-ready-1", 1, 1, false, now.Add(-15*time.Second)), + } + case strings.Contains(search, "Ready='True'"): + clusters = []map[string]interface{}{ + createMockCluster("stale-1", 2, 2, true, now.Add(-31*time.Minute)), + } + default: + t.Errorf("unexpected search query: %q", search) + w.WriteHeader(http.StatusBadRequest) + return + } + + response := createMockClusterList(clusters) + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + t.Logf("Error encoding response: %v", err) + } + })) + defer server.Close() + + hyperfleetClient, _ := client.NewHyperFleetClient(server.URL, 10*time.Second) + decisionEngine := engine.NewDecisionEngine(10*time.Second, 30*time.Minute) + mockPublisher := &MockPublisher{} + log := logger.NewHyperFleetLogger() + + registry := prometheus.NewRegistry() + metrics.NewSentinelMetrics(registry, "test") + + cfg := &config.SentinelConfig{ + ResourceType: "clusters", + Topic: "test-topic", + MaxAgeNotReady: 10 * time.Second, + MaxAgeReady: 30 * time.Minute, + MessageData: map[string]interface{}{ + "id": "resource.id", + "kind": "resource.kind", + }, + } + + s, err := NewSentinel(cfg, hyperfleetClient, decisionEngine, mockPublisher, log) + if err != nil { + t.Fatalf("NewSentinel failed: %v", err) + } + + if err := s.trigger(ctx); err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + // Verify two queries were sent + mu.Lock() + params := make([]string, len(capturedSearchParams)) + copy(params, capturedSearchParams) + mu.Unlock() + + if len(params) != 2 { + t.Fatalf("Expected 2 search queries (not-ready + stale), got %d", len(params)) + } + + hasNotReady := false + hasStale := false + for _, search := range params { + if strings.Contains(search, "Ready='False'") { + hasNotReady = true + } + if strings.Contains(search, "Ready='True'") { + hasStale = true + } + } + if !hasNotReady { + t.Error("No not-ready query (Ready='False') was sent") + } + if !hasStale { + t.Error("No stale query (Ready='True') was sent") + } + + // Both resources should trigger events + if len(mockPublisher.publishedEvents) != 2 { + t.Errorf("Expected 2 published events (1 not-ready + 1 stale), got %d", len(mockPublisher.publishedEvents)) + } +} + +// TestTrigger_StaleQueryFailure tests graceful degradation when the stale query +// fails but the not-ready query succeeds. +func TestTrigger_StaleQueryFailure(t *testing.T) { + ctx := context.Background() + now := time.Now() + + var queryCount atomic.Int32 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + queryCount.Add(1) + search := r.URL.Query().Get("search") + + if strings.Contains(search, "Ready='False'") { + // Not-ready query succeeds + clusters := []map[string]interface{}{ + createMockCluster("not-ready-1", 1, 1, false, now.Add(-15*time.Second)), + } + response := createMockClusterList(clusters) + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + t.Logf("Error encoding response: %v", err) + } + } else if strings.Contains(search, "Ready='True'") { + // Stale query fails + w.WriteHeader(http.StatusInternalServerError) + if _, err := w.Write([]byte(`{"error": "internal server error"}`)); err != nil { + t.Logf("Error writing error response: %v", err) + } + } else { + t.Errorf("unexpected search query: %q", search) + w.WriteHeader(http.StatusBadRequest) + return + } + })) + defer server.Close() + + hyperfleetClient, _ := client.NewHyperFleetClient(server.URL, 1*time.Second) + decisionEngine := engine.NewDecisionEngine(10*time.Second, 30*time.Minute) + mockPublisher := &MockPublisher{} + log := logger.NewHyperFleetLogger() + + registry := prometheus.NewRegistry() + metrics.NewSentinelMetrics(registry, "test") + + cfg := &config.SentinelConfig{ + ResourceType: "clusters", + Topic: "test-topic", + MaxAgeNotReady: 10 * time.Second, + MaxAgeReady: 30 * time.Minute, + MessageData: map[string]interface{}{ + "id": "resource.id", + "kind": "resource.kind", + }, + } + + s, err := NewSentinel(cfg, hyperfleetClient, decisionEngine, mockPublisher, log) + if err != nil { + t.Fatalf("NewSentinel failed: %v", err) + } + + // Should succeed with graceful degradation (not-ready results only) + err = s.trigger(ctx) + if err != nil { + t.Errorf("Expected no error (graceful degradation), got %v", err) + } + + // Should still publish event for the not-ready resource + if len(mockPublisher.publishedEvents) != 1 { + t.Errorf("Expected 1 published event (from not-ready query), got %d", len(mockPublisher.publishedEvents)) + } + + // Both queries should have been attempted (stale query retries on 500, so queryCount >= 2) + if queryCount.Load() < 2 { + t.Errorf("Expected at least 2 queries (not-ready + stale), got %d", queryCount.Load()) + } +} diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 47c2327..a7ac12b 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -13,6 +13,8 @@ import ( "os" "runtime" "strings" + "sync" + "sync/atomic" "testing" "time" @@ -119,26 +121,34 @@ func TestIntegration_EndToEnd(t *testing.T) { helper := NewHelper() now := time.Now() + var pollCycleCount atomic.Int32 - // Create mock HyperFleet API server - callCount := 0 + // Create mock HyperFleet API server that handles condition-based queries + // Each poll cycle makes 2 queries: not-ready + stale server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - callCount++ + search := r.URL.Query().Get("search") - // First call: Return clusters with one needing reconciliation - if callCount == 1 { - clusters := []map[string]interface{}{ - createMockCluster("cluster-1", 2, 2, true, now.Add(-31*time.Minute)), // Max age exceeded - createMockCluster("cluster-2", 1, 1, true, now.Add(-5*time.Minute)), // Within max age + var clusters []map[string]interface{} + + // Track poll cycles (stale query marks end of a cycle) + if strings.Contains(search, "Ready='True'") { + pollCycleCount.Add(1) + } + + switch { + case strings.Contains(search, "Ready='False'"): + // Not-ready query: no not-ready clusters + clusters = nil + case strings.Contains(search, "Ready='True'"): + // Stale query: return stale cluster only on first cycle + if pollCycleCount.Load() == 1 { + clusters = []map[string]interface{}{ + createMockCluster("cluster-1", 2, 2, true, now.Add(-31*time.Minute)), // Max age exceeded + } } - response := createMockClusterList(clusters) - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) - return } - // Subsequent calls: Empty list - response := createMockClusterList([]map[string]interface{}{}) + response := createMockClusterList(clusters) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(response) })) @@ -180,8 +190,8 @@ func TestIntegration_EndToEnd(t *testing.T) { cancel() // Verify that Sentinel actually polled the API at least twice - if callCount < 2 { - t.Errorf("Expected at least 2 polling cycles, got %d", callCount) + if pollCycleCount.Load() < 2 { + t.Errorf("Expected at least 2 polling cycles, got %d", pollCycleCount.Load()) } // Wait for Sentinel to stop @@ -209,50 +219,40 @@ func TestIntegration_LabelSelectorFiltering(t *testing.T) { now := time.Now() - // Create mock server that returns 2 clusters: one with shard:1, one with shard:2 - // Server implements server-side filtering based on search parameter - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // All available clusters - allClusters := []map[string]interface{}{ - // This cluster has shard:1 - SHOULD match selector and trigger event - createMockClusterWithLabels( - "cluster-shard-1", - 2, - 2, - true, - now.Add(-31*time.Minute), // Exceeds max_age_ready (30m) - map[string]string{"shard": "1"}, - ), - // This cluster has shard:2 - should NOT match selector - createMockClusterWithLabels( - "cluster-shard-2", - 2, - 2, - true, - now.Add(-31*time.Minute), // Also exceeds max_age, but should be filtered - map[string]string{"shard": "2"}, - ), - } + // Track search parameters for assertion + var capturedSearchParams []string + var captureMu sync.Mutex - // Server-side filtering: check for search parameter - // TSL syntax: labels.key='value' and labels.key2='value2' + // Create mock server that returns clusters filtered by label selector and conditions. + // With condition-based queries, Sentinel sends: + // - Not-ready query: "labels.shard='1' and status.conditions.Ready='False'" + // - Stale query: "labels.shard='1' and status.conditions.Ready='True' and ..." + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { searchParam := r.URL.Query().Get("search") - filteredClusters := allClusters - - if searchParam != "" { - filteredClusters = []map[string]interface{}{} - for _, cluster := range allClusters { - labels, ok := cluster["labels"].(map[string]string) - if !ok { - continue - } - - // TSL matching: if search contains "labels.shard='1'", only include clusters with shard=1 - if searchParam == "labels.shard='1'" && labels["shard"] == "1" { - filteredClusters = append(filteredClusters, cluster) + captureMu.Lock() + capturedSearchParams = append(capturedSearchParams, searchParam) + captureMu.Unlock() + + var filteredClusters []map[string]interface{} + + // Only return clusters matching shard:1 label AND condition filters + if strings.Contains(searchParam, "labels.shard='1'") { + if strings.Contains(searchParam, "Ready='False'") { + // Not-ready query: no not-ready clusters with shard:1 + filteredClusters = nil + } else if strings.Contains(searchParam, "Ready='True'") { + // Stale query: return stale cluster with shard:1 + filteredClusters = []map[string]interface{}{ + createMockClusterWithLabels( + "cluster-shard-1", + 2, 2, true, + now.Add(-31*time.Minute), // Exceeds max_age_ready (30m) + map[string]string{"shard": "1"}, + ), } } } + // shard:2 clusters are never returned since label selector doesn't match response := createMockClusterList(filteredClusters) w.Header().Set("Content-Type", "application/json") @@ -304,12 +304,41 @@ func TestIntegration_LabelSelectorFiltering(t *testing.T) { t.Errorf("Expected Start to return nil or context.Canceled, got: %v", startErr) } - // Integration test validates label selector filtering works end-to-end - // Event verification requires subscriber implementation (future enhancement) - t.Log("Label selector filtering test with real RabbitMQ broker completed successfully") + // Validate captured search parameters + captureMu.Lock() + params := make([]string, len(capturedSearchParams)) + copy(params, capturedSearchParams) + captureMu.Unlock() + + if len(params) == 0 { + t.Fatal("No search queries were captured — Sentinel may not have polled") + } + + hasNotReady := false + hasStale := false + for _, search := range params { + if !strings.Contains(search, "labels.shard='1'") { + t.Errorf("Expected search to contain label selector \"labels.shard='1'\", got %q", search) + } + if strings.Contains(search, "Ready='False'") { + hasNotReady = true + } + if strings.Contains(search, "Ready='True'") { + hasStale = true + } + } + if !hasNotReady { + t.Error("No not-ready query (Ready='False') was captured") + } + if !hasStale { + t.Error("No stale query (Ready='True') was captured") + } + + t.Logf("Label selector filtering test completed with %d queries", len(params)) } // TestIntegration_TSLSyntaxMultipleLabels validates TSL syntax with multiple label selectors +// and condition-based filtering func TestIntegration_TSLSyntaxMultipleLabels(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -319,48 +348,34 @@ func TestIntegration_TSLSyntaxMultipleLabels(t *testing.T) { now := time.Now() - // Track the search parameter received by the mock server - var receivedSearchParam string + // Track the search parameters received by the mock server + var receivedSearchParams []string + var searchMu sync.Mutex - // Create mock server that validates TSL syntax + // Create mock server that validates TSL syntax with conditions server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - receivedSearchParam = r.URL.Query().Get("search") - - // All available clusters - allClusters := []map[string]interface{}{ - createMockClusterWithLabels( - "cluster-region-env-match", - 2, - 2, - true, - now.Add(-31*time.Minute), - map[string]string{"region": "us-east", "env": "production"}, - ), - createMockClusterWithLabels( - "cluster-region-only", - 2, - 2, - true, - now.Add(-31*time.Minute), - map[string]string{"region": "us-east", "env": "staging"}, - ), - } - - // Server-side filtering using TSL syntax - filteredClusters := allClusters - - // Expected TSL format: "labels.env='production' and labels.region='us-east'" - expectedTSL := "labels.env='production' and labels.region='us-east'" - if receivedSearchParam == expectedTSL { - filteredClusters = []map[string]interface{}{} - for _, cluster := range allClusters { - labels, ok := cluster["labels"].(map[string]string) - if !ok { - continue - } - // Match clusters with both labels - if labels["region"] == "us-east" && labels["env"] == "production" { - filteredClusters = append(filteredClusters, cluster) + search := r.URL.Query().Get("search") + searchMu.Lock() + receivedSearchParams = append(receivedSearchParams, search) + searchMu.Unlock() + + var filteredClusters []map[string]interface{} + + // With condition-based queries, labels are combined with condition filters + labelPrefix := "labels.env='production' and labels.region='us-east'" + if strings.HasPrefix(search, labelPrefix) { + if strings.Contains(search, "Ready='False'") { + // Not-ready query + filteredClusters = nil + } else if strings.Contains(search, "Ready='True'") { + // Stale query: return matching cluster + filteredClusters = []map[string]interface{}{ + createMockClusterWithLabels( + "cluster-region-env-match", + 2, 2, true, + now.Add(-31*time.Minute), + map[string]string{"region": "us-east", "env": "production"}, + ), } } } @@ -409,10 +424,35 @@ func TestIntegration_TSLSyntaxMultipleLabels(t *testing.T) { time.Sleep(300 * time.Millisecond) cancel() - // Validate the search parameter format is correct TSL syntax - expectedTSL := "labels.env='production' and labels.region='us-east'" - if receivedSearchParam != expectedTSL { - t.Errorf("Expected TSL search parameter %q, got %q", expectedTSL, receivedSearchParam) + // Validate that queries include both label selectors and condition filters + labelPrefix := "labels.env='production' and labels.region='us-east'" + searchMu.Lock() + paramsCopy := make([]string, len(receivedSearchParams)) + copy(paramsCopy, receivedSearchParams) + searchMu.Unlock() + + if len(paramsCopy) == 0 { + t.Fatal("No search queries were captured — Sentinel may not have polled") + } + + hasNotReady := false + hasStale := false + for _, search := range paramsCopy { + if !strings.HasPrefix(search, labelPrefix) { + t.Errorf("Expected search to start with label selectors %q, got %q", labelPrefix, search) + } + if strings.Contains(search, "Ready='False'") { + hasNotReady = true + } + if strings.Contains(search, "Ready='True'") { + hasStale = true + } + } + if !hasNotReady { + t.Error("No not-ready query (Ready='False') was captured") + } + if !hasStale { + t.Error("No stale query (Ready='True') was captured") } // Wait for sentinel to stop @@ -421,7 +461,7 @@ func TestIntegration_TSLSyntaxMultipleLabels(t *testing.T) { t.Errorf("Expected Start to return nil or context.Canceled, got: %v", startErr) } - t.Logf("TSL syntax validation completed - received correct format: %s", receivedSearchParam) + t.Logf("TSL syntax validation completed with %d queries", len(paramsCopy)) } func TestIntegration_BrokerLoggerContext(t *testing.T) { @@ -433,7 +473,7 @@ func TestIntegration_BrokerLoggerContext(t *testing.T) { // Buffer to observe logs var logBuffer bytes.Buffer now := time.Now() - callCount := 0 + var callCount atomic.Int32 readyChan := make(chan bool, 1) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -455,19 +495,32 @@ func TestIntegration_BrokerLoggerContext(t *testing.T) { log := logger.NewHyperFleetLoggerWithConfig(cfg) // Mock server returns clusters that will trigger event publishing + // With condition-based queries, each poll cycle makes 2 queries server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - callCount++ - if callCount >= 2 { - select { - case readyChan <- true: - default: + search := r.URL.Query().Get("search") + + var clusters []map[string]interface{} + switch { + case strings.Contains(search, "Ready='False'"): + if callCount.Add(1) >= 2 { + select { + case readyChan <- true: + default: + } } - } - clusters := []map[string]interface{}{ - // This cluster will trigger max_age_ready exceeded event - createMockCluster("cluster-old", 2, 2, true, now.Add(-35*time.Minute)), // Exceeds 30min - // This cluster will trigger max_age_not_ready exceeded event - createMockCluster("cluster-not-ready", 1, 1, false, now.Add(-15*time.Second)), // Exceeds 10sec + // Not-ready query: return not-ready cluster + clusters = []map[string]interface{}{ + createMockCluster("cluster-not-ready", 1, 1, false, now.Add(-15*time.Second)), + } + case strings.Contains(search, "Ready='True'"): + // Stale query: return stale ready cluster + clusters = []map[string]interface{}{ + createMockCluster("cluster-old", 2, 2, true, now.Add(-35*time.Minute)), + } + default: + t.Errorf("unexpected search query: %q", search) + w.WriteHeader(http.StatusBadRequest) + return } response := createMockClusterList(clusters) w.Header().Set("Content-Type", "application/json")