Apache Airflow version
Observed on 2.11.0. The affected code is provider code (KubernetesExecutor), identical on main, so 3.x is equally affected.
Operating System
N/A (Python code path). Observed on Azure AKS.
Deployment
Other 3rd-party Helm chart
Deployment details
Kubernetes, KubernetesExecutor, delete_worker_pods=False. Environment has completed (Succeeded) pods carrying an airflow-worker label different from the current scheduler's job id (multi-scheduler HA and/or scheduler restarts).
What happened
On a busy scheduler, patch_pod_executor_done() is called repeatedly against the same already-completed pods, every scheduler loop. In a ~96-second window we saw:
patch_pod_executor_done ("Patching pod ... to mark it as done"): 525 calls
_change_state patch log ("Patched pod ... to mark it as done"): 525
sync() "Changing state of ..." (logged once per real result): 30
"Attempting to finish pod" / "finishing job" (watcher → result_queue): 30 / 30
"Deleted pod ...": 0 (confirms delete_worker_pods=False)
Distinct pods patched: 32; the busiest pods were patched exactly 30 times each.
525 patches with only 30 "Changing state of" lines would be impossible if _change_state were reached only via the result-queue (every result-queue item logs "Changing state of" first). The extra ~495 calls come from the un-logged self.completed loop.
Root cause
KubernetesExecutor.sync() re-runs _change_state() over the entire self.completed set, and nothing ever removes entries from that set.
_change_state() for these synthetic state="completed" entries patches the pod, then self.running.remove(key) raises KeyError (they were never in running) and the method returns early — so the entry is never removed from self.completed:
    else:
        self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace)  # L482
        self.log.info("Patched pod %s in namespace %s to mark it as done", key, namespace)  # L483
    try:
        self.running.remove(key)  # L486
    except KeyError:
        self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
        return  # L489 <-- no removal from self.completed
Two compounding defects:
self.completed is never drained. It is only ever declared (L117), iterated (L303), and added to (L816) — no .clear(), .discard(), .remove(), or reassignment anywhere (verified in 10.18.0 and on main). So every adopted completed pod is re-PATCHed forever, and the set grows monotonically over the scheduler's lifetime.
The loop is nested inside the result-queue while True. It runs once per dequeued result, not once per sync(). PATCH volume ≈ (#results processed) × |self.completed| (the 30 passes × ~16 pods ≈ ~500 we observed).
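The multiplication in the second defect can be illustrated with a toy model. This is only a sketch of the loop shape described above, not the provider's code: `buggy_sync`, the `deque` standing in for the result queue, and the pod names are all hypothetical, and each inner iteration stands in for one PATCH call.

```python
from collections import deque


def buggy_sync(result_queue, completed):
    """Toy model: count PATCH calls attributable to adopted completed pods."""
    adopted_patches = 0
    while result_queue:
        result_queue.popleft()        # one real watcher result is dequeued
        for _pod in completed:        # nested: the whole set, for every result
            adopted_patches += 1      # stands in for one PATCH against the pod
    return adopted_patches            # note: `completed` is never drained


# Hypothetical numbers matching the rough shape of the observation:
# 30 dequeued results and 16 adopted completed pods.
completed = {f"pod-{i}" for i in range(16)}
first = buggy_sync(deque(range(30)), completed)
second = buggy_sync(deque(range(30)), completed)
print(first, second, len(completed))
```

Under these assumptions every pass costs results × |completed| calls again, and the set is the same size afterwards, which is the behavior the two defects combine to produce.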
Permalinks (10.18.0), base https://github.com/apache/airflow/blob/providers-cncf-kubernetes/10.18.0/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
Declaration: #L117
Loop: #L303-L304
Add: #L816
What you think should happen instead
Adopted completed pods should be processed once and then dropped:
Move the for result in self.completed loop out of the result-queue while True so it runs once per sync().
Drain the set after processing — self.completed.clear() (or discard each entry, conservatively only after a successful patch/delete, since patch_pod_executor_done swallows ApiException).
Sketch:
    with contextlib.suppress(Empty):
        while True:
            results = self.result_queue.get_nowait()
            try:
                ...
                self._change_state(results)
            finally:
                self.result_queue.task_done()

    # Process adopted completed pods once per sync, then drain.
    if self.completed:
        for result in list(self.completed):
            self._change_state(result)
        self.completed.clear()
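The more conservative variant mentioned above (discard an entry only after a successful patch) could look roughly like the toy model below. It is a sketch under a loud assumption: the `patch` callable returns True on success, which is hypothetical, since patch_pod_executor_done currently swallows ApiException and would need to expose such a signal. `fixed_sync` and the pod names are also illustrative only.

```python
from collections import deque


def fixed_sync(result_queue, completed, patch):
    """Toy model: drain results first, then handle adopted pods once per sync."""
    while result_queue:
        result_queue.popleft()        # real results are processed here
    patched = []
    for pod in list(completed):       # iterate over a copy; the set is mutated
        if patch(pod):                # hypothetical success signal (assumption)
            completed.discard(pod)    # drop only entries that were patched
            patched.append(pod)
    return patched


# Hypothetical patch function that fails once for "pod-1", then succeeds.
failures = {"pod-1"}


def patch(pod):
    if pod in failures:
        failures.discard(pod)
        return False
    return True


completed = {"pod-0", "pod-1", "pod-2"}
first = sorted(fixed_sync(deque(range(30)), completed, patch))
second = fixed_sync(deque(range(5)), completed, patch)
third = fixed_sync(deque(range(5)), completed, patch)
print(first, second, third, completed)
```

In this model the number of dequeued results no longer affects how often an adopted pod is patched, and a pod whose patch failed is retried on the next pass rather than forgotten.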
How to reproduce
KubernetesExecutor, delete_worker_pods=False, provider >= 10.15.0.
Produce Succeeded pods whose airflow-worker label differs from the current scheduler's job id, for example by restarting the scheduler (new job id) while Succeeded pods from the previous incarnation remain, or by running ≥2 schedulers.
Wait one [scheduler] orphaned_tasks_check_interval tick (default 300s) for _adopt_completed_pods to populate self.completed.
Observe: on every subsequent sync() that processes any watcher result, every pod in self.completed is re-PATCHed ("Patching pod ... to mark it as done"), and the set never shrinks until the scheduler restarts.
Quick check against scheduler logs:
grep -c "Patching pod" scheduler.log # high
grep -c "Changing state of" scheduler.log # much lower
grep "Patching pod" scheduler.log \
| sed -E 's/.*Patching pod ([a-z0-9-]+) in namespace.*/\1/' \
| sort | uniq -c | sort -rn | head # same pods, patched many times each
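The same check can be sketched in Python for cases where the log is already being processed by a script. The regular expression mirrors the sed pattern above; `summarize` and the sample lines are hypothetical and only imitate the message texts quoted in this report, not the full scheduler log format.

```python
import re
from collections import Counter

# Same pod-name pattern as the sed expression above.
PATCH_RE = re.compile(r"Patching pod ([a-z0-9-]+) in namespace")


def summarize(lines):
    """Return (patch count per pod, number of 'Changing state of' lines)."""
    patches = Counter()
    state_changes = 0
    for line in lines:
        match = PATCH_RE.search(line)
        if match:
            patches[match.group(1)] += 1
        if "Changing state of" in line:
            state_changes += 1
    return patches, state_changes


# Synthetic sample lines (made up for illustration).
sample = [
    "INFO - Changing state of (key-a) to None",
    "INFO - Patching pod task-a-1 in namespace airflow to mark it as done",
    "INFO - Patching pod task-b-2 in namespace airflow to mark it as done",
    "INFO - Patching pod task-b-2 in namespace airflow to mark it as done",
]
patches, state_changes = summarize(sample)
print(patches.most_common(), state_changes)
```

For a real file, passing an open file handle (for example `summarize(open("scheduler.log"))`) should work, since the function only iterates over lines. A patch total far above the state-change count, with the same pod names repeating, matches the symptom described here.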
Anything else
PATCH /api/v1/namespaces/.../pods/... load (multiplied by the never-draining set), compounded by _list_pods cost at scale (Kubernetes Executor List Pods Performance Improvement #35599), longer scheduler loops, and tasks stalling in scheduled/queued under churn. This is the same symptom class as KubernetesExecutor: multi-scheduler completed-pod thrash (10.15.0+) #66396.
delete_worker_pods=True only changes the verb (repeated DELETE 404s instead of PATCH); it does not drain the set.
Workaround until fixed: raise [scheduler] orphaned_tasks_check_interval (e.g. 86400) to stop repopulating the set (this does not drain an already-populated set, which requires a scheduler restart), and/or reduce scheduler restarts.
Related (self.completed): Completed Kubernetes Pods not cleared up #57553; k8s executor - ensure pods cleaned up #61839 (10.15.0); KubernetesExecutor: multi-scheduler completed-pod thrash (10.15.0+) #66396 / KubernetesExecutor: scope periodic completed-pod adoption to dead schedulers #66400 (10.17.1); KubernetesExecutor: scheduler crash loop (DetachedInstanceError), nested scoped create_session() in _alive_other_scheduler_job_ids detaches orphan TaskInstances (10.17.1) #67891 / Fix scheduler crashloop from KubernetesExecutor completed-pod adoption #67850 (10.18.0); Kubernetes Executor List Pods Performance Improvement #35599.
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==10.18.0 (latest at time of filing).
The flaw is present in the current code on main as well. Relevant history:
The self.completed set and the for result in self.completed: self._change_state(result) loop were introduced in 10.10.0.
The _adopt_completed_pods() call from sync() (which regularly fills the set) was added in 10.15.0 (k8s executor - ensure pods cleaned up #61839).