From 2e0cdd9bb11598303b8626eff713561e7f77c4de Mon Sep 17 00:00:00 2001 From: Marco Supino Date: Sun, 8 Feb 2026 11:44:31 +0200 Subject: [PATCH 1/3] ignore other pods issues --- deploy/deploy.go | 12 ++ pods/status.go | 29 ++++ topo/node/drivenets/drivenets.go | 249 ++++++++++++++++++++++++++++++- topo/topo.go | 2 + 4 files changed, 291 insertions(+), 1 deletion(-) diff --git a/deploy/deploy.go b/deploy/deploy.go index 47ae5cc1..3f090620 100644 --- a/deploy/deploy.go +++ b/deploy/deploy.go @@ -224,6 +224,18 @@ func (d *Deployment) Deploy(ctx context.Context, kubecfg string) (rerr error) { log.Warningf("Failed to start pod watcher: %v", err) } else { w.SetProgress(d.Progress) + // Restrict watcher to known namespaces managed during deployment to avoid noise + // from unrelated user workloads in other namespaces. + w.AllowNamespaces( + "kube-system", + "metallb-system", + "meshnet", + "arista-ceoslab-operator-system", + "lemming-operator", + "srlinux-controller-system", + "ixiatg-op-system", + "cdnos-controller-system", + ) defer func() { cancel() rerr = w.Cleanup(rerr) diff --git a/pods/status.go b/pods/status.go index 6f30310b..782a7aad 100644 --- a/pods/status.go +++ b/pods/status.go @@ -35,6 +35,7 @@ type Watcher struct { progress bool currentNamespace string currentPod types.UID + allowedNS map[string]struct{} } // NewWatcher returns a Watcher on the provided client or an error. The cancel @@ -78,6 +79,24 @@ func (w *Watcher) SetProgress(value bool) { w.mu.Unlock() } +// AllowNamespaces restricts the watcher to only consider pods in the provided namespaces. +// If no namespaces are provided, all namespaces are considered. 
+func (w *Watcher) AllowNamespaces(namespaces ...string) { + w.mu.Lock() + defer w.mu.Unlock() + if len(namespaces) == 0 { + w.allowedNS = nil + return + } + w.allowedNS = make(map[string]struct{}, len(namespaces)) + for _, ns := range namespaces { + if ns == "" { + continue + } + w.allowedNS[ns] = struct{}{} + } +} + func (w *Watcher) stop() { w.mu.Lock() stop := w.wstop @@ -128,6 +147,16 @@ func (w *Watcher) display(format string, v ...any) { } func (w *Watcher) updatePod(s *PodStatus) bool { + // If allowed namespaces are configured, ignore pods outside them. + w.mu.Lock() + allowed := w.allowedNS + w.mu.Unlock() + if allowed != nil { + if _, ok := allowed[s.Namespace]; !ok { + return true + } + } + newNamespace := s.Namespace != w.currentNamespace var newState string diff --git a/topo/node/drivenets/drivenets.go b/topo/node/drivenets/drivenets.go index ebe95fa0..ea4992be 100644 --- a/topo/node/drivenets/drivenets.go +++ b/topo/node/drivenets/drivenets.go @@ -26,16 +26,19 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/drivenets/cdnos-controller/api/v1/clientset" "github.com/openconfig/kne/topo/node" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" cdnosv1 "github.com/drivenets/cdnos-controller/api/v1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" log "k8s.io/klog/v2" @@ -255,9 +258,108 @@ func (n *Node) cdnosCreate(ctx context.Context) error { if _, err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).Create(ctx, dut, metav1.CreateOptions{}); err != nil { return fmt.Errorf("failed to create cdnos: %v", err) } + // Ensure the controller-created Service has required Azure LB annotations when on AKS. + // Creation will fail if annotations cannot be applied within the timeout. 
+ if err := n.annotateCdnosService(ctx); err != nil { + return fmt.Errorf("failed to annotate service for %s: %v", n.Name(), err) + } return nil } +// annotateCdnosService waits for the controller-created Service named "service-" +// and adds Azure LoadBalancer annotations required by the user. +func (n *Node) annotateCdnosService(ctx context.Context) error { + if !isAzureAKS(n.KubeClient) { + log.V(1).Infof("Azure AKS not detected; skipping service annotation for %q", n.Name()) + return nil + } + log.Infof("Azure AKS detected; annotating controller-managed Services for %q", n.Name()) + deadline := time.Now().Add(10 * time.Minute) + desired := map[string]string{ + "service.beta.kubernetes.io/azure-load-balancer-internal": "true", + } + // Build no-probe rules from this node's services (outside ports). + for port := range n.Proto.Services { + key := fmt.Sprintf("service.beta.kubernetes.io/port_%d_no_probe_rule", port) + desired[key] = "true" + } + for { + if time.Now().After(deadline) { + return fmt.Errorf("timeout waiting to annotate services for %q", n.Name()) + } + svcs, err := n.servicesForNode(ctx) + if err != nil || len(svcs) == 0 { + time.Sleep(1 * time.Second) + continue + } + allAnnotated := true + for i := range svcs { + s := &svcs[i] + changed := false + if s.Annotations == nil { + s.Annotations = map[string]string{} + changed = true + } + for k, v := range desired { + if s.Annotations[k] != v { + s.Annotations[k] = v + changed = true + } + } + if changed { + // Use a short-lived background context to avoid parent ctx cancellations. 
+ updateCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + _, err := n.KubeClient.CoreV1().Services(n.Namespace).Update(updateCtx, s, metav1.UpdateOptions{}) + cancel() + if err != nil { + // Retry once on conflict with a fresh GET + if apierrors.IsConflict(err) { + getCtx, cancelGet := context.WithTimeout(context.Background(), 5*time.Second) + fresh, gerr := n.KubeClient.CoreV1().Services(n.Namespace).Get(getCtx, s.Name, metav1.GetOptions{}) + cancelGet() + if gerr == nil { + if fresh.Annotations == nil { + fresh.Annotations = map[string]string{} + } + for k, v := range desired { + fresh.Annotations[k] = v + } + updateCtx2, cancelUpd2 := context.WithTimeout(context.Background(), 5*time.Second) + _, uerr := n.KubeClient.CoreV1().Services(n.Namespace).Update(updateCtx2, fresh, metav1.UpdateOptions{}) + cancelUpd2() + if uerr == nil { + log.Infof("Annotated Service %q with Azure LB annotations (after conflict retry)", s.Name) + continue + } + } + } + allAnnotated = false + continue + } + log.Infof("Annotated Service %q with Azure LB annotations", s.Name) + } + // Verify + getCtx, cancelGet := context.WithTimeout(context.Background(), 5*time.Second) + got, err := n.KubeClient.CoreV1().Services(n.Namespace).Get(getCtx, s.Name, metav1.GetOptions{}) + cancelGet() + if err != nil { + allAnnotated = false + continue + } + for k, v := range desired { + if got.Annotations[k] != v { + allAnnotated = false + break + } + } + } + if allAnnotated { + return nil + } + time.Sleep(500 * time.Millisecond) + } +} + func (n *Node) Status(ctx context.Context) (node.Status, error) { if !isModelCdnos(n.Impl.Proto.Model) { return node.StatusUnknown, fmt.Errorf("invalid model specified") @@ -298,7 +400,112 @@ func (n *Node) cdnosDelete(ctx context.Context) error { if err != nil { return err } - return cs.CdnosV1alpha1().Cdnoss(n.Namespace).Delete(ctx, n.Name(), metav1.DeleteOptions{}) + // 1) Start teardown by deleting all Cdnos CRs in the namespace + // (controller 
will clean up owned objects for each). + list, err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + if len(list.Items) == 0 { + log.V(1).Infof("No Cdnos CRs found in namespace %q", n.Namespace) + } else { + var crNames []string + for _, item := range list.Items { + crNames = append(crNames, item.Name) + } + log.Infof("Deleting Cdnos CRs in %q: %v", n.Namespace, crNames) + for _, item := range list.Items { + if err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).Delete(ctx, item.Name, metav1.DeleteOptions{}); err != nil { + return err + } + } + } + + // 2) Monitor Services associated with this node until the controller removes them. + svcs, _ := n.servicesForNode(ctx) + if len(svcs) == 0 { + log.V(1).Infof("No Services found for node %q", n.Name()) + } else { + var svcNames []string + for _, s := range svcs { + svcNames = append(svcNames, s.Name) + } + log.Infof("Monitoring Services for %q to be removed by controller: %v", n.Name(), svcNames) + } + // Wait for Services to be removed (longer on AKS due to LoadBalancer cleanup). 
+ waitDeadline := time.Now().Add(2 * time.Minute) + if isAzureAKS(n.KubeClient) { + waitDeadline = time.Now().Add(10 * time.Minute) + log.Infof("AKS detected; waiting up to %v for all Services to be removed", time.Until(waitDeadline).Truncate(time.Second)) + } else { + log.V(1).Infof("Azure AKS not detected; waiting up to %v for all Services to be removed", time.Until(waitDeadline).Truncate(time.Second)) + } + start := time.Now() + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + if time.Now().After(waitDeadline) { + log.Warningf("Timeout waiting for Services removal; continuing teardown") + break + } + svcs, _ = n.servicesForNode(ctx) + remaining := len(svcs) + if remaining == 0 { + log.Infof("All Services for %q removed after %v", n.Name(), time.Since(start).Truncate(time.Second)) + break + } + select { + case <-ticker.C: + var names []string + for _, s := range svcs { + names = append(names, s.Name) + } + log.Infof("Waiting for Services removal for %q (%d remaining: %v, %v elapsed)", n.Name(), remaining, names, time.Since(start).Truncate(time.Second)) + default: + } + time.Sleep(2 * time.Second) + } + return nil +} + +// servicesForNode lists Services in the namespace that are associated with this node. +// It matches by: +// - name equals "service-" followed by the node name +// - label "name" equals node name (per controller) +// - selector app == node name +// - ownerReference is a Cdnos whose name equals the node name +func (n *Node) servicesForNode(ctx context.Context) ([]corev1.Service, error) { + // Use a short-lived background context for API calls to avoid parent ctx deadline cancellations. 
+ _ = ctx // ctx is intentionally unused to avoid parent deadline cancellations + listCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + list, err := n.KubeClient.CoreV1().Services(n.Namespace).List(listCtx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + var out []corev1.Service + wantName := fmt.Sprintf("service-%s", n.Name()) + for _, s := range list.Items { + if s.Name == wantName { + out = append(out, s) + continue + } + if s.Labels["name"] == n.Name() { + out = append(out, s) + continue + } + if s.Spec.Selector != nil && s.Spec.Selector["app"] == n.Name() { + out = append(out, s) + continue + } + for _, or := range s.OwnerReferences { + if or.Kind == "Cdnos" && or.Name == n.Name() { + out = append(out, s) + break + } + } + } + return out, nil } func (n *Node) ResetCfg(ctx context.Context) error { @@ -380,6 +587,46 @@ func init() { node.Vendor(tpb.Vendor_DRIVENETS, New) } +// isAzureAKS attempts to detect whether the current cluster is Azure AKS. +// It returns true if any node has a providerID starting with "azure://" +// or has any label prefixed with "kubernetes.azure.com/". +func isAzureAKS(k kubernetes.Interface) bool { + // Allow manual override for environments where listing nodes is restricted. 
+ if v := os.Getenv("KNE_FORCE_AKS"); v == "1" || strings.ToLower(v) == "true" { + log.V(1).Infof("AKS detection overridden via KNE_FORCE_AKS") + return true + } + if v := os.Getenv("KNE_FORCE_AZURE_ANNOTATIONS"); v == "1" || strings.ToLower(v) == "true" { + log.V(1).Infof("AKS detection overridden via KNE_FORCE_AZURE_ANNOTATIONS") + return true + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + nodes, err := k.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + log.V(1).Infof("AKS detection: failed to list nodes: %v", err) + return false + } + if len(nodes.Items) == 0 { + log.V(1).Infof("AKS detection: no nodes found in cluster") + return false + } + for _, n := range nodes.Items { + if strings.HasPrefix(n.Spec.ProviderID, "azure://") { + log.V(1).Infof("AKS detection: node %q providerID %q indicates Azure", n.Name, n.Spec.ProviderID) + return true + } + for key := range n.Labels { + if strings.HasPrefix(key, "kubernetes.azure.com/") { + log.V(1).Infof("AKS detection: node %q has Azure label %q", n.Name, key) + return true + } + } + } + log.V(1).Infof("AKS detection: no Azure providerID or labels found on any node") + return false +} + func (n *Node) CreateConfig(ctx context.Context) (*corev1.Volume, error) { pb := n.Proto var data []byte diff --git a/topo/topo.go b/topo/topo.go index 85d5e2c7..a4aeb94b 100644 --- a/topo/topo.go +++ b/topo/topo.go @@ -270,6 +270,8 @@ func (m *Manager) Create(ctx context.Context, timeout time.Duration) (rerr error log.Warningf("Failed to start pod watcher: %v", err) } else { w.SetProgress(m.progress) + // Only watch pods in this topology's namespace to avoid unrelated failures. 
+ w.AllowNamespaces(m.topo.Name) defer func() { cancel() rerr = w.Cleanup(rerr) From 4fe4c7fa8de54e61ad3dd38f1c5a6242069666a8 Mon Sep 17 00:00:00 2001 From: marco Date: Sun, 8 Feb 2026 12:10:52 +0200 Subject: [PATCH 2/3] update timers --- topo/node/drivenets/drivenets.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/topo/node/drivenets/drivenets.go b/topo/node/drivenets/drivenets.go index ea4992be..95be1f99 100644 --- a/topo/node/drivenets/drivenets.go +++ b/topo/node/drivenets/drivenets.go @@ -600,9 +600,9 @@ func isAzureAKS(k kubernetes.Interface) bool { log.V(1).Infof("AKS detection overridden via KNE_FORCE_AZURE_ANNOTATIONS") return true } - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - nodes, err := k.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + nodes, err := k.CoreV1().Nodes().List(ctx, metav1.ListOptions{Limit: 1}) if err != nil { log.V(1).Infof("AKS detection: failed to list nodes: %v", err) return false From bf0ce3efd68c8291c37027c17fcd669e0ce77c21 Mon Sep 17 00:00:00 2001 From: marco Date: Sun, 8 Feb 2026 12:13:10 +0200 Subject: [PATCH 3/3] fixes --- topo/node/drivenets/drivenets.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/topo/node/drivenets/drivenets.go b/topo/node/drivenets/drivenets.go index 95be1f99..96be93b0 100644 --- a/topo/node/drivenets/drivenets.go +++ b/topo/node/drivenets/drivenets.go @@ -258,8 +258,8 @@ func (n *Node) cdnosCreate(ctx context.Context) error { if _, err := cs.CdnosV1alpha1().Cdnoss(n.Namespace).Create(ctx, dut, metav1.CreateOptions{}); err != nil { return fmt.Errorf("failed to create cdnos: %v", err) } - // Ensure the controller-created Service has required Azure LB annotations when on AKS. - // Creation will fail if annotations cannot be applied within the timeout. 
+	// Ensure the controller-created Service has required Azure LB annotations.
+	// Azure annotations are required for proper LoadBalancer behavior.
 	if err := n.annotateCdnosService(ctx); err != nil {
 		return fmt.Errorf("failed to annotate service for %s: %v", n.Name(), err)
 	}
@@ -267,11 +267,14 @@ func (n *Node) cdnosCreate(ctx context.Context) error {
 }
 
 // annotateCdnosService waits for the controller-created Service named "service-"
-// and adds Azure LoadBalancer annotations required by the user.
+// and adds Azure LoadBalancer annotations. These annotations are required for proper operation.
 func (n *Node) annotateCdnosService(ctx context.Context) error {
-	if !isAzureAKS(n.KubeClient) {
-		log.V(1).Infof("Azure AKS not detected; skipping service annotation for %q", n.Name())
-		return nil
+	// Always apply Azure annotations - they're required and harmless on non-Azure clusters
+	// If Azure detection fails, still apply annotations to ensure proper operation
+	isAzure := isAzureAKS(n.KubeClient)
+	if !isAzure {
+		log.V(1).Infof("Azure AKS not detected, but applying Azure annotations anyway (required for operation)")
+	} else {
+		log.Infof("Azure AKS detected; annotating controller-managed Services for %q", n.Name())
 	}
-	log.Infof("Azure AKS detected; annotating controller-managed Services for %q", n.Name())
 	deadline := time.Now().Add(10 * time.Minute)