From 7ed69fbeb61e639581c668fdc14317bf329885e6 Mon Sep 17 00:00:00 2001 From: Pedro Coutinho Date: Thu, 7 May 2026 12:32:40 -0700 Subject: [PATCH 1/4] [CORE-12452] feat: auto-recover host-networked pods when node IP changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Detect Calico's host-networked pods (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches the node's current InternalIP, and delete them so the Deployment / DaemonSet controller recreates them with the correct IP. This works around an upstream Kubernetes limitation [1] where status.podIPs is immutable for hostNetwork pods once set: when a node's IP changes (e.g. KubeVirt VM reboot pulls a new DHCP lease), existing hostNetwork pods keep their old IP. The kube EndpointSlice controller reads from status.podIPs, so the calico-typha EndpointSlice ends up advertising stale IPs and Felix times out connecting to Typha. Restarting the container does not help — only deleting and recreating the pod itself causes the kubelet to repopulate status.podIPs from the current node IP. Implementation lives in the existing Typha autoscaler tick (every 10s, already has a Node informer cache): - Compare each pod's status.podIPs to the InternalIP entry in its node's status.addresses (which the kubelet does update promptly via heartbeat). - Delete stale pods, paced at one batch per workload per tick. Batch size is read from each workload's existing rolling-update setting: the Typha PDB's maxUnavailable, or the DaemonSet's updateStrategy.rollingUpdate.maxUnavailable. Falls back to 1 if not set or if the resolved value is < 1 (minimum-progress guarantee). - Order: Typha first; if any Typha was deleted this cycle, skip the calico-node deletions until the next tick to give the new Typha pod a clean window to come up. Linux and Windows DaemonSets are paced independently of each other. - Skipped entirely on the non-cluster-host autoscaler instance.
Tested by ODCN on KubeVirt: 3-node cluster with all node IPs changed, all calico-node and Typha pods recovered automatically without manual intervention. [1] https://github.com/kubernetes/kubernetes/issues/93897 Jira: CI-1951, CORE-12452 --- .../installation/typha_autoscaler.go | 179 +++++++++++ .../installation/typha_autoscaler_test.go | 290 ++++++++++++++++++ 2 files changed, 469 insertions(+) diff --git a/pkg/controller/installation/typha_autoscaler.go b/pkg/controller/installation/typha_autoscaler.go index 8d675591c4..a4a907eb37 100644 --- a/pkg/controller/installation/typha_autoscaler.go +++ b/pkg/controller/installation/typha_autoscaler.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -156,6 +157,35 @@ func (t *typhaAutoscaler) start(ctx context.Context) { } else { degraded = false } + + // Check for host-networked pods with stale IPs (e.g., after a node + // IP change) and delete them so they get recreated with the correct + // IP. Typha is checked first; if any Typha pod was deleted this + // cycle, calico-node deletions are skipped to give the new Typha a + // clean window to come up before churning calico-node pods that + // depend on it. + typhaBatch := t.resolveTyphaMaxUnavailable() + deletedTypha := t.deleteStaleHostNetworkPods( + "calico-typha", + fmt.Sprintf("%s=%s", render.AppLabelName, render.TyphaK8sAppName), + typhaBatch, + ) > 0 + if !deletedTypha { + // Linux and Windows DaemonSets are paced independently of each + // other. 
+					linuxBatch := t.resolveDaemonSetMaxUnavailable(render.CalicoNodeObjectName)
+					t.deleteStaleHostNetworkPods(
+						"calico-node",
+						fmt.Sprintf("%s=%s", render.AppLabelName, render.CalicoNodeObjectName),
+						linuxBatch,
+					)
+					windowsBatch := t.resolveDaemonSetMaxUnavailable(render.WindowsNodeObjectName)
+					t.deleteStaleHostNetworkPods(
+						"calico-node-windows",
+						fmt.Sprintf("%s=%s", render.AppLabelName, render.WindowsNodeObjectName),
+						windowsBatch,
+					)
+				}
 			case errCh := <-t.triggerRunChan:
 				if err := t.autoscaleReplicas(); err != nil {
 					degraded = true
@@ -280,6 +310,163 @@ func (t *typhaAutoscaler) getNodeCounts() (int, int) {
 	return schedulable, linuxNodes
 }
 
+// deleteStaleHostNetworkPods lists pods matching labelSelector in the
+// calico-system namespace and compares each pod's status.podIPs against the
+// current InternalIP of the node the pod is running on. If the IPs don't match
+// (stale pod IP after a node IP change), up to maxBatch pods are deleted so the
+// owning controller (Deployment / DaemonSet) recreates them with the correct IP.
+//
+// Pods that are unscheduled, already terminating, or have no status.podIPs
+// reported yet are skipped: none of them carries a stale IP, and deleting a
+// freshly recreated pod before the kubelet has populated its status would
+// delete it again on every tick.
+//
+// This is necessary because Kubernetes does not update status.podIPs for
+// existing hostNetwork pods when the node's IP changes — it is explicitly
+// immutable in the kubelet:
+// https://github.com/kubernetes/kubernetes/issues/93897.
+//
+// Returns the number of pods deleted in this call.
+//
+// workloadName is used only for logging.
+func (t *typhaAutoscaler) deleteStaleHostNetworkPods(workloadName, labelSelector string, maxBatch int) int {
+	if t.nonClusterHost {
+		return 0
+	}
+	if maxBatch < 1 {
+		maxBatch = 1
+	}
+
+	pods, err := t.client.CoreV1().Pods(common.CalicoNamespace).List(context.Background(), metav1.ListOptions{
+		LabelSelector: labelSelector,
+	})
+	if err != nil {
+		typhaLog.V(5).Info("Failed to list pods for stale IP check", "workload", workloadName, "error", err)
+		return 0
+	}
+
+	// Build a map of node name → InternalIP from the informer cache.
+	nodeInternalIPs := map[string]string{}
+	for _, obj := range t.indexInformer.GetIndexer().List() {
+		n := obj.(*v1.Node)
+		for _, addr := range n.Status.Addresses {
+			if addr.Type == v1.NodeInternalIP {
+				nodeInternalIPs[n.Name] = addr.Address
+				break
+			}
+		}
+	}
+
+	deleted := 0
+	for i := range pods.Items {
+		if deleted >= maxBatch {
+			break
+		}
+		pod := &pods.Items[i]
+		// Skip pods that cannot carry a stale IP (unscheduled, or status.podIPs
+		// not yet reported by the kubelet) and pods that are already terminating,
+		// so replacements are not deleted again before they start.
+		if pod.Spec.NodeName == "" || pod.DeletionTimestamp != nil || len(pod.Status.PodIPs) == 0 {
+			continue
+		}
+		nodeIP, ok := nodeInternalIPs[pod.Spec.NodeName]
+		if !ok {
+			continue
+		}
+
+		// Check if any of the pod's IPs match the node's current InternalIP.
+		match := false
+		for _, podIP := range pod.Status.PodIPs {
+			if podIP.IP == nodeIP {
+				match = true
+				break
+			}
+		}
+		if match {
+			continue
+		}
+
+		// Pod IP is stale — delete the pod so the owning controller recreates
+		// it with the correct IP.
+		podIPs := make([]string, len(pod.Status.PodIPs))
+		for j, pip := range pod.Status.PodIPs {
+			podIPs[j] = pip.IP
+		}
+		typhaLog.Info("Pod has stale IP after node IP change; deleting pod so it gets recreated with the correct IP",
+			"workload", workloadName, "pod", pod.Name, "node", pod.Spec.NodeName,
+			"podIPs", podIPs, "nodeInternalIP", nodeIP)
+		if err := t.client.CoreV1().Pods(common.CalicoNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil {
+			typhaLog.Error(err, "Failed to delete pod with stale IP", "workload", workloadName, "pod", pod.Name)
+			continue
+		}
+		deleted++
+	}
+	return deleted
+}
+
+// resolveTyphaMaxUnavailable reads the maxUnavailable value from the Typha
+// PodDisruptionBudget and resolves it to an absolute pod count using the
+// current Typha replica count. Returns 1 if the PDB doesn't exist, doesn't
+// have maxUnavailable set, or if the resolved value is < 1 (so progress
+// is always guaranteed).
+func (t *typhaAutoscaler) resolveTyphaMaxUnavailable() int { + const fallback = 1 + pdb, err := t.client.PolicyV1().PodDisruptionBudgets(common.CalicoNamespace).Get( + context.Background(), common.TyphaDeploymentName, metav1.GetOptions{}, + ) + if err != nil || pdb.Spec.MaxUnavailable == nil { + return fallback + } + replicas := int(t.activeReplicas) + if replicas <= 0 { + // No positive replica count is recorded on the autoscaler yet; fall back + // to the replica count in the Typha Deployment spec. + typha, err := t.client.AppsV1().Deployments(common.CalicoNamespace).Get( + context.Background(), common.TyphaDeploymentName, metav1.GetOptions{}, + ) + if err == nil && typha.Spec.Replicas != nil { + replicas = int(*typha.Spec.Replicas) + } + } + if replicas < 1 { + return fallback + } + val, err := intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MaxUnavailable, replicas, true) + if err != nil || val < 1 { + return fallback + } + return val +} + +// resolveDaemonSetMaxUnavailable reads maxUnavailable from the named DaemonSet's +// rolling-update strategy and scales it against status.desiredNumberScheduled. +// Returns 1 if the DaemonSet can't be fetched, has no rolling-update +// maxUnavailable, reports no desired pods, or the resolved value is < 1.
+func (t *typhaAutoscaler) resolveDaemonSetMaxUnavailable(name string) int { + const fallback = 1 + ds, err := t.client.AppsV1().DaemonSets(common.CalicoNamespace).Get( + context.Background(), name, metav1.GetOptions{}, + ) + if err != nil { + return fallback + } + if ds.Spec.UpdateStrategy.RollingUpdate == nil || + ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable == nil { + return fallback + } + desired := int(ds.Status.DesiredNumberScheduled) + if desired < 1 { + return fallback + } + val, err := intstr.GetScaledValueFromIntOrPercent( + ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, desired, true, + ) + if err != nil || val < 1 { + return fallback + } + return val +} + // getHostEndpointCounts returns the number of host endpoints in the cluster that are not created by the kube-controllers. func (t *typhaAutoscaler) getHostEndpointCounts() int { heps := 0 diff --git a/pkg/controller/installation/typha_autoscaler_test.go b/pkg/controller/installation/typha_autoscaler_test.go index 3ddedcef51..e33f93d283 100644 --- a/pkg/controller/installation/typha_autoscaler_test.go +++ b/pkg/controller/installation/typha_autoscaler_test.go @@ -29,8 +29,10 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" kfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" @@ -273,6 +275,294 @@ var _ = Describe("Test typha autoscaler ", func() { statusManager.AssertExpectations(GinkgoT()) }) + + Context("stale pod IP detection", func() { + var ta *typhaAutoscaler + + const ( + typhaSelector = "k8s-app=calico-typha" + calicoNodeSelector = "k8s-app=calico-node" + windowsNodeSelector = "k8s-app=calico-node-windows" + ) + + BeforeEach(func() { + ta = newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond)) + 
ta.start(ctx) + }) + + createNodeWithIP := func(name, ip string) *corev1.Node { + node := CreateNode(c, name, map[string]string{"kubernetes.io/os": "linux"}, nil) + node.Status.Addresses = []corev1.NodeAddress{ + {Type: corev1.NodeInternalIP, Address: ip}, + } + var err error + node, err = c.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + return node + } + + createPodWithLabel := func(name, nodeName, podIP, k8sApp string) *corev1.Pod { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "calico-system", + Labels: map[string]string{"k8s-app": k8sApp}, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + }, + Status: corev1.PodStatus{ + PodIPs: []corev1.PodIP{{IP: podIP}}, + }, + } + var err error + pod, err = c.CoreV1().Pods("calico-system").Create(ctx, pod, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + // fake client doesn't persist status on Create; update it separately. + pod.Status.PodIPs = []corev1.PodIP{{IP: podIP}} + pod, err = c.CoreV1().Pods("calico-system").UpdateStatus(ctx, pod, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + return pod + } + + // waitForNodes blocks until the node informer has observed n schedulable nodes. 
+ waitForNodes := func(n int) { + EventuallyWithOffset(1, func() int { + all, _ := ta.getNodeCounts() + return all + }, 5*time.Second).Should(Equal(n)) + } + + listPods := func(labelSelector string) []corev1.Pod { + pods, err := c.CoreV1().Pods("calico-system").List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + Expect(err).To(BeNil()) + return pods.Items + } + + It("returns 0 and deletes nothing when all pod IPs match the node InternalIP", func() { + createNodeWithIP("node1", "10.0.0.1") + createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha") + waitForNodes(1) + + deleted := ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1) + Expect(deleted).To(Equal(0)) + Expect(listPods(typhaSelector)).To(HaveLen(1)) + }) + + It("returns 1 and deletes a Typha pod whose IP doesn't match the node InternalIP", func() { + createNodeWithIP("node1", "10.0.0.2") + createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha") + waitForNodes(1) + + deleted := ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1) + Expect(deleted).To(Equal(1)) + Expect(listPods(typhaSelector)).To(HaveLen(0)) + }) + + It("deletes a stale calico-node (Linux) pod", func() { + createNodeWithIP("node1", "10.0.0.2") + createPodWithLabel("calico-node-abc", "node1", "10.0.0.1", "calico-node") + waitForNodes(1) + + deleted := ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 1) + Expect(deleted).To(Equal(1)) + Expect(listPods(calicoNodeSelector)).To(HaveLen(0)) + }) + + It("deletes a stale calico-node-windows pod", func() { + createNodeWithIP("node1", "10.0.0.2") + createPodWithLabel("calico-node-windows-abc", "node1", "10.0.0.1", "calico-node-windows") + waitForNodes(1) + + deleted := ta.deleteStaleHostNetworkPods("calico-node-windows", windowsNodeSelector, 1) + Expect(deleted).To(Equal(1)) + Expect(listPods(windowsNodeSelector)).To(HaveLen(0)) + }) + + It("respects maxBatch=1 (default): exactly one stale pod deleted per call", func() { 
+ createNodeWithIP("node1", "10.0.0.2") + createNodeWithIP("node2", "10.0.0.4") + createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha") + createPodWithLabel("typha-def", "node2", "10.0.0.3", "calico-typha") + waitForNodes(2) + + Expect(ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1)).To(Equal(1)) + Expect(listPods(typhaSelector)).To(HaveLen(1)) + + Expect(ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1)).To(Equal(1)) + Expect(listPods(typhaSelector)).To(HaveLen(0)) + }) + + It("respects maxBatch=N (>1): up to N stale pods deleted per call", func() { + for i := 0; i < 5; i++ { + nodeName := fmt.Sprintf("node%d", i) + createNodeWithIP(nodeName, fmt.Sprintf("10.0.0.%d", 100+i)) + createPodWithLabel(fmt.Sprintf("calico-node-%d", i), nodeName, fmt.Sprintf("10.0.0.%d", i+1), "calico-node") + } + waitForNodes(5) + + // maxBatch=3 → delete 3 of the 5 stale pods. + Expect(ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 3)).To(Equal(3)) + Expect(listPods(calicoNodeSelector)).To(HaveLen(2)) + + // Next call cleans up the remaining 2. + Expect(ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 3)).To(Equal(2)) + Expect(listPods(calicoNodeSelector)).To(HaveLen(0)) + }) + + It("treats maxBatch < 1 as 1 (minimum-progress fallback)", func() { + createNodeWithIP("node1", "10.0.0.2") + createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha") + waitForNodes(1) + + // maxBatch=0 should still delete one pod (minimum-progress fallback). 
+ Expect(ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 0)).To(Equal(1)) + Expect(listPods(typhaSelector)).To(HaveLen(0)) + }) + + It("does not delete a pod whose node is not in the informer cache", func() { + createPodWithLabel("typha-abc", "unknown-node", "10.0.0.1", "calico-typha") + + deleted := ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1) + Expect(deleted).To(Equal(0)) + Expect(listPods(typhaSelector)).To(HaveLen(1)) + }) + + It("paces Linux and Windows DaemonSets independently of each other", func() { + createNodeWithIP("node-linux", "10.0.0.2") + createNodeWithIP("node-win", "10.0.1.2") + createPodWithLabel("calico-node-abc", "node-linux", "10.0.0.1", "calico-node") + createPodWithLabel("calico-node-windows-abc", "node-win", "10.0.1.1", "calico-node-windows") + waitForNodes(2) + + // Both DaemonSets have a stale pod; each call deletes its own. + Expect(ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 1)).To(Equal(1)) + Expect(ta.deleteStaleHostNetworkPods("calico-node-windows", windowsNodeSelector, 1)).To(Equal(1)) + Expect(listPods(calicoNodeSelector)).To(HaveLen(0)) + Expect(listPods(windowsNodeSelector)).To(HaveLen(0)) + }) + }) + + Context("maxUnavailable resolution", func() { + var ta *typhaAutoscaler + + BeforeEach(func() { + ta = newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond)) + ta.start(ctx) + }) + + It("returns 1 for Typha when the PDB does not exist", func() { + Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(1)) + }) + + It("resolves an int Typha PDB maxUnavailable", func() { + var replicas int32 = 5 + _, err := c.AppsV1().Deployments("calico-system").Create(ctx, &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, + Spec: appsv1.DeploymentSpec{Replicas: &replicas}, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + mu := intstr.FromInt(2) + _, err = 
c.PolicyV1().PodDisruptionBudgets("calico-system").Create(ctx, &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, + Spec: policyv1.PodDisruptionBudgetSpec{MaxUnavailable: &mu}, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(2)) + }) + + It("resolves a percentage Typha PDB maxUnavailable against replica count", func() { + var replicas int32 = 10 + _, err := c.AppsV1().Deployments("calico-system").Create(ctx, &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, + Spec: appsv1.DeploymentSpec{Replicas: &replicas}, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + mu := intstr.FromString("25%") + _, err = c.PolicyV1().PodDisruptionBudgets("calico-system").Create(ctx, &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, + Spec: policyv1.PodDisruptionBudgetSpec{MaxUnavailable: &mu}, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + // 25% of 10 = 3 (rounded up from 2.5). 
+ Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(3)) + }) + + It("returns 1 for Typha when maxUnavailable resolves to 0", func() { + var replicas int32 = 5 + _, err := c.AppsV1().Deployments("calico-system").Create(ctx, &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, + Spec: appsv1.DeploymentSpec{Replicas: &replicas}, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + mu := intstr.FromInt(0) + _, err = c.PolicyV1().PodDisruptionBudgets("calico-system").Create(ctx, &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, + Spec: policyv1.PodDisruptionBudgetSpec{MaxUnavailable: &mu}, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(1)) + }) + + It("returns 1 for a DaemonSet without RollingUpdate", func() { + _, err := c.AppsV1().DaemonSets("calico-system").Create(ctx, &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: "calico-node", Namespace: "calico-system"}, + Spec: appsv1.DaemonSetSpec{ + UpdateStrategy: appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType}, + }, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(1)) + }) + + It("resolves an int DaemonSet maxUnavailable", func() { + mu := intstr.FromInt(4) + _, err := c.AppsV1().DaemonSets("calico-system").Create(ctx, &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: "calico-node", Namespace: "calico-system"}, + Spec: appsv1.DaemonSetSpec{ + UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateDaemonSet{ + MaxUnavailable: &mu, + }, + }, + }, + Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 100}, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(4)) + }) + + 
It("resolves a percentage DaemonSet maxUnavailable against desired pod count", func() { + mu := intstr.FromString("10%") + _, err := c.AppsV1().DaemonSets("calico-system").Create(ctx, &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{Name: "calico-node", Namespace: "calico-system"}, + Spec: appsv1.DaemonSetSpec{ + UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateDaemonSet{ + MaxUnavailable: &mu, + }, + }, + }, + Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 100}, + }, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + + // 10% of 100 = 10. + Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(10)) + }) + }) }) func verifyTyphaReplicas(c kubernetes.Interface, expectedReplicas int) { From 0d910f0d379f6a0d4ee00a31a36874a26d62f112 Mon Sep 17 00:00:00 2001 From: Pedro Coutinho Date: Fri, 8 May 2026 12:13:51 -0700 Subject: [PATCH 2/4] Make stale pod IP recovery configurable via Installation.Spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new Installation.Spec.StalePodIPRecovery field (Enabled / Disabled, default Enabled) that gates the host-networked stale pod IP detection and deletion logic in the typha autoscaler. When set to Disabled, the entire detection path is skipped each tick. The default-on choice is consistent with other operator-managed automation (e.g. the typha autoscaler is itself always-on with no toggle), avoids opt-in friction for users who don't know the bug exists, and provides an escape hatch for environments where the detection might interact badly with custom node-IP management. Implementation notes: - api/v1: new StalePodIPRecoveryType enum and IsStalePodIPRecoveryEnabled helper, modeled on the existing FIPSMode pattern. nil is treated as Enabled so the default-on behavior is encoded in one place. 
- typha_autoscaler.go: new optional func() bool field on the autoscaler consulted at the top of each tick. Wired via the existing option pattern (typhaAutoscalerOptionStalePodIPRecoveryEnabled) so tests can inject true / false / nil. A nil getter is treated as enabled, which keeps existing tests and the non-cluster-host autoscaler path unchanged. - core_controller.go: the closure reads the Installation named "default" from the manager's cached client at call time so toggles take effect on the next tick (~10s). Failures fall through to enabled — recovery is the safer default for the kubelet bug we're working around. Tests: - 3 new gate tests covering nil getter, true, and false. - Defensive Maybe() expectations on SetDegraded in the existing stale pod IP detection and maxUnavailable resolution contexts to fix a pre-existing race-condition flakiness exposed by this work. --- api/v1/installation_types.go | 26 ++++++ api/v1/zz_generated.deepcopy.go | 5 ++ .../installation/core_controller.go | 20 ++++- .../installation/typha_autoscaler.go | 60 ++++++++----- .../installation/typha_autoscaler_test.go | 90 +++++++++++++++++++ .../operator.tigera.io_installations.yaml | 26 ++++++ 6 files changed, 204 insertions(+), 23 deletions(-) diff --git a/api/v1/installation_types.go b/api/v1/installation_types.go index 5c8ef6b222..afde459af5 100644 --- a/api/v1/installation_types.go +++ b/api/v1/installation_types.go @@ -168,6 +168,17 @@ type InstallationSpec struct { // +optional NodeUpdateStrategy appsv1.DaemonSetUpdateStrategy `json:"nodeUpdateStrategy,omitempty"` + // StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods + // (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their + // node's current InternalIP. 
When stale pod IPs are detected (e.g., after a node reboot pulls + // a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet + // controllers recreate them with the correct IP. This works around an upstream Kubernetes + // limitation where status.podIPs is immutable for hostNetwork pods. + // Default: Enabled + // +kubebuilder:validation:Enum=Enabled;Disabled + // +optional + StalePodIPRecovery *StalePodIPRecoveryType `json:"stalePodIPRecovery,omitempty"` + // Deprecated. Please use CalicoNodeDaemonSet, TyphaDeployment, and KubeControllersDeployment. // ComponentResources can be used to customize the resource requirements for each component. // Node, Typha, and KubeControllers are supported for installations. @@ -358,6 +369,15 @@ const ( FIPSModeDisabled FIPSMode = "Disabled" ) +// StalePodIPRecoveryType controls whether the operator automatically detects and recreates +// host-networked Calico pods with stale status.podIPs after a node IP change. +type StalePodIPRecoveryType string + +const ( + StalePodIPRecoveryEnabled StalePodIPRecoveryType = "Enabled" + StalePodIPRecoveryDisabled StalePodIPRecoveryType = "Disabled" +) + // Deprecated. Please use TyphaDeployment instead. // TyphaAffinity allows configuration of node affinity characteristics for Typha pods. type TyphaAffinity struct { @@ -1141,6 +1161,12 @@ func IsFIPSModeEnabledString(mode *FIPSMode) string { return fmt.Sprintf("%t", IsFIPSModeEnabled(mode)) } +// IsStalePodIPRecoveryEnabled returns whether stale pod IP recovery is enabled. The behavior +// is default-on, so a nil reference means enabled. 
+func IsStalePodIPRecoveryEnabled(s *StalePodIPRecoveryType) bool { + return s == nil || *s == StalePodIPRecoveryEnabled +} + type WindowsNodeSpec struct { // CNIBinDir is the path to the CNI binaries directory on Windows, it must match what is used as 'bin_dir' under // [plugins] diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 7a26d77a2f..64f2c55aa6 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -5987,6 +5987,11 @@ func (in *InstallationSpec) DeepCopyInto(out *InstallationSpec) { **out = **in } in.NodeUpdateStrategy.DeepCopyInto(&out.NodeUpdateStrategy) + if in.StalePodIPRecovery != nil { + in, out := &in.StalePodIPRecovery, &out.StalePodIPRecovery + *out = new(StalePodIPRecoveryType) + **out = **in + } if in.ComponentResources != nil { in, out := &in.ComponentResources, &out.ComponentResources *out = make([]ComponentResource, len(*in)) diff --git a/pkg/controller/installation/core_controller.go b/pkg/controller/installation/core_controller.go index a018c16364..f59522988f 100644 --- a/pkg/controller/installation/core_controller.go +++ b/pkg/controller/installation/core_controller.go @@ -322,9 +322,25 @@ func newReconciler(mgr manager.Manager, opts options.ControllerOptions) (*Reconc nodeIndexInformer := cache.NewSharedIndexInformer(nodeListWatch, &corev1.Node{}, 0, cache.Indexers{}) go nodeIndexInformer.Run(opts.ShutdownContext.Done()) - // Create a Typha autoscaler. + // Create a Typha autoscaler. Stale pod IP recovery defers to the current + // Installation.Spec.StalePodIPRecovery setting on each tick (default-on if unset). + // If the Installation can't be read for any reason, fail-open and let the recovery + // run — it's the safer behavior for the kubelet bug we're working around. 
+ mgrClient := mgr.GetClient() typhaListWatch := cache.NewListWatchFromClient(opts.K8sClientset.AppsV1().RESTClient(), "deployments", "calico-system", fields.OneTermEqualSelector("metadata.name", "calico-typha")) - typhaScaler := newTyphaAutoscaler(opts.K8sClientset, nodeIndexInformer, typhaListWatch, statusManager) + typhaScaler := newTyphaAutoscaler( + opts.K8sClientset, + nodeIndexInformer, + typhaListWatch, + statusManager, + typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool { + inst := &operatorv1.Installation{} + if err := mgrClient.Get(opts.ShutdownContext, types.NamespacedName{Name: "default"}, inst); err != nil { + return true + } + return operatorv1.IsStalePodIPRecoveryEnabled(inst.Spec.StalePodIPRecovery) + }), + ) r := &ReconcileInstallation{ config: mgr.GetConfig(), diff --git a/pkg/controller/installation/typha_autoscaler.go b/pkg/controller/installation/typha_autoscaler.go index a4a907eb37..fc7ffa9c0c 100644 --- a/pkg/controller/installation/typha_autoscaler.go +++ b/pkg/controller/installation/typha_autoscaler.go @@ -58,6 +58,10 @@ type typhaAutoscaler struct { typhaIndexer cache.Store nonClusterHost bool + // stalePodIPRecoveryEnabled returns whether the operator should detect and delete + // host-networked pods with stale status.podIPs each tick. Default-on if nil. + stalePodIPRecoveryEnabled func() bool + // Number of currently running replicas. activeReplicas int32 } @@ -78,6 +82,15 @@ func typhaAutoscalerOptionNonclusterHost(nonClusterHost bool) typhaAutoscalerOpt } } +// typhaAutoscalerOptionStalePodIPRecoveryEnabled provides a getter that the autoscaler will +// call each tick to determine whether stale-IP pod recovery is enabled. If unset, recovery +// is always enabled. 
+func typhaAutoscalerOptionStalePodIPRecoveryEnabled(enabled func() bool) typhaAutoscalerOption { + return func(t *typhaAutoscaler) { + t.stalePodIPRecoveryEnabled = enabled + } +} + // newTyphaAutoscaler creates a new Typha autoscaler, optionally applying any options to the default autoscaler instance. // The default sync period is 10 seconds. func newTyphaAutoscaler(cs kubernetes.Interface, indexInformer cache.SharedIndexInformer, typhaListWatch cache.ListerWatcher, statusManager status.StatusManager, options ...typhaAutoscalerOption) *typhaAutoscaler { @@ -164,27 +177,32 @@ func (t *typhaAutoscaler) start(ctx context.Context) { // cycle, calico-node deletions are skipped to give the new Typha a // clean window to come up before churning calico-node pods that // depend on it. - typhaBatch := t.resolveTyphaMaxUnavailable() - deletedTypha := t.deleteStaleHostNetworkPods( - "calico-typha", - fmt.Sprintf("%s=%s", render.AppLabelName, render.TyphaK8sAppName), - typhaBatch, - ) > 0 - if !deletedTypha { - // Linux and Windows DaemonSets are paced independently of each - // other. - linuxBatch := t.resolveDaemonSetMaxUnavailable(render.CalicoNodeObjectName) - t.deleteStaleHostNetworkPods( - "calico-node", - fmt.Sprintf("%s=%s", render.AppLabelName, render.CalicoNodeObjectName), - linuxBatch, - ) - windowsBatch := t.resolveDaemonSetMaxUnavailable(render.WindowsNodeObjectName) - t.deleteStaleHostNetworkPods( - "calico-node-windows", - fmt.Sprintf("%s=%s", render.AppLabelName, render.WindowsNodeObjectName), - windowsBatch, - ) + // + // Skip the entire check if stale-IP recovery has been disabled + // via Installation.Spec.StalePodIPRecovery. 
+ if !t.nonClusterHost && (t.stalePodIPRecoveryEnabled == nil || t.stalePodIPRecoveryEnabled()) { + typhaBatch := t.resolveTyphaMaxUnavailable() + deletedTypha := t.deleteStaleHostNetworkPods( + "calico-typha", + fmt.Sprintf("%s=%s", render.AppLabelName, render.TyphaK8sAppName), + typhaBatch, + ) > 0 + if !deletedTypha { + // Linux and Windows DaemonSets are paced independently of each + // other. + linuxBatch := t.resolveDaemonSetMaxUnavailable(render.CalicoNodeObjectName) + t.deleteStaleHostNetworkPods( + "calico-node", + fmt.Sprintf("%s=%s", render.AppLabelName, render.CalicoNodeObjectName), + linuxBatch, + ) + windowsBatch := t.resolveDaemonSetMaxUnavailable(render.WindowsNodeObjectName) + t.deleteStaleHostNetworkPods( + "calico-node-windows", + fmt.Sprintf("%s=%s", render.AppLabelName, render.WindowsNodeObjectName), + windowsBatch, + ) + } } case errCh := <-t.triggerRunChan: if err := t.autoscaleReplicas(); err != nil { diff --git a/pkg/controller/installation/typha_autoscaler_test.go b/pkg/controller/installation/typha_autoscaler_test.go index e33f93d283..f16ff0ef49 100644 --- a/pkg/controller/installation/typha_autoscaler_test.go +++ b/pkg/controller/installation/typha_autoscaler_test.go @@ -286,6 +286,10 @@ var _ = Describe("Test typha autoscaler ", func() { ) BeforeEach(func() { + // The autoscaler may degrade if the first tick fires before nodes are + // added by the test body (race against ta.start). We don't care about + // scaling behavior in these tests; allow any SetDegraded call. + statusManager.On("SetDegraded", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() ta = newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond)) ta.start(ctx) }) @@ -449,6 +453,8 @@ var _ = Describe("Test typha autoscaler ", func() { var ta *typhaAutoscaler BeforeEach(func() { + // Allow any degradation that may happen due to race with autoscaler ticks.
+ statusManager.On("SetDegraded", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() ta = newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond)) ta.start(ctx) }) @@ -563,6 +569,90 @@ var _ = Describe("Test typha autoscaler ", func() { Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(10)) }) }) + + Context("stalePodIPRecoveryEnabled gate", func() { + const ( + typhaSelector = "k8s-app=calico-typha" + calicoNodeSelector = "k8s-app=calico-node" + ) + + BeforeEach(func() { + // The autoscaler may degrade if there aren't enough linux nodes to satisfy + // the expected typha scale. We don't care about scaling behavior here, so + // allow any SetDegraded call. + statusManager.On("SetDegraded", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() + }) + + // stale-node has InternalIP 10.0.0.2; pods are placed on it with podIP 10.0.0.1 + // to make them stale. + ensureStaleNode := func() { + if _, err := c.CoreV1().Nodes().Get(ctx, "stale-node", metav1.GetOptions{}); err == nil { + return + } + node := CreateNode(c, "stale-node", map[string]string{"kubernetes.io/os": "linux"}, nil) + node.Status.Addresses = []corev1.NodeAddress{{Type: corev1.NodeInternalIP, Address: "10.0.0.2"}} + _, err := c.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + } + + createStalePod := func(name, k8sApp string) { + ensureStaleNode() + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, Namespace: "calico-system", + Labels: map[string]string{"k8s-app": k8sApp}, + }, + Spec: corev1.PodSpec{NodeName: "stale-node"}, + Status: corev1.PodStatus{PodIPs: []corev1.PodIP{{IP: "10.0.0.1"}}}, + } + pod, err := c.CoreV1().Pods("calico-system").Create(ctx, pod, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + pod.Status.PodIPs = []corev1.PodIP{{IP: "10.0.0.1"}} + _, err = c.CoreV1().Pods("calico-system").UpdateStatus(ctx, 
pod, metav1.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + } + + listPods := func(selector string) []corev1.Pod { + pods, err := c.CoreV1().Pods("calico-system").List(ctx, metav1.ListOptions{LabelSelector: selector}) + Expect(err).NotTo(HaveOccurred()) + return pods.Items + } + + It("nil getter (default) is treated as enabled", func() { + ta := newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond)) + ta.start(ctx) + + createStalePod("typha-abc", "calico-typha") + Eventually(func() int { return len(listPods(typhaSelector)) }, 5*time.Second).Should(Equal(0)) + }) + + It("getter returning true allows deletions to proceed", func() { + ta := newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, + typhaAutoscalerOptionPeriod(10*time.Millisecond), + typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool { return true }), + ) + ta.start(ctx) + + createStalePod("typha-abc", "calico-typha") + Eventually(func() int { return len(listPods(typhaSelector)) }, 5*time.Second).Should(Equal(0)) + }) + + It("getter returning false suppresses all deletions", func() { + ta := newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, + typhaAutoscalerOptionPeriod(10*time.Millisecond), + typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool { return false }), + ) + ta.start(ctx) + + createStalePod("typha-abc", "calico-typha") + createStalePod("calico-node-abc", "calico-node") + + // Wait long enough for several ticks; nothing should be deleted. 
+ Consistently(func() int { + return len(listPods(typhaSelector)) + len(listPods(calicoNodeSelector)) + }, 200*time.Millisecond, 20*time.Millisecond).Should(Equal(2)) + }) + }) }) func verifyTyphaReplicas(c kubernetes.Interface, expectedReplicas int) { diff --git a/pkg/imports/crds/operator/operator.tigera.io_installations.yaml b/pkg/imports/crds/operator/operator.tigera.io_installations.yaml index 1e04c6eb26..c3db058162 100644 --- a/pkg/imports/crds/operator/operator.tigera.io_installations.yaml +++ b/pkg/imports/crds/operator/operator.tigera.io_installations.yaml @@ -7384,6 +7384,19 @@ spec: items: type: string type: array + stalePodIPRecovery: + description: |- + StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods + (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their + node's current InternalIP. When stale pod IPs are detected (e.g., after a node reboot pulls + a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet + controllers recreate them with the correct IP. This works around an upstream Kubernetes + limitation where status.podIPs is immutable for hostNetwork pods. + Default: Enabled + enum: + - Enabled + - Disabled + type: string tlsCipherSuites: description: TLSCipherSuites defines the cipher suite list that the @@ -16737,6 +16750,19 @@ spec: items: type: string type: array + stalePodIPRecovery: + description: |- + StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods + (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their + node's current InternalIP. When stale pod IPs are detected (e.g., after a node reboot pulls + a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet + controllers recreate them with the correct IP. This works around an upstream Kubernetes + limitation where status.podIPs is immutable for hostNetwork pods. 
+ Default: Enabled + enum: + - Enabled + - Disabled + type: string tlsCipherSuites: description: TLSCipherSuites defines the cipher suite list that From 016e5b084011d12c3a4e6441eeba9628d00e7d56 Mon Sep 17 00:00:00 2001 From: Pedro Coutinho Date: Thu, 21 May 2026 14:34:53 -0700 Subject: [PATCH 3/4] stale pod IP recovery via a dedicated event-driven controller When a node's IP changes (e.g. KubeVirt VM reboot pulls a new DHCP lease), Kubernetes does not update status.podIPs on existing hostNetwork pods; it treats the field as immutable once set (kubernetes/kubernetes#93897). The Typha EndpointSlice ends up advertising the stale IP, Felix can't reach Typha, and calico-node pods stay NotReady until pods are manually deleted. Add a small controller (pkg/controller/podiprecovery) that watches Nodes with a predicate enqueueing reconciles only when the set of NodeInternalIP addresses actually changes, so routine heartbeats don't trigger any work. On reconcile, it lists operator-managed host-networked pods on the affected node and deletes any whose status.podIPs no longer matches the node's current InternalIP. The owning Deployment / DaemonSet recreates each pod with a fresh sandbox, which the kubelet populates with the correct IP. Covers calico-typha, calico-node (Linux + Windows), tigera-dpi, l7-log-collector, calico-apiserver, and calico-webhooks. A per-pod spec.hostNetwork check makes the conditional ones (apiserver, webhooks) work naturally without mirroring HostNetworkRequired() logic. Deletions are not paced: stale-IP pods are non-functional by definition, so deleting them all at once doesn't worsen availability. 
--- api/v1/installation_types.go | 26 -- api/v1/zz_generated.deepcopy.go | 5 - internal/controller/controllers.go | 7 + .../controller/podiprecovery_controller.go | 39 ++ .../installation/core_controller.go | 20 +- .../installation/typha_autoscaler.go | 197 --------- .../installation/typha_autoscaler_test.go | 380 ------------------ .../podiprecovery/podiprecovery_controller.go | 272 +++++++++++++ .../podiprecovery_controller_test.go | 233 +++++++++++ .../podiprecovery/podiprecovery_suite_test.go | 34 ++ .../operator.tigera.io_installations.yaml | 26 -- 11 files changed, 587 insertions(+), 652 deletions(-) create mode 100644 internal/controller/podiprecovery_controller.go create mode 100644 pkg/controller/podiprecovery/podiprecovery_controller.go create mode 100644 pkg/controller/podiprecovery/podiprecovery_controller_test.go create mode 100644 pkg/controller/podiprecovery/podiprecovery_suite_test.go diff --git a/api/v1/installation_types.go b/api/v1/installation_types.go index afde459af5..5c8ef6b222 100644 --- a/api/v1/installation_types.go +++ b/api/v1/installation_types.go @@ -168,17 +168,6 @@ type InstallationSpec struct { // +optional NodeUpdateStrategy appsv1.DaemonSetUpdateStrategy `json:"nodeUpdateStrategy,omitempty"` - // StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods - // (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their - // node's current InternalIP. When stale pod IPs are detected (e.g., after a node reboot pulls - // a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet - // controllers recreate them with the correct IP. This works around an upstream Kubernetes - // limitation where status.podIPs is immutable for hostNetwork pods. - // Default: Enabled - // +kubebuilder:validation:Enum=Enabled;Disabled - // +optional - StalePodIPRecovery *StalePodIPRecoveryType `json:"stalePodIPRecovery,omitempty"` - // Deprecated. 
Please use CalicoNodeDaemonSet, TyphaDeployment, and KubeControllersDeployment. // ComponentResources can be used to customize the resource requirements for each component. // Node, Typha, and KubeControllers are supported for installations. @@ -369,15 +358,6 @@ const ( FIPSModeDisabled FIPSMode = "Disabled" ) -// StalePodIPRecoveryType controls whether the operator automatically detects and recreates -// host-networked Calico pods with stale status.podIPs after a node IP change. -type StalePodIPRecoveryType string - -const ( - StalePodIPRecoveryEnabled StalePodIPRecoveryType = "Enabled" - StalePodIPRecoveryDisabled StalePodIPRecoveryType = "Disabled" -) - // Deprecated. Please use TyphaDeployment instead. // TyphaAffinity allows configuration of node affinity characteristics for Typha pods. type TyphaAffinity struct { @@ -1161,12 +1141,6 @@ func IsFIPSModeEnabledString(mode *FIPSMode) string { return fmt.Sprintf("%t", IsFIPSModeEnabled(mode)) } -// IsStalePodIPRecoveryEnabled returns whether stale pod IP recovery is enabled. The behavior -// is default-on, so a nil reference means enabled. 
-func IsStalePodIPRecoveryEnabled(s *StalePodIPRecoveryType) bool { - return s == nil || *s == StalePodIPRecoveryEnabled -} - type WindowsNodeSpec struct { // CNIBinDir is the path to the CNI binaries directory on Windows, it must match what is used as 'bin_dir' under // [plugins] diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 64f2c55aa6..7a26d77a2f 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -5987,11 +5987,6 @@ func (in *InstallationSpec) DeepCopyInto(out *InstallationSpec) { **out = **in } in.NodeUpdateStrategy.DeepCopyInto(&out.NodeUpdateStrategy) - if in.StalePodIPRecovery != nil { - in, out := &in.StalePodIPRecovery, &out.StalePodIPRecovery - *out = new(StalePodIPRecoveryType) - **out = **in - } if in.ComponentResources != nil { in, out := &in.ComponentResources, &out.ComponentResources *out = make([]ComponentResource, len(*in)) diff --git a/internal/controller/controllers.go b/internal/controller/controllers.go index f80dc87d98..5f9f7d487e 100644 --- a/internal/controller/controllers.go +++ b/internal/controller/controllers.go @@ -195,6 +195,13 @@ func AddToManager(mgr ctrl.Manager, options options.ControllerOptions) error { }).SetupWithManager(mgr, options); err != nil { return fmt.Errorf("failed to create controller %s: %v", "KubeProxy", err) } + if err := (&PodIPRecoveryReconciler{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("PodIPRecovery"), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr, options); err != nil { + return fmt.Errorf("failed to create controller %s: %v", "PodIPRecovery", err) + } // +kubebuilder:scaffold:builder return nil } diff --git a/internal/controller/podiprecovery_controller.go b/internal/controller/podiprecovery_controller.go new file mode 100644 index 0000000000..5005de52a0 --- /dev/null +++ b/internal/controller/podiprecovery_controller.go @@ -0,0 +1,39 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. 
+/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/tigera/operator/pkg/controller/options" + "github.com/tigera/operator/pkg/controller/podiprecovery" +) + +// PodIPRecoveryReconciler watches Nodes for IP changes and deletes +// operator-managed host-networked pods with stale IPs. +type PodIPRecoveryReconciler struct { + client.Client + Log logr.Logger + Scheme *runtime.Scheme +} + +func (r *PodIPRecoveryReconciler) SetupWithManager(mgr ctrl.Manager, opts options.ControllerOptions) error { + return podiprecovery.Add(mgr, opts) +} diff --git a/pkg/controller/installation/core_controller.go b/pkg/controller/installation/core_controller.go index f59522988f..a018c16364 100644 --- a/pkg/controller/installation/core_controller.go +++ b/pkg/controller/installation/core_controller.go @@ -322,25 +322,9 @@ func newReconciler(mgr manager.Manager, opts options.ControllerOptions) (*Reconc nodeIndexInformer := cache.NewSharedIndexInformer(nodeListWatch, &corev1.Node{}, 0, cache.Indexers{}) go nodeIndexInformer.Run(opts.ShutdownContext.Done()) - // Create a Typha autoscaler. Stale pod IP recovery defers to the current - // Installation.Spec.StalePodIPRecovery setting on each tick (default-on if unset). 
- // If the Installation can't be read for any reason, fail-open and let the recovery - // run — it's the safer behavior for the kubelet bug we're working around. - mgrClient := mgr.GetClient() + // Create a Typha autoscaler. typhaListWatch := cache.NewListWatchFromClient(opts.K8sClientset.AppsV1().RESTClient(), "deployments", "calico-system", fields.OneTermEqualSelector("metadata.name", "calico-typha")) - typhaScaler := newTyphaAutoscaler( - opts.K8sClientset, - nodeIndexInformer, - typhaListWatch, - statusManager, - typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool { - inst := &operatorv1.Installation{} - if err := mgrClient.Get(opts.ShutdownContext, types.NamespacedName{Name: "default"}, inst); err != nil { - return true - } - return operatorv1.IsStalePodIPRecoveryEnabled(inst.Spec.StalePodIPRecovery) - }), - ) + typhaScaler := newTyphaAutoscaler(opts.K8sClientset, nodeIndexInformer, typhaListWatch, statusManager) r := &ReconcileInstallation{ config: mgr.GetConfig(), diff --git a/pkg/controller/installation/typha_autoscaler.go b/pkg/controller/installation/typha_autoscaler.go index fc7ffa9c0c..8d675591c4 100644 --- a/pkg/controller/installation/typha_autoscaler.go +++ b/pkg/controller/installation/typha_autoscaler.go @@ -23,7 +23,6 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -58,10 +57,6 @@ type typhaAutoscaler struct { typhaIndexer cache.Store nonClusterHost bool - // stalePodIPRecoveryEnabled returns whether the operator should detect and delete - // host-networked pods with stale status.podIPs each tick. Default-on if nil. - stalePodIPRecoveryEnabled func() bool - // Number of currently running replicas. 
activeReplicas int32 } @@ -82,15 +77,6 @@ func typhaAutoscalerOptionNonclusterHost(nonClusterHost bool) typhaAutoscalerOpt } } -// typhaAutoscalerOptionStalePodIPRecoveryEnabled provides a getter that the autoscaler will -// call each tick to determine whether stale-IP pod recovery is enabled. If unset, recovery -// is always enabled. -func typhaAutoscalerOptionStalePodIPRecoveryEnabled(enabled func() bool) typhaAutoscalerOption { - return func(t *typhaAutoscaler) { - t.stalePodIPRecoveryEnabled = enabled - } -} - // newTyphaAutoscaler creates a new Typha autoscaler, optionally applying any options to the default autoscaler instance. // The default sync period is 10 seconds. func newTyphaAutoscaler(cs kubernetes.Interface, indexInformer cache.SharedIndexInformer, typhaListWatch cache.ListerWatcher, statusManager status.StatusManager, options ...typhaAutoscalerOption) *typhaAutoscaler { @@ -170,40 +156,6 @@ func (t *typhaAutoscaler) start(ctx context.Context) { } else { degraded = false } - - // Check for host-networked pods with stale IPs (e.g., after a node - // IP change) and delete them so they get recreated with the correct - // IP. Typha is checked first; if any Typha pod was deleted this - // cycle, calico-node deletions are skipped to give the new Typha a - // clean window to come up before churning calico-node pods that - // depend on it. - // - // Skip the entire check if stale-IP recovery has been disabled - // via Installation.Spec.StalePodIPRecovery. - if t.stalePodIPRecoveryEnabled == nil || t.stalePodIPRecoveryEnabled() { - typhaBatch := t.resolveTyphaMaxUnavailable() - deletedTypha := t.deleteStaleHostNetworkPods( - "calico-typha", - fmt.Sprintf("%s=%s", render.AppLabelName, render.TyphaK8sAppName), - typhaBatch, - ) > 0 - if !deletedTypha { - // Linux and Windows DaemonSets are paced independently of each - // other. 
- linuxBatch := t.resolveDaemonSetMaxUnavailable(render.CalicoNodeObjectName) - t.deleteStaleHostNetworkPods( - "calico-node", - fmt.Sprintf("%s=%s", render.AppLabelName, render.CalicoNodeObjectName), - linuxBatch, - ) - windowsBatch := t.resolveDaemonSetMaxUnavailable(render.WindowsNodeObjectName) - t.deleteStaleHostNetworkPods( - "calico-node-windows", - fmt.Sprintf("%s=%s", render.AppLabelName, render.WindowsNodeObjectName), - windowsBatch, - ) - } - } case errCh := <-t.triggerRunChan: if err := t.autoscaleReplicas(); err != nil { degraded = true @@ -328,155 +280,6 @@ func (t *typhaAutoscaler) getNodeCounts() (int, int) { return schedulable, linuxNodes } -// deleteStaleHostNetworkPods lists pods matching labelSelector in the -// calico-system namespace and compares each pod's status.podIPs against the -// current InternalIP of the node the pod is running on. If the IPs don't match -// (stale pod IP after a node IP change), up to maxBatch pods are deleted so the -// owning controller (Deployment / DaemonSet) recreates them with the correct IP. -// -// This is necessary because Kubernetes does not update status.podIPs for -// existing hostNetwork pods when the node's IP changes — it is explicitly -// immutable in the kubelet: -// https://github.com/kubernetes/kubernetes/issues/93897. -// -// Returns the number of pods deleted in this call. -// -// workloadName is used only for logging. -func (t *typhaAutoscaler) deleteStaleHostNetworkPods(workloadName, labelSelector string, maxBatch int) int { - if t.nonClusterHost { - return 0 - } - if maxBatch < 1 { - maxBatch = 1 - } - - pods, err := t.client.CoreV1().Pods(common.CalicoNamespace).List(context.Background(), metav1.ListOptions{ - LabelSelector: labelSelector, - }) - if err != nil { - typhaLog.V(5).Info("Failed to list pods for stale IP check", "workload", workloadName, "error", err) - return 0 - } - - // Build a map of node name → InternalIP from the informer cache. 
- nodeInternalIPs := map[string]string{} - for _, obj := range t.indexInformer.GetIndexer().List() { - n := obj.(*v1.Node) - for _, addr := range n.Status.Addresses { - if addr.Type == v1.NodeInternalIP { - nodeInternalIPs[n.Name] = addr.Address - break - } - } - } - - deleted := 0 - for i := range pods.Items { - if deleted >= maxBatch { - break - } - pod := &pods.Items[i] - if pod.Spec.NodeName == "" { - continue - } - nodeIP, ok := nodeInternalIPs[pod.Spec.NodeName] - if !ok { - continue - } - - // Check if any of the pod's IPs match the node's current InternalIP. - match := false - for _, podIP := range pod.Status.PodIPs { - if podIP.IP == nodeIP { - match = true - break - } - } - if match { - continue - } - - // Pod IP is stale — delete the pod so the owning controller recreates - // it with the correct IP. - podIPs := make([]string, len(pod.Status.PodIPs)) - for j, pip := range pod.Status.PodIPs { - podIPs[j] = pip.IP - } - typhaLog.Info("Pod has stale IP after node IP change; deleting pod so it gets recreated with the correct IP", - "workload", workloadName, "pod", pod.Name, "node", pod.Spec.NodeName, - "podIPs", podIPs, "nodeInternalIP", nodeIP) - if err := t.client.CoreV1().Pods(common.CalicoNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil { - typhaLog.Error(err, "Failed to delete pod with stale IP", "workload", workloadName, "pod", pod.Name) - continue - } - deleted++ - } - return deleted -} - -// resolveTyphaMaxUnavailable reads the maxUnavailable value from the Typha -// PodDisruptionBudget and resolves it to an absolute pod count using the -// current Typha replica count. Returns 1 if the PDB doesn't exist, doesn't -// have maxUnavailable set, or if the resolved value is < 1 (so progress -// is always guaranteed). 
-func (t *typhaAutoscaler) resolveTyphaMaxUnavailable() int { - const fallback = 1 - pdb, err := t.client.PolicyV1().PodDisruptionBudgets(common.CalicoNamespace).Get( - context.Background(), common.TyphaDeploymentName, metav1.GetOptions{}, - ) - if err != nil || pdb.Spec.MaxUnavailable == nil { - return fallback - } - replicas := int(t.activeReplicas) - if replicas <= 0 { - // activeReplicas is populated by the informer; fall back to fetching - // the deployment if it hasn't been observed yet. - typha, err := t.client.AppsV1().Deployments(common.CalicoNamespace).Get( - context.Background(), common.TyphaDeploymentName, metav1.GetOptions{}, - ) - if err == nil && typha.Spec.Replicas != nil { - replicas = int(*typha.Spec.Replicas) - } - } - if replicas < 1 { - return fallback - } - val, err := intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MaxUnavailable, replicas, true) - if err != nil || val < 1 { - return fallback - } - return val -} - -// resolveDaemonSetMaxUnavailable reads the maxUnavailable value from the named -// DaemonSet's update strategy and resolves it to an absolute pod count using -// the desired DaemonSet pod count. Returns 1 if the DaemonSet doesn't exist, -// doesn't have a RollingUpdate strategy, or if the resolved value is < 1. 
-func (t *typhaAutoscaler) resolveDaemonSetMaxUnavailable(name string) int { - const fallback = 1 - ds, err := t.client.AppsV1().DaemonSets(common.CalicoNamespace).Get( - context.Background(), name, metav1.GetOptions{}, - ) - if err != nil { - return fallback - } - if ds.Spec.UpdateStrategy.RollingUpdate == nil || - ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable == nil { - return fallback - } - desired := int(ds.Status.DesiredNumberScheduled) - if desired < 1 { - return fallback - } - val, err := intstr.GetScaledValueFromIntOrPercent( - ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, desired, true, - ) - if err != nil || val < 1 { - return fallback - } - return val -} - // getHostEndpointCounts returns the number of host endpoints in the cluster that are not created by the kube-controllers. func (t *typhaAutoscaler) getHostEndpointCounts() int { heps := 0 diff --git a/pkg/controller/installation/typha_autoscaler_test.go b/pkg/controller/installation/typha_autoscaler_test.go index f16ff0ef49..3ddedcef51 100644 --- a/pkg/controller/installation/typha_autoscaler_test.go +++ b/pkg/controller/installation/typha_autoscaler_test.go @@ -29,10 +29,8 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" kfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" @@ -275,384 +273,6 @@ var _ = Describe("Test typha autoscaler ", func() { statusManager.AssertExpectations(GinkgoT()) }) - - Context("stale pod IP detection", func() { - var ta *typhaAutoscaler - - const ( - typhaSelector = "k8s-app=calico-typha" - calicoNodeSelector = "k8s-app=calico-node" - windowsNodeSelector = "k8s-app=calico-node-windows" - ) - - BeforeEach(func() { - // The autoscaler may degrade if the first tick fires before nodes are - // added by the test body (race against ta.start). 
We don't care about - // scaling behavior in these tests; allow any SetDegraded call. - statusManager.On("SetDegraded", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() - ta = newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond)) - ta.start(ctx) - }) - - createNodeWithIP := func(name, ip string) *corev1.Node { - node := CreateNode(c, name, map[string]string{"kubernetes.io/os": "linux"}, nil) - node.Status.Addresses = []corev1.NodeAddress{ - {Type: corev1.NodeInternalIP, Address: ip}, - } - var err error - node, err = c.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{}) - Expect(err).To(BeNil()) - return node - } - - createPodWithLabel := func(name, nodeName, podIP, k8sApp string) *corev1.Pod { - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "calico-system", - Labels: map[string]string{"k8s-app": k8sApp}, - }, - Spec: corev1.PodSpec{ - NodeName: nodeName, - }, - Status: corev1.PodStatus{ - PodIPs: []corev1.PodIP{{IP: podIP}}, - }, - } - var err error - pod, err = c.CoreV1().Pods("calico-system").Create(ctx, pod, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - // fake client doesn't persist status on Create; update it separately. - pod.Status.PodIPs = []corev1.PodIP{{IP: podIP}} - pod, err = c.CoreV1().Pods("calico-system").UpdateStatus(ctx, pod, metav1.UpdateOptions{}) - Expect(err).To(BeNil()) - return pod - } - - // waitForNodes blocks until the node informer has observed n schedulable nodes. 
- waitForNodes := func(n int) { - EventuallyWithOffset(1, func() int { - all, _ := ta.getNodeCounts() - return all - }, 5*time.Second).Should(Equal(n)) - } - - listPods := func(labelSelector string) []corev1.Pod { - pods, err := c.CoreV1().Pods("calico-system").List(ctx, metav1.ListOptions{ - LabelSelector: labelSelector, - }) - Expect(err).To(BeNil()) - return pods.Items - } - - It("returns 0 and deletes nothing when all pod IPs match the node InternalIP", func() { - createNodeWithIP("node1", "10.0.0.1") - createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha") - waitForNodes(1) - - deleted := ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1) - Expect(deleted).To(Equal(0)) - Expect(listPods(typhaSelector)).To(HaveLen(1)) - }) - - It("returns 1 and deletes a Typha pod whose IP doesn't match the node InternalIP", func() { - createNodeWithIP("node1", "10.0.0.2") - createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha") - waitForNodes(1) - - deleted := ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1) - Expect(deleted).To(Equal(1)) - Expect(listPods(typhaSelector)).To(HaveLen(0)) - }) - - It("deletes a stale calico-node (Linux) pod", func() { - createNodeWithIP("node1", "10.0.0.2") - createPodWithLabel("calico-node-abc", "node1", "10.0.0.1", "calico-node") - waitForNodes(1) - - deleted := ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 1) - Expect(deleted).To(Equal(1)) - Expect(listPods(calicoNodeSelector)).To(HaveLen(0)) - }) - - It("deletes a stale calico-node-windows pod", func() { - createNodeWithIP("node1", "10.0.0.2") - createPodWithLabel("calico-node-windows-abc", "node1", "10.0.0.1", "calico-node-windows") - waitForNodes(1) - - deleted := ta.deleteStaleHostNetworkPods("calico-node-windows", windowsNodeSelector, 1) - Expect(deleted).To(Equal(1)) - Expect(listPods(windowsNodeSelector)).To(HaveLen(0)) - }) - - It("respects maxBatch=1 (default): exactly one stale pod deleted per call", func() { 
- createNodeWithIP("node1", "10.0.0.2") - createNodeWithIP("node2", "10.0.0.4") - createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha") - createPodWithLabel("typha-def", "node2", "10.0.0.3", "calico-typha") - waitForNodes(2) - - Expect(ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1)).To(Equal(1)) - Expect(listPods(typhaSelector)).To(HaveLen(1)) - - Expect(ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1)).To(Equal(1)) - Expect(listPods(typhaSelector)).To(HaveLen(0)) - }) - - It("respects maxBatch=N (>1): up to N stale pods deleted per call", func() { - for i := 0; i < 5; i++ { - nodeName := fmt.Sprintf("node%d", i) - createNodeWithIP(nodeName, fmt.Sprintf("10.0.0.%d", 100+i)) - createPodWithLabel(fmt.Sprintf("calico-node-%d", i), nodeName, fmt.Sprintf("10.0.0.%d", i+1), "calico-node") - } - waitForNodes(5) - - // maxBatch=3 → delete 3 of the 5 stale pods. - Expect(ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 3)).To(Equal(3)) - Expect(listPods(calicoNodeSelector)).To(HaveLen(2)) - - // Next call cleans up the remaining 2. - Expect(ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 3)).To(Equal(2)) - Expect(listPods(calicoNodeSelector)).To(HaveLen(0)) - }) - - It("treats maxBatch < 1 as 1 (minimum-progress fallback)", func() { - createNodeWithIP("node1", "10.0.0.2") - createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha") - waitForNodes(1) - - // maxBatch=0 should still delete one pod (minimum-progress fallback). 
- Expect(ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 0)).To(Equal(1)) - Expect(listPods(typhaSelector)).To(HaveLen(0)) - }) - - It("does not delete a pod whose node is not in the informer cache", func() { - createPodWithLabel("typha-abc", "unknown-node", "10.0.0.1", "calico-typha") - - deleted := ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1) - Expect(deleted).To(Equal(0)) - Expect(listPods(typhaSelector)).To(HaveLen(1)) - }) - - It("paces Linux and Windows DaemonSets independently of each other", func() { - createNodeWithIP("node-linux", "10.0.0.2") - createNodeWithIP("node-win", "10.0.1.2") - createPodWithLabel("calico-node-abc", "node-linux", "10.0.0.1", "calico-node") - createPodWithLabel("calico-node-windows-abc", "node-win", "10.0.1.1", "calico-node-windows") - waitForNodes(2) - - // Both DaemonSets have a stale pod; each call deletes its own. - Expect(ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 1)).To(Equal(1)) - Expect(ta.deleteStaleHostNetworkPods("calico-node-windows", windowsNodeSelector, 1)).To(Equal(1)) - Expect(listPods(calicoNodeSelector)).To(HaveLen(0)) - Expect(listPods(windowsNodeSelector)).To(HaveLen(0)) - }) - }) - - Context("maxUnavailable resolution", func() { - var ta *typhaAutoscaler - - BeforeEach(func() { - // Allow any degradation that may happen due to race with autoscaler ticks. 
- statusManager.On("SetDegraded", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() - ta = newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond)) - ta.start(ctx) - }) - - It("returns 1 for Typha when the PDB does not exist", func() { - Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(1)) - }) - - It("resolves an int Typha PDB maxUnavailable", func() { - var replicas int32 = 5 - _, err := c.AppsV1().Deployments("calico-system").Create(ctx, &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, - Spec: appsv1.DeploymentSpec{Replicas: &replicas}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - mu := intstr.FromInt(2) - _, err = c.PolicyV1().PodDisruptionBudgets("calico-system").Create(ctx, &policyv1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, - Spec: policyv1.PodDisruptionBudgetSpec{MaxUnavailable: &mu}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(2)) - }) - - It("resolves a percentage Typha PDB maxUnavailable against replica count", func() { - var replicas int32 = 10 - _, err := c.AppsV1().Deployments("calico-system").Create(ctx, &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, - Spec: appsv1.DeploymentSpec{Replicas: &replicas}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - mu := intstr.FromString("25%") - _, err = c.PolicyV1().PodDisruptionBudgets("calico-system").Create(ctx, &policyv1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, - Spec: policyv1.PodDisruptionBudgetSpec{MaxUnavailable: &mu}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - // 25% of 10 = 3 (rounded up from 2.5). 
- Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(3)) - }) - - It("returns 1 for Typha when maxUnavailable resolves to 0", func() { - var replicas int32 = 5 - _, err := c.AppsV1().Deployments("calico-system").Create(ctx, &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, - Spec: appsv1.DeploymentSpec{Replicas: &replicas}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - mu := intstr.FromInt(0) - _, err = c.PolicyV1().PodDisruptionBudgets("calico-system").Create(ctx, &policyv1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"}, - Spec: policyv1.PodDisruptionBudgetSpec{MaxUnavailable: &mu}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(1)) - }) - - It("returns 1 for a DaemonSet without RollingUpdate", func() { - _, err := c.AppsV1().DaemonSets("calico-system").Create(ctx, &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{Name: "calico-node", Namespace: "calico-system"}, - Spec: appsv1.DaemonSetSpec{ - UpdateStrategy: appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType}, - }, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(1)) - }) - - It("resolves an int DaemonSet maxUnavailable", func() { - mu := intstr.FromInt(4) - _, err := c.AppsV1().DaemonSets("calico-system").Create(ctx, &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{Name: "calico-node", Namespace: "calico-system"}, - Spec: appsv1.DaemonSetSpec{ - UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ - Type: appsv1.RollingUpdateDaemonSetStrategyType, - RollingUpdate: &appsv1.RollingUpdateDaemonSet{ - MaxUnavailable: &mu, - }, - }, - }, - Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 100}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(4)) - }) - - 
It("resolves a percentage DaemonSet maxUnavailable against desired pod count", func() { - mu := intstr.FromString("10%") - _, err := c.AppsV1().DaemonSets("calico-system").Create(ctx, &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{Name: "calico-node", Namespace: "calico-system"}, - Spec: appsv1.DaemonSetSpec{ - UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ - Type: appsv1.RollingUpdateDaemonSetStrategyType, - RollingUpdate: &appsv1.RollingUpdateDaemonSet{ - MaxUnavailable: &mu, - }, - }, - }, - Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 100}, - }, metav1.CreateOptions{}) - Expect(err).To(BeNil()) - - // 10% of 100 = 10. - Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(10)) - }) - }) - - Context("stalePodIPRecoveryEnabled gate", func() { - const ( - typhaSelector = "k8s-app=calico-typha" - calicoNodeSelector = "k8s-app=calico-node" - ) - - BeforeEach(func() { - // The autoscaler may degrade if there aren't enough linux nodes to satisfy - // the expected typha scale. We don't care about scaling behavior here, so - // allow any SetDegraded call. - statusManager.On("SetDegraded", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() - }) - - // stale-node has InternalIP 10.0.0.2; pods are placed on it with podIP 10.0.0.1 - // to make them stale. 
- ensureStaleNode := func() { - if _, err := c.CoreV1().Nodes().Get(ctx, "stale-node", metav1.GetOptions{}); err == nil { - return - } - node := CreateNode(c, "stale-node", map[string]string{"kubernetes.io/os": "linux"}, nil) - node.Status.Addresses = []corev1.NodeAddress{{Type: corev1.NodeInternalIP, Address: "10.0.0.2"}} - _, err := c.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{}) - Expect(err).NotTo(HaveOccurred()) - } - - createStalePod := func(name, k8sApp string) { - ensureStaleNode() - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, Namespace: "calico-system", - Labels: map[string]string{"k8s-app": k8sApp}, - }, - Spec: corev1.PodSpec{NodeName: "stale-node"}, - Status: corev1.PodStatus{PodIPs: []corev1.PodIP{{IP: "10.0.0.1"}}}, - } - pod, err := c.CoreV1().Pods("calico-system").Create(ctx, pod, metav1.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) - pod.Status.PodIPs = []corev1.PodIP{{IP: "10.0.0.1"}} - _, err = c.CoreV1().Pods("calico-system").UpdateStatus(ctx, pod, metav1.UpdateOptions{}) - Expect(err).NotTo(HaveOccurred()) - } - - listPods := func(selector string) []corev1.Pod { - pods, err := c.CoreV1().Pods("calico-system").List(ctx, metav1.ListOptions{LabelSelector: selector}) - Expect(err).NotTo(HaveOccurred()) - return pods.Items - } - - It("nil getter (default) is treated as enabled", func() { - ta := newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond)) - ta.start(ctx) - - createStalePod("typha-abc", "calico-typha") - Eventually(func() int { return len(listPods(typhaSelector)) }, 5*time.Second).Should(Equal(0)) - }) - - It("getter returning true allows deletions to proceed", func() { - ta := newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, - typhaAutoscalerOptionPeriod(10*time.Millisecond), - typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool { return true }), - ) - ta.start(ctx) - - createStalePod("typha-abc", "calico-typha") - 
Eventually(func() int { return len(listPods(typhaSelector)) }, 5*time.Second).Should(Equal(0)) - }) - - It("getter returning false suppresses all deletions", func() { - ta := newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, - typhaAutoscalerOptionPeriod(10*time.Millisecond), - typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool { return false }), - ) - ta.start(ctx) - - createStalePod("typha-abc", "calico-typha") - createStalePod("calico-node-abc", "calico-node") - - // Wait long enough for several ticks; nothing should be deleted. - Consistently(func() int { - return len(listPods(typhaSelector)) + len(listPods(calicoNodeSelector)) - }, 200*time.Millisecond, 20*time.Millisecond).Should(Equal(2)) - }) - }) }) func verifyTyphaReplicas(c kubernetes.Interface, expectedReplicas int) { diff --git a/pkg/controller/podiprecovery/podiprecovery_controller.go b/pkg/controller/podiprecovery/podiprecovery_controller.go new file mode 100644 index 0000000000..3c5d4d2025 --- /dev/null +++ b/pkg/controller/podiprecovery/podiprecovery_controller.go @@ -0,0 +1,272 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package podiprecovery contains a small controller that watches Kubernetes +// Nodes for InternalIP changes and deletes operator-managed host-networked +// pods whose status.podIPs no longer matches the node's current InternalIP. 
+// +// This works around an upstream Kubernetes behavior +// (https://github.com/kubernetes/kubernetes/issues/93897) where status.podIPs +// is immutable for hostNetwork pods once set. When a node's IP changes +// (e.g. after a KubeVirt VM reboot pulls a new DHCP lease), existing +// hostNetwork pods keep their stale IPs in their status, the Kubernetes +// EndpointSlice controller advertises the stale IPs, and Felix can't reach +// Typha. Only deleting and recreating the pod causes the kubelet to populate +// status.podIPs from the current node IP. +package podiprecovery + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/tigera/operator/pkg/controller/options" + "github.com/tigera/operator/pkg/ctrlruntime" + "github.com/tigera/operator/pkg/render" + "github.com/tigera/operator/pkg/render/applicationlayer" + "github.com/tigera/operator/pkg/render/intrusiondetection/dpi" + "github.com/tigera/operator/pkg/render/webhooks" +) + +var log = logf.Log.WithName("controller_podiprecovery") + +// targetLabelSelectors is the set of label selectors identifying +// operator-managed pods that are (or may be) host-networked. The controller +// applies a per-pod hostNetwork check before deleting, so non-hostNetwork +// pods that happen to match are left alone. 
+var targetLabelSelectors = []labels.Selector{ + labels.SelectorFromSet(labels.Set{render.AppLabelName: render.TyphaK8sAppName}), + labels.SelectorFromSet(labels.Set{render.AppLabelName: render.CalicoNodeObjectName}), + labels.SelectorFromSet(labels.Set{render.AppLabelName: render.WindowsNodeObjectName}), + labels.SelectorFromSet(labels.Set{render.AppLabelName: dpi.DeepPacketInspectionName}), + labels.SelectorFromSet(labels.Set{render.AppLabelName: applicationlayer.ApplicationLayerDaemonsetName}), + labels.SelectorFromSet(labels.Set{"apiserver": "true"}), + labels.SelectorFromSet(labels.Set{render.AppLabelName: webhooks.WebhooksName}), +} + +// Add wires the controller into the manager. +func Add(mgr manager.Manager, opts options.ControllerOptions) error { + r := &Reconciler{ + client: mgr.GetClient(), + scheme: mgr.GetScheme(), + } + + c, err := ctrlruntime.NewController("podiprecovery-controller", mgr, controller.Options{Reconciler: r}) + if err != nil { + return fmt.Errorf("failed to create podiprecovery-controller: %w", err) + } + + // Watch Node objects. Only enqueue reconciliations when the set of + // InternalIPs has changed — that is the only signal that interests us, + // and it avoids spurious reconciles for routine kubelet heartbeats. + if err := c.WatchObject(&corev1.Node{}, &handler.EnqueueRequestForObject{}, internalIPChangedPredicate()); err != nil { + return fmt.Errorf("podiprecovery-controller failed to watch Nodes: %w", err) + } + + return nil +} + +// internalIPChangedPredicate filters Node events so reconciles only fire when +// the node's InternalIPs change (including initial set / removal). New nodes +// are reconciled once to handle the case where pods are scheduled before the +// Node's status is populated. 
+func internalIPChangedPredicate() predicate.Predicate { + return predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldNode, oldOK := e.ObjectOld.(*corev1.Node) + newNode, newOK := e.ObjectNew.(*corev1.Node) + if !oldOK || !newOK { + return false + } + return !sameInternalIPs(oldNode.Status.Addresses, newNode.Status.Addresses) + }, + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + } +} + +// sameInternalIPs returns true when both slices contain the same set of +// NodeInternalIP addresses (order-independent). +func sameInternalIPs(a, b []corev1.NodeAddress) bool { + aIPs := internalIPSet(a) + bIPs := internalIPSet(b) + if len(aIPs) != len(bIPs) { + return false + } + for ip := range aIPs { + if !bIPs[ip] { + return false + } + } + return true +} + +func internalIPSet(addrs []corev1.NodeAddress) map[string]bool { + out := map[string]bool{} + for _, a := range addrs { + if a.Type == corev1.NodeInternalIP { + out[a.Address] = true + } + } + return out +} + +// Reconciler implements reconcile.Reconciler. +type Reconciler struct { + client client.Client + scheme *runtime.Scheme +} + +var _ reconcile.Reconciler = &Reconciler{} + +// Reconcile is called for a Node when its InternalIPs change (or on initial +// creation). It lists operator-managed pods on the node and deletes any +// host-networked pod whose status.podIPs doesn't include any of the node's +// current InternalIPs. +func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.WithValues("node", req.Name) + + node := &corev1.Node{} + if err := r.client.Get(ctx, req.NamespacedName, node); err != nil { + if apierrors.IsNotFound(err) { + // Node is gone — Kubernetes garbage collection will clean up + // the pods that ran on it. Nothing to do here. 
+ return ctrl.Result{}, nil + } + return ctrl.Result{}, fmt.Errorf("failed to get Node %q: %w", req.Name, err) + } + + nodeIPs := internalIPSet(node.Status.Addresses) + if len(nodeIPs) == 0 { + // Nothing to compare against; bail out to avoid deleting pods + // based on a transient empty status. + logger.V(1).Info("Node has no InternalIPs reported; skipping pod IP check") + return ctrl.Result{}, nil + } + + // List operator-managed pods running on this node. We list once per + // label selector and filter by spec.nodeName on the client side. The + // pod list is small enough that this is cheap. + pods, err := r.listOperatorManagedPodsOnNode(ctx, node.Name) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to list pods on node %q: %w", node.Name, err) + } + + var firstErr error + deleted := 0 + for i := range pods { + pod := &pods[i] + if !pod.Spec.HostNetwork { + // Safety check: only delete hostNetwork pods. A non-hostNetwork + // pod that happens to match our labels has a CNI-assigned IP + // that legitimately differs from the node's IP. 
+ continue + } + if podIPMatchesNode(pod, nodeIPs) { + continue + } + + podIPs := make([]string, 0, len(pod.Status.PodIPs)) + for _, pip := range pod.Status.PodIPs { + podIPs = append(podIPs, pip.IP) + } + logger.Info("Deleting pod with stale IP after node IP change so its controller can recreate it with the current IP", + "pod", pod.Name, "namespace", pod.Namespace, + "podIPs", podIPs, "nodeInternalIPs", keys(nodeIPs)) + + if delErr := r.client.Delete(ctx, pod); delErr != nil && !apierrors.IsNotFound(delErr) { + logger.Error(delErr, "Failed to delete pod with stale IP", "pod", pod.Name, "namespace", pod.Namespace) + if firstErr == nil { + firstErr = delErr + } + continue + } + deleted++ + } + + if deleted > 0 { + logger.Info("Deleted stale-IP pods on node", "count", deleted) + } + return ctrl.Result{}, firstErr +} + +// listOperatorManagedPodsOnNode lists pods on the given node that match any +// of the operator's host-networked-workload label selectors. +func (r *Reconciler) listOperatorManagedPodsOnNode(ctx context.Context, nodeName string) ([]corev1.Pod, error) { + seen := map[string]struct{}{} + var out []corev1.Pod + for _, sel := range targetLabelSelectors { + var pl corev1.PodList + if err := r.client.List(ctx, &pl, &client.ListOptions{LabelSelector: sel}); err != nil { + return nil, fmt.Errorf("listing pods with selector %q: %w", sel.String(), err) + } + for i := range pl.Items { + pod := &pl.Items[i] + if pod.Spec.NodeName != nodeName { + continue + } + key := pod.Namespace + "/" + pod.Name + if _, dup := seen[key]; dup { + continue + } + seen[key] = struct{}{} + out = append(out, *pod) + } + } + return out, nil +} + +// podIPMatchesNode returns true if any of the pod's reported IPs is also +// listed as an InternalIP on the node. 
+func podIPMatchesNode(pod *corev1.Pod, nodeIPs map[string]bool) bool { + for _, pip := range pod.Status.PodIPs { + if nodeIPs[pip.IP] { + return true + } + } + // Fall back to the singular PodIP field for older pods that haven't + // been re-statused with podIPs. + if pod.Status.PodIP != "" && nodeIPs[pod.Status.PodIP] { + return true + } + return false +} + +func keys(m map[string]bool) []string { + out := make([]string, 0, len(m)) + for k := range m { + out = append(out, k) + } + return out +} diff --git a/pkg/controller/podiprecovery/podiprecovery_controller_test.go b/pkg/controller/podiprecovery/podiprecovery_controller_test.go new file mode 100644 index 0000000000..effae6e3b3 --- /dev/null +++ b/pkg/controller/podiprecovery/podiprecovery_controller_test.go @@ -0,0 +1,233 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package podiprecovery + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + ctrlrfake "github.com/tigera/operator/pkg/ctrlruntime/client/fake" +) + +const ns = "calico-system" + +var _ = Describe("PodIPRecovery controller", func() { + var ( + ctx context.Context + c client.Client + r *Reconciler + ) + + newNode := func(name string, internalIPs ...string) *corev1.Node { + n := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + } + for _, ip := range internalIPs { + n.Status.Addresses = append(n.Status.Addresses, corev1.NodeAddress{ + Type: corev1.NodeInternalIP, + Address: ip, + }) + } + return n + } + + newPod := func(name, nodeName, podIP string, hostNetwork bool, labels map[string]string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + Labels: labels, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + HostNetwork: hostNetwork, + }, + Status: corev1.PodStatus{ + PodIP: podIP, + PodIPs: []corev1.PodIP{{IP: podIP}}, + }, + } + } + + BeforeEach(func() { + ctx = context.Background() + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + c = ctrlrfake.DefaultFakeClientBuilder(scheme).Build() + r = &Reconciler{client: c, scheme: scheme} + }) + + reconcileNode := func(nodeName string) { + _, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: nodeName}}) + Expect(err).NotTo(HaveOccurred()) + } + + podExists := func(name string) bool { + err := c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, &corev1.Pod{}) + if apierrors.IsNotFound(err) { + return false + } + Expect(err).NotTo(HaveOccurred()) + return true + } + + Context("Reconcile", func() { + 
It("leaves a pod alone when its IP matches the node InternalIP", func() { + Expect(c.Create(ctx, newNode("node1", "10.0.0.1"))).To(Succeed()) + Expect(c.Create(ctx, newPod("typha", "node1", "10.0.0.1", true, + map[string]string{"k8s-app": "calico-typha"}))).To(Succeed()) + + reconcileNode("node1") + Expect(podExists("typha")).To(BeTrue()) + }) + + It("deletes a hostNetwork pod whose IP doesn't match the node InternalIP", func() { + Expect(c.Create(ctx, newNode("node1", "10.0.0.2"))).To(Succeed()) + Expect(c.Create(ctx, newPod("typha", "node1", "10.0.0.1", true, + map[string]string{"k8s-app": "calico-typha"}))).To(Succeed()) + + reconcileNode("node1") + Expect(podExists("typha")).To(BeFalse()) + }) + + It("deletes stale pods of multiple workloads on the same node in one reconcile (no pacing)", func() { + Expect(c.Create(ctx, newNode("node1", "10.0.0.2"))).To(Succeed()) + Expect(c.Create(ctx, newPod("typha-1", "node1", "10.0.0.1", true, + map[string]string{"k8s-app": "calico-typha"}))).To(Succeed()) + Expect(c.Create(ctx, newPod("node-1", "node1", "10.0.0.1", true, + map[string]string{"k8s-app": "calico-node"}))).To(Succeed()) + Expect(c.Create(ctx, newPod("nodewin-1", "node1", "10.0.0.1", true, + map[string]string{"k8s-app": "calico-node-windows"}))).To(Succeed()) + + reconcileNode("node1") + Expect(podExists("typha-1")).To(BeFalse()) + Expect(podExists("node-1")).To(BeFalse()) + Expect(podExists("nodewin-1")).To(BeFalse()) + }) + + It("only touches pods on the reconciled node", func() { + Expect(c.Create(ctx, newNode("node1", "10.0.0.2"))).To(Succeed()) + Expect(c.Create(ctx, newNode("node2", "10.0.0.3"))).To(Succeed()) + Expect(c.Create(ctx, newPod("typha-1", "node1", "10.0.0.1", true, + map[string]string{"k8s-app": "calico-typha"}))).To(Succeed()) + Expect(c.Create(ctx, newPod("typha-2", "node2", "10.0.0.1", true, + map[string]string{"k8s-app": "calico-typha"}))).To(Succeed()) + + reconcileNode("node1") + Expect(podExists("typha-1")).To(BeFalse(), "stale pod on 
node1 should be deleted") + Expect(podExists("typha-2")).To(BeTrue(), "stale pod on node2 should be untouched") + }) + + It("returns without error when the node is gone", func() { + // No node created; reconcile should be a no-op. + _, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "missing"}}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("skips a non-hostNetwork pod even if its labels match", func() { + Expect(c.Create(ctx, newNode("node1", "10.0.0.2"))).To(Succeed()) + Expect(c.Create(ctx, newPod("cnipod", "node1", "10.244.0.5", false, + map[string]string{"k8s-app": "calico-typha"}))).To(Succeed()) + + reconcileNode("node1") + Expect(podExists("cnipod")).To(BeTrue()) + }) + + It("matches dual-stack pod IPs against any of the node's InternalIPs", func() { + node := newNode("node1", "10.0.0.1", "fd00::1") + Expect(c.Create(ctx, node)).To(Succeed()) + + // Pod reports both v4 and v6; one matches, so the pod is healthy. + pod := newPod("typha", "node1", "10.0.0.1", true, map[string]string{"k8s-app": "calico-typha"}) + pod.Status.PodIPs = []corev1.PodIP{{IP: "10.0.0.1"}, {IP: "fd00::1"}} + Expect(c.Create(ctx, pod)).To(Succeed()) + + reconcileNode("node1") + Expect(podExists("typha")).To(BeTrue()) + }) + + It("skips reconcile when the node has no InternalIPs reported", func() { + // Avoid deleting based on a transient empty status. + Expect(c.Create(ctx, newNode("node1" /* no IPs */))).To(Succeed()) + Expect(c.Create(ctx, newPod("typha", "node1", "10.0.0.1", true, + map[string]string{"k8s-app": "calico-typha"}))).To(Succeed()) + + reconcileNode("node1") + Expect(podExists("typha")).To(BeTrue()) + }) + + It("deletes stale apiserver pods (different label scheme)", func() { + // apiserver uses {apiserver: true} rather than k8s-app=... 
+ Expect(c.Create(ctx, newNode("node1", "10.0.0.2"))).To(Succeed()) + Expect(c.Create(ctx, newPod("apiserver", "node1", "10.0.0.1", true, + map[string]string{"apiserver": "true"}))).To(Succeed()) + + reconcileNode("node1") + Expect(podExists("apiserver")).To(BeFalse()) + }) + }) + + Context("internalIPChangedPredicate", func() { + pred := internalIPChangedPredicate() + + It("enqueues on Create", func() { + Expect(pred.Create(event.CreateEvent{Object: newNode("n1", "10.0.0.1")})).To(BeTrue()) + }) + + It("does not enqueue on Delete", func() { + Expect(pred.Delete(event.DeleteEvent{Object: newNode("n1", "10.0.0.1")})).To(BeFalse()) + }) + + It("enqueues on Update when InternalIPs change", func() { + old := newNode("n1", "10.0.0.1") + new := newNode("n1", "10.0.0.2") + Expect(pred.Update(event.UpdateEvent{ObjectOld: old, ObjectNew: new})).To(BeTrue()) + }) + + It("does not enqueue on Update when InternalIPs are unchanged (heartbeat-only)", func() { + old := newNode("n1", "10.0.0.1") + new := newNode("n1", "10.0.0.1") + // Simulate a heartbeat that adds a Hostname address but keeps the InternalIP. 
+ new.Status.Addresses = append(new.Status.Addresses, corev1.NodeAddress{ + Type: corev1.NodeHostName, + Address: "n1", + }) + Expect(pred.Update(event.UpdateEvent{ObjectOld: old, ObjectNew: new})).To(BeFalse()) + }) + + It("treats InternalIPs as a set (order-insensitive)", func() { + old := newNode("n1", "10.0.0.1", "fd00::1") + new := newNode("n1", "fd00::1", "10.0.0.1") + Expect(pred.Update(event.UpdateEvent{ObjectOld: old, ObjectNew: new})).To(BeFalse()) + }) + + It("enqueues when a new InternalIP is added", func() { + old := newNode("n1", "10.0.0.1") + new := newNode("n1", "10.0.0.1", "fd00::1") + Expect(pred.Update(event.UpdateEvent{ObjectOld: old, ObjectNew: new})).To(BeTrue()) + }) + }) +}) diff --git a/pkg/controller/podiprecovery/podiprecovery_suite_test.go b/pkg/controller/podiprecovery/podiprecovery_suite_test.go new file mode 100644 index 0000000000..2a4705db7e --- /dev/null +++ b/pkg/controller/podiprecovery/podiprecovery_suite_test.go @@ -0,0 +1,34 @@ +// Copyright (c) 2026 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package podiprecovery + +import ( + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + uzap "go.uber.org/zap" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +func TestPodIPRecovery(t *testing.T) { + logf.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true), zap.Level(uzap.NewAtomicLevelAt(uzap.DebugLevel)))) + gomega.RegisterFailHandler(ginkgo.Fail) + suiteConfig, reporterConfig := ginkgo.GinkgoConfiguration() + reporterConfig.JUnitReport = "../../../report/ut/podiprecovery_controller_suite.xml" + ginkgo.RunSpecs(t, "pkg/controller/podiprecovery Suite", suiteConfig, reporterConfig) +} diff --git a/pkg/imports/crds/operator/operator.tigera.io_installations.yaml b/pkg/imports/crds/operator/operator.tigera.io_installations.yaml index c3db058162..1e04c6eb26 100644 --- a/pkg/imports/crds/operator/operator.tigera.io_installations.yaml +++ b/pkg/imports/crds/operator/operator.tigera.io_installations.yaml @@ -7384,19 +7384,6 @@ spec: items: type: string type: array - stalePodIPRecovery: - description: |- - StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods - (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their - node's current InternalIP. When stale pod IPs are detected (e.g., after a node reboot pulls - a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet - controllers recreate them with the correct IP. This works around an upstream Kubernetes - limitation where status.podIPs is immutable for hostNetwork pods. 
- Default: Enabled - enum: - - Enabled - - Disabled - type: string tlsCipherSuites: description: TLSCipherSuites defines the cipher suite list that the @@ -16750,19 +16737,6 @@ spec: items: type: string type: array - stalePodIPRecovery: - description: |- - StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods - (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their - node's current InternalIP. When stale pod IPs are detected (e.g., after a node reboot pulls - a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet - controllers recreate them with the correct IP. This works around an upstream Kubernetes - limitation where status.podIPs is immutable for hostNetwork pods. - Default: Enabled - enum: - - Enabled - - Disabled - type: string tlsCipherSuites: description: TLSCipherSuites defines the cipher suite list that From f2f4bb4a3730705b389ff751c1cd1d707548a715 Mon Sep 17 00:00:00 2001 From: Pedro Coutinho Date: Thu, 21 May 2026 14:48:24 -0700 Subject: [PATCH 4/4] stop treating unpopulated pod IPs due to them starting up as stale --- .../podiprecovery/podiprecovery_controller.go | 11 ++++--- .../podiprecovery_controller_test.go | 32 +++++++++++++++++++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/pkg/controller/podiprecovery/podiprecovery_controller.go b/pkg/controller/podiprecovery/podiprecovery_controller.go index 3c5d4d2025..84cb81448b 100644 --- a/pkg/controller/podiprecovery/podiprecovery_controller.go +++ b/pkg/controller/podiprecovery/podiprecovery_controller.go @@ -193,6 +193,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // that legitimately differs from the node's IP. continue } + if len(pod.Status.PodIPs) == 0 && pod.Status.PodIP == "" { + // Pod hasn't been status-populated yet (e.g. Pending, kubelet + // has not admitted it). 
The kubelet will set the correct IPs on + // admission; deleting now would just race that. + continue + } if podIPMatchesNode(pod, nodeIPs) { continue } @@ -255,11 +261,6 @@ func podIPMatchesNode(pod *corev1.Pod, nodeIPs map[string]bool) bool { return true } } - // Fall back to the singular PodIP field for older pods that haven't - // been re-statused with podIPs. - if pod.Status.PodIP != "" && nodeIPs[pod.Status.PodIP] { - return true - } return false } diff --git a/pkg/controller/podiprecovery/podiprecovery_controller_test.go b/pkg/controller/podiprecovery/podiprecovery_controller_test.go index effae6e3b3..c721b437ea 100644 --- a/pkg/controller/podiprecovery/podiprecovery_controller_test.go +++ b/pkg/controller/podiprecovery/podiprecovery_controller_test.go @@ -188,6 +188,24 @@ var _ = Describe("PodIPRecovery controller", func() { reconcileNode("node1") Expect(podExists("apiserver")).To(BeFalse()) }) + + It("leaves a pending pod (no podIPs reported yet) alone", func() { + // A pod that was just scheduled but hasn't been admitted by the + // kubelet yet has empty status.podIPs. Deleting it would race the + // kubelet, which is about to populate the IPs correctly from the + // node's current address. + Expect(c.Create(ctx, newNode("node1", "10.0.0.1"))).To(Succeed()) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pending", Namespace: ns, + Labels: map[string]string{"k8s-app": "calico-typha"}}, + Spec: corev1.PodSpec{NodeName: "node1", HostNetwork: true}, + // Intentionally no Status.PodIPs / Status.PodIP — pending pod. 
+ } + Expect(c.Create(ctx, pod)).To(Succeed()) + + reconcileNode("node1") + Expect(podExists("pending")).To(BeTrue()) + }) }) Context("internalIPChangedPredicate", func() { @@ -229,5 +247,19 @@ var _ = Describe("PodIPRecovery controller", func() { new := newNode("n1", "10.0.0.1", "fd00::1") Expect(pred.Update(event.UpdateEvent{ObjectOld: old, ObjectNew: new})).To(BeTrue()) }) + + It("does not enqueue on Update when only ExternalIP changes", func() { + // Cloud environments commonly reassign external IPs while the + // node's internal IP stays put. Don't react to those. + old := newNode("n1", "10.0.0.1") + old.Status.Addresses = append(old.Status.Addresses, corev1.NodeAddress{ + Type: corev1.NodeExternalIP, Address: "203.0.113.1", + }) + new := newNode("n1", "10.0.0.1") + new.Status.Addresses = append(new.Status.Addresses, corev1.NodeAddress{ + Type: corev1.NodeExternalIP, Address: "203.0.113.99", + }) + Expect(pred.Update(event.UpdateEvent{ObjectOld: old, ObjectNew: new})).To(BeFalse()) + }) }) })