KubernetesExecutor: self.completed adoption set is never drained, completed pods re-PATCHed (patch_pod_executor_done) every sync() loop #68683

@alixirs

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==10.18.0 (latest at time of filing).

The flaw is present in the current code on main as well. Relevant history:

  • self.completed set + the for result in self.completed: self._change_state(result) loop were introduced in 10.10.0.
  • The periodic _adopt_completed_pods() call from sync() (which regularly fills the set) was added in 10.15.0 (k8s executor - ensure pods cleaned up #61839).

Apache Airflow version

Observed on 2.11.0. The affected code is provider code (KubernetesExecutor), identical on main, so 3.x is equally affected.

Operating System

N/A (Python code path). Observed on Azure AKS.

Deployment

Other 3rd-party Helm chart

Deployment details

Kubernetes, KubernetesExecutor, delete_worker_pods=False. Environment has completed (Succeeded) pods carrying an airflow-worker label different from the current scheduler's job id (multi-scheduler HA and/or scheduler restarts).

What happened

On a busy scheduler, patch_pod_executor_done() is called repeatedly against the same already-completed pods on every scheduler loop. In a ~96-second window we saw:

  • patch_pod_executor_done ("Patching pod ... to mark it as done"): 525 calls
  • _change_state patch log ("Patched pod ... to mark it as done"): 525
  • sync() "Changing state of ..." (logged once per real result): 30
  • "Attempting to finish pod" / "finishing job" (watcher → result_queue): 30 / 30
  • "Deleted pod ...": 0 (confirms delete_worker_pods=False)
  • Distinct pods patched: 32; the busiest pods were patched exactly 30 times each.

525 patches against only 30 "Changing state of" lines cannot happen if _change_state is reached only via the result queue, because every result-queue item logs "Changing state of" first. The remaining 495 calls come from the unlogged self.completed loop.
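The arithmetic behind that claim, as a quick check. The numbers are the counts from the log window above; the variable names are mine, and the "at most one patch per result-queue item" bound is the assumption stated in the paragraph above.

```python
# Counts taken from the ~96-second log window reported above.
patch_calls = 525         # "Patching pod ... to mark it as done"
result_queue_items = 30   # "Changing state of ..." (one per real result)

# Each result-queue item can account for at most one patch, so anything
# beyond that must come from a path that never logs "Changing state of".
unexplained = patch_calls - result_queue_items

# Average number of extra patches per dequeued result.
avg_extra_per_result = unexplained / result_queue_items

print(unexplained)            # 495
print(avg_extra_per_result)   # 16.5
```

An average of 16.5 extra patches per dequeued result is what you would expect if roughly 16 to 17 adopted pods were being re-patched on each pass.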

Root cause

KubernetesExecutor.sync() re-runs _change_state() over the entire self.completed set, and nothing ever removes entries from that set.

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:

    def sync(self) -> None:
        ...
        # (periodic) populate self.completed from completed pods owned by other/dead schedulers
        if now - self._last_completed_pod_adoption >= adoption_interval:   # L273
            self._last_completed_pod_adoption = now
            self._adopt_completed_pods(self.kube_client)                   # L275

        ...
        with contextlib.suppress(Empty):
            while True:
                results = self.result_queue.get_nowait()
                try:
                    ...
                    self.log.info("Changing state of %s to %s", results, results.state)  # L289
                    self._change_state(results)                                            # L291
                finally:
                    self.result_queue.task_done()

                for result in self.completed:        # L303  <-- inside the while loop
                    self._change_state(result)        # L304  <-- never logged, never drained

_adopt_completed_pods() adds to the set but it is never cleared/discarded:

            ti_id = annotations_to_key(pod.metadata.annotations)
            self.completed.add(                                   # L816
                KubernetesResults(key=ti_id, state="completed", ...)
            )

For these synthetic state="completed" entries, _change_state() patches the pod, then self.running.remove(key) raises KeyError (they were never in running) and the method returns early. Nothing on that path removes the entry from self.completed:

        else:
            self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace)  # L482
            self.log.info("Patched pod %s in namespace %s to mark it as done", key, namespace)   # L483
        try:
            self.running.remove(key)                              # L486
        except KeyError:
            self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
            return                                                # L489  <-- no removal from self.completed

Two compounding defects:

  1. self.completed is never drained. It is only ever declared (L117), iterated (L303), and added to (L816). There is no .clear(), .discard(), .remove(), or reassignment anywhere (verified in 10.18.0 and on main). So every adopted completed pod is re-PATCHed for as long as the scheduler runs, and the set grows monotonically over the scheduler's lifetime.
  2. The loop is nested inside the result-queue while True. It runs once per dequeued result, not once per sync(). PATCH volume ≈ (#results processed) × |self.completed| (30 passes × ~16 pods ≈ 480, close to the 495 extra patches we observed).
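To make the two defects concrete, here is a toy model of the loop structure. It is a sketch only: ToyExecutor, its drain and loop_inside flags, and the run() helper are made up for illustration and are not the provider's code. A patch is modeled as a counter increment, and the adopted set is assumed to hold a fixed 16 pods.

```python
class ToyExecutor:
    """Toy stand-in for the executor loop; NOT the real KubernetesExecutor."""

    def __init__(self, adopted, drain=False, loop_inside=True):
        self.completed = set(adopted)    # adopted completed pods
        self.drain = drain               # clear the set after processing?
        self.loop_inside = loop_inside   # loop nested in the result-queue loop?
        self.patches = 0

    def _change_state(self, key):
        # Stands in for the patch_pod_executor_done() call.
        self.patches += 1

    def _process_completed(self):
        for key in list(self.completed):
            self._change_state(key)
        if self.drain:
            self.completed.clear()

    def sync(self, results):
        for key in results:              # models the result-queue loop
            self._change_state(key)
            if self.loop_inside:
                self._process_completed()
        if not self.loop_inside:
            self._process_completed()


def run(executor, syncs=30):
    for i in range(syncs):
        executor.sync([f"task-{i}"])     # one watcher result per sync()
    return executor.patches


adopted = [f"pod-{n}" for n in range(16)]
buggy = run(ToyExecutor(adopted))
fixed = run(ToyExecutor(adopted, drain=True, loop_inside=False))
print(buggy, fixed)   # 510 46
```

With the current structure the toy issues 30 × (1 + 16) = 510 patches; with the loop hoisted and the set drained it issues 16 + 30 = 46.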

Permalinks (10.18.0), base https://github.com/apache/airflow/blob/providers-cncf-kubernetes/10.18.0/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:

  • Declaration: #L117
  • Loop: #L303-L304
  • Add: #L816

What you think should happen instead

Adopted completed pods should be processed once and then dropped:

  • Move the for result in self.completed loop out of the result-queue while True so it runs once per sync().
  • Drain the set after processing with self.completed.clear(). A more conservative option is to discard each entry only after a successful patch/delete, since patch_pod_executor_done swallows ApiException.

Sketch:

        with contextlib.suppress(Empty):
            while True:
                results = self.result_queue.get_nowait()
                try:
                    ...
                    self._change_state(results)
                finally:
                    self.result_queue.task_done()

        # Process adopted completed pods once per sync, then drain.
        if self.completed:
            for result in list(self.completed):
                self._change_state(result)
            self.completed.clear()
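For the more conservative variant (discard an entry only once its patch has succeeded), a standalone sketch follows. It assumes a hypothetical try_patch callable that returns True on success. The real patch_pod_executor_done swallows ApiException and gives no such signal, so the provider would need a change to expose one; drain_completed is likewise a made-up helper name.

```python
def drain_completed(completed, try_patch):
    """Process each adopted entry once; keep only entries whose patch failed.

    `try_patch` is a HYPOTHETICAL callable returning True on success.
    """
    for result in list(completed):   # iterate a copy: the set is mutated below
        if try_patch(result):
            completed.discard(result)
    return completed


# Usage with a fake patcher that fails for exactly one pod.
completed = {"pod-a", "pod-b", "pod-c"}
remaining = drain_completed(completed, lambda key: key != "pod-b")
print(remaining)   # {'pod-b'}
```

Entries that fail stay in the set and are retried on the next pass, so a transient API error does not lose the pod, at the cost of retrying a permanently failing pod indefinitely.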

How to reproduce

  1. KubernetesExecutor, delete_worker_pods=False, provider >= 10.15.0.
  2. Produce Succeeded pods whose airflow-worker label differs from the current scheduler's job id — e.g. restart the scheduler (new job id) while Succeeded pods from the previous incarnation remain, or run ≥2 schedulers.
  3. Wait one [scheduler] orphaned_tasks_check_interval tick (default 300s) for _adopt_completed_pods to populate self.completed.
  4. Observe: on every subsequent sync() that processes any watcher result, every pod in self.completed is re-PATCHed ("Patching pod ... to mark it as done"), and the set never shrinks until the scheduler restarts.

Quick check against scheduler logs:

grep -c "Patching pod" scheduler.log            # high
grep -c "Changing state of" scheduler.log       # much lower
grep "Patching pod" scheduler.log \
  | sed -E 's/.*Patching pod ([a-z0-9-]+) in namespace.*/\1/' \
  | sort | uniq -c | sort -rn | head            # same pods, patched many times each
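
The same check as a small Python sketch, for environments where the shell pipeline is awkward. The regex assumes the "Patching pod <name> in namespace ..." message format quoted above; summarize() is a made-up helper, and the sample lines are fabricated purely to exercise it.

```python
import re
from collections import Counter

# Assumes the log message format "Patching pod <name> in namespace ...".
PATCH_RE = re.compile(r"Patching pod (\S+) in namespace")


def summarize(lines):
    """Return (patch count per pod, number of 'Changing state of' lines)."""
    patches = Counter()
    changes = 0
    for line in lines:
        match = PATCH_RE.search(line)
        if match:
            patches[match.group(1)] += 1
        if "Changing state of" in line:
            changes += 1
    return patches, changes


# Fabricated sample lines; for a real log use:
#     with open("scheduler.log") as fh:
#         patches, changes = summarize(fh)
sample = [
    "INFO - Changing state of <result> to None",
    "INFO - Patching pod pod-a in namespace airflow to mark it as done",
    "INFO - Patching pod pod-a in namespace airflow to mark it as done",
    "INFO - Patching pod pod-b in namespace airflow to mark it as done",
    "INFO - Patching pod pod-a in namespace airflow to mark it as done",
]
patches, changes = summarize(sample)
print(sum(patches.values()), changes)   # 4 1
print(patches.most_common(1))           # [('pod-a', 3)]
```

A total patch count far above the "Changing state of" count, concentrated on the same few pods, is the signature described in this issue.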

Anything else

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

area:providers, kind:bug, priority:high, provider:cncf-kubernetes
