Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ type Condition struct {
// - This maintains service availability during resource provisioning/deletion
// - Only resources with valid status are returned
//
// The additionalFilters parameter accepts optional TSL condition expressions
// (e.g., "status.conditions.Ready='False'") that are combined with label
// selectors using "and" to form the final search query.
//
// Returns a slice of resources and an error if the fetch operation fails.
func (c *HyperFleetClient) FetchResources(ctx context.Context, resourceType ResourceType, labelSelector map[string]string) ([]Resource, error) {
func (c *HyperFleetClient) FetchResources(ctx context.Context, resourceType ResourceType, labelSelector map[string]string, additionalFilters ...string) ([]Resource, error) {
// Validate inputs
if ctx == nil {
return nil, fmt.Errorf("context cannot be nil")
Expand All @@ -140,15 +144,15 @@ func (c *HyperFleetClient) FetchResources(ctx context.Context, resourceType Reso

// Retry operation with backoff (v5 API)
operation := func() ([]Resource, error) {
resources, err := c.fetchResourcesOnce(ctx, resourceType, labelSelector)
resources, err := c.fetchResourcesOnce(ctx, resourceType, labelSelector, additionalFilters)
if err != nil {
// Check if error is retriable
if isRetriable(err) {
c.log.V(2).Infof(ctx, "Retriable error fetching %s: %v (will retry)", resourceType, err)
c.log.Debugf(ctx, "Retriable error fetching %s: %v (will retry)", resourceType, err)
return nil, err // Retry
}
// Non-retriable error - stop retrying
c.log.V(2).Infof(ctx, "Non-retriable error fetching %s: %v (will not retry)", resourceType, err)
c.log.Debugf(ctx, "Non-retriable error fetching %s: %v (will not retry)", resourceType, err)
return nil, backoff.Permanent(err)
}
return resources, nil
Expand Down Expand Up @@ -191,10 +195,30 @@ func labelSelectorToSearchString(labelSelector map[string]string) string {
return strings.Join(parts, " and ")
}

// buildSearchString combines label selectors and additional TSL filters into a
// single search query string. Label selectors are converted to TSL format
// (e.g., "labels.key='value'") and joined with additional filters using "and".
func buildSearchString(labelSelector map[string]string, additionalFilters []string) string {
parts := make([]string, 0, len(additionalFilters)+1)

labelSearch := labelSelectorToSearchString(labelSelector)
if labelSearch != "" {
parts = append(parts, labelSearch)
}

for _, f := range additionalFilters {
if f != "" {
parts = append(parts, f)
}
}

return strings.Join(parts, " and ")
}

// fetchResourcesOnce performs a single fetch operation without retry logic
func (c *HyperFleetClient) fetchResourcesOnce(ctx context.Context, resourceType ResourceType, labelSelector map[string]string) ([]Resource, error) {
// Build search parameter from label selector
searchParam := labelSelectorToSearchString(labelSelector)
func (c *HyperFleetClient) fetchResourcesOnce(ctx context.Context, resourceType ResourceType, labelSelector map[string]string, additionalFilters []string) ([]Resource, error) {
// Build search parameter from label selector and additional filters
searchParam := buildSearchString(labelSelector, additionalFilters)

// Call appropriate endpoint based on resource type
switch resourceType {
Expand Down
125 changes: 125 additions & 0 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,3 +664,128 @@ func TestFetchResources_WithLabelSelector(t *testing.T) {
t.Errorf("Expected search parameter %q, got %q", expectedSearch, receivedSearchParam)
}
}

// TestBuildSearchString tests combining label selectors and additional filters
func TestBuildSearchString(t *testing.T) {
tests := []struct {
name string
labelSelector map[string]string
additionalFilters []string
want string
}{
{
name: "labels only",
labelSelector: map[string]string{"shard": "1"},
additionalFilters: nil,
want: "labels.shard='1'",
},
{
name: "filter only",
labelSelector: nil,
additionalFilters: []string{"status.conditions.Ready='False'"},
want: "status.conditions.Ready='False'",
},
{
name: "both labels and filter",
labelSelector: map[string]string{"shard": "1"},
additionalFilters: []string{"status.conditions.Ready='False'"},
want: "labels.shard='1' and status.conditions.Ready='False'",
},
{
name: "multiple filters",
labelSelector: map[string]string{"shard": "1"},
additionalFilters: []string{"status.conditions.Ready='True'", "status.conditions.Ready.last_updated_time<='2025-01-01T00:00:00Z'"},
want: "labels.shard='1' and status.conditions.Ready='True' and status.conditions.Ready.last_updated_time<='2025-01-01T00:00:00Z'",
},
{
name: "both empty",
labelSelector: nil,
additionalFilters: nil,
want: "",
},
{
name: "empty string filter ignored",
labelSelector: nil,
additionalFilters: []string{"", "status.conditions.Ready='False'", ""},
want: "status.conditions.Ready='False'",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := buildSearchString(tt.labelSelector, tt.additionalFilters)
if got != tt.want {
t.Errorf("buildSearchString() = %q, want %q", got, tt.want)
}
})
}
}

// TestFetchResources_WithAdditionalFilters tests FetchResources with combined label selectors and condition filters
func TestFetchResources_WithAdditionalFilters(t *testing.T) {
var receivedSearchParam string

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
receivedSearchParam = r.URL.Query().Get("search")

response := map[string]interface{}{
"kind": "ClusterList",
"page": 1,
"size": 0,
"total": 0,
"items": []map[string]interface{}{},
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(response); err != nil {
t.Logf("Error encoding response: %v", err)
}
}))
defer server.Close()

client, _ := NewHyperFleetClient(server.URL, 10*time.Second)
labelSelector := map[string]string{"shard": "1"}

_, err := client.FetchResources(context.Background(), ResourceTypeClusters, labelSelector, "status.conditions.Ready='False'")
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}

expectedSearch := "labels.shard='1' and status.conditions.Ready='False'"
if receivedSearchParam != expectedSearch {
t.Errorf("Expected search parameter %q, got %q", expectedSearch, receivedSearchParam)
}
}

// TestFetchResources_WithConditionFilterOnly tests FetchResources with only condition filters (no labels)
func TestFetchResources_WithConditionFilterOnly(t *testing.T) {
var receivedSearchParam string

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
receivedSearchParam = r.URL.Query().Get("search")

response := map[string]interface{}{
"kind": "ClusterList",
"page": 1,
"size": 0,
"total": 0,
"items": []map[string]interface{}{},
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(response); err != nil {
t.Logf("Error encoding response: %v", err)
}
}))
defer server.Close()

client, _ := NewHyperFleetClient(server.URL, 10*time.Second)

_, err := client.FetchResources(context.Background(), ResourceTypeClusters, nil, "status.conditions.Ready='True'")
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}

expectedSearch := "status.conditions.Ready='True'"
if receivedSearchParam != expectedSearch {
t.Errorf("Expected search parameter %q, got %q", expectedSearch, receivedSearchParam)
}
}
81 changes: 76 additions & 5 deletions internal/sentinel/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sentinel

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -17,6 +18,15 @@ import (
"github.com/openshift-hyperfleet/hyperfleet-sentinel/pkg/logger"
)

const (
// notReadyFilter selects resources whose Ready condition is False.
notReadyFilter = "status.conditions.Ready='False'"

// staleReadyFilterFmt selects resources whose Ready condition is True but
// last_updated_time is older than the given cutoff (RFC 3339 timestamp).
staleReadyFilterFmt = "status.conditions.Ready='True' and status.conditions.Ready.last_updated_time<='%s'"
)

// Sentinel polls the HyperFleet API and triggers reconciliation events
type Sentinel struct {
config *config.SentinelConfig
Expand Down Expand Up @@ -102,15 +112,16 @@ func (s *Sentinel) trigger(ctx context.Context) error {
ctx = logger.WithSubset(ctx, resourceType)
ctx = logger.WithTopic(ctx, topic)

s.logger.V(2).Info(ctx, "Starting trigger cycle")
s.logger.Debug(ctx, "Starting trigger cycle")

// Convert label selectors to map for filtering
labelSelector := s.config.ResourceSelector.ToMap()

// Fetch resources from HyperFleet API
resources, err := s.client.FetchResources(ctx, client.ResourceType(s.config.ResourceType), labelSelector)
// Fetch resources using condition-based server-side filtering:
// Query 1: Not-ready resources (need frequent reconciliation)
// Query 2: Stale ready resources (exceeded max age)
resources, err := s.fetchFilteredResources(ctx, labelSelector)
if err != nil {
// Record API error
metrics.UpdateAPIErrorsMetric(resourceType, resourceSelector, "fetch_error")
return fmt.Errorf("failed to fetch resources: %w", err)
}
Expand All @@ -126,6 +137,11 @@ func (s *Sentinel) trigger(ctx context.Context) error {
for i := range resources {
resource := &resources[i]

if resource.ID == "" {
s.logger.Warnf(ctx, "Skipping resource with empty ID kind=%s", resource.Kind)
continue
}

decision := s.decisionEngine.Evaluate(resource, now)

if decision.ShouldPublish {
Expand Down Expand Up @@ -168,7 +184,7 @@ func (s *Sentinel) trigger(ctx context.Context) error {
// Record skipped resource
metrics.UpdateResourcesSkippedMetric(resourceType, resourceSelector, decision.Reason)

s.logger.V(2).Infof(skipCtx, "Skipped resource resource_id=%s ready=%t",
s.logger.Debugf(skipCtx, "Skipped resource resource_id=%s ready=%t",
resource.ID, resource.Status.Ready)
skipped++
}
Expand All @@ -192,6 +208,61 @@ func (s *Sentinel) trigger(ctx context.Context) error {
return nil
}

// fetchFilteredResources makes two targeted API queries to fetch only resources
// that likely need reconciliation, reducing network traffic compared to fetching
// all resources:
//
// 1. Not-ready resources: status.conditions.Ready='False'
// 2. Stale ready resources: Ready='True' with last_updated_time older than max_age_ready
//
// Results are merged and deduplicated by resource ID. The DecisionEngine still
// evaluates the filtered set in memory (e.g., for generation-based checks).
func (s *Sentinel) fetchFilteredResources(ctx context.Context, labelSelector map[string]string) ([]client.Resource, error) {
rt := client.ResourceType(s.config.ResourceType)

// Query 1: Not-ready resources
notReadyResources, err := s.client.FetchResources(ctx, rt, labelSelector, notReadyFilter)
if err != nil {
return nil, fmt.Errorf("failed to fetch not-ready resources: %w", err)
}
s.logger.Debugf(ctx, "Fetched not-ready resources count=%d", len(notReadyResources))

// Query 2: Stale ready resources (last_updated_time exceeded max_age_ready)
cutoff := time.Now().Add(-s.config.MaxAgeReady)
staleFilter := fmt.Sprintf(staleReadyFilterFmt, cutoff.Format(time.RFC3339))
staleResources, err := s.client.FetchResources(ctx, rt, labelSelector, staleFilter)
if err != nil {
// Propagate context cancellation/timeout — the caller must see these
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
// Graceful degradation: for transient/API errors, continue with not-ready results
s.logger.Errorf(ctx, "Failed to fetch stale resources, continuing with not-ready only: %v", err)
return notReadyResources, nil
}
s.logger.Debugf(ctx, "Fetched stale ready resources count=%d", len(staleResources))

return mergeResources(notReadyResources, staleResources), nil
}

// mergeResources combines two resource slices, deduplicating by resource ID.
// Resources from the first slice take precedence when duplicates are found.
func mergeResources(a, b []client.Resource) []client.Resource {
seen := make(map[string]struct{}, len(a))
result := make([]client.Resource, 0, len(a)+len(b))

for i := range a {
seen[a[i].ID] = struct{}{}
result = append(result, a[i])
}
for i := range b {
if _, exists := seen[b[i].ID]; !exists {
result = append(result, b[i])
}
}
return result
}

// buildEventData builds the CloudEvent data payload for a resource using the
// configured payload builder.
func (s *Sentinel) buildEventData(ctx context.Context, resource *client.Resource, decision engine.Decision) map[string]interface{} {
Expand Down
Loading