Skip to content

Fix: CloudWatch/Watchtower logs dropped in Task SDK due to handler lifetime bugs#66633

Open
korex-f wants to merge 11 commits into
apache:mainfrom
korex-f:fix/remote-log-handler-premature-close
Open

Fix: CloudWatch/Watchtower logs dropped in Task SDK due to handler lifetime bugs#66633
korex-f wants to merge 11 commits into
apache:mainfrom
korex-f:fix/remote-log-handler-premature-close

Conversation

@korex-f

@korex-f korex-f commented May 9, 2026

Copy link
Copy Markdown
Contributor

Three related bugs all produce WatchtowerWarning: "Received message after logging system shutdown", causing CloudWatch log streams to be truncated or never created.

Bug 1 (task-sdk/src/airflow/sdk/log.py):
In configure_logging(), remote.processors was accessed before the inner configure_logging() call ran dictConfig(). dictConfig() calls _clearExistingHandlers()logging.shutdown() on all existing handlers, killing the just-created watchtower handler (shutting_down=True) before any task log is emitted. Fix: move getattr(remote, "processors") to after dictConfig runs, injecting via a second structlog.configure() call.

Bug 2 (providers/amazon/.../cloudwatch_task_handler.py):
CloudWatchRemoteLogIO.handler was a cached_property — once killed by logging.shutdown() the dead instance was never replaced. The processors cached_property also captured _handler in a closure, pinning the dead instance. Fix: convert handler to a regular property that recreates when shutting_down=True; access self.handler dynamically in the processors closure.

Bug 3 (task-sdk/src/airflow/sdk/execution_time/supervisor.py):
_configure_logging() returned a plain tuple with no cleanup path. The only code that ever closed the remote handler was logging.shutdown() at process exit, which fired after supervise_task() returned while final task messages were still in flight. Fix: convert _configure_logging() to a contextmanager that explicitly flushes and closes the handler after process.wait() drains all task log messages.

closes: #66475

@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label May 11, 2026
@chris-stetter

Copy link
Copy Markdown

I can confirm that this works in our ECS environment. Thank you!

@korex-f

korex-f commented May 11, 2026

Copy link
Copy Markdown
Contributor Author

You're very much welcome.

Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py Outdated

@o-nikolas o-nikolas left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have expected some new or more updated test cases to ensure this doesn't regress again? Do you not think that's required here @korex-f ?

Comment thread providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py Outdated
Comment thread providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py Outdated
Comment thread providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py
Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py Outdated
Comment thread task-sdk/src/airflow/sdk/log.py Outdated
@korex-f

korex-f commented May 13, 2026

Copy link
Copy Markdown
Contributor Author

I would have expected some new or more updated test cases to ensure this doesn't regress again? Do you not think that's required here @korex-f ?

Good point, I have added tests to test_cloudwatch_task_handler.py covering the self-healing handler property (verifies a new handler is created when shutting_down=True) and the dynamic self.handler access in the processors closure.

@o-nikolas o-nikolas left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks reasonable to me now, but I still think @ashb or @amoghrajesh should have a look

@potiuk

potiuk commented May 18, 2026

Copy link
Copy Markdown
Member

@korex-f — There are 2 unresolved review threads on this PR from @o-nikolas. Could you either push a fix or reply in each thread explaining why the feedback doesn't apply? Once you believe the feedback is addressed, mark the thread as resolved so the reviewer isn't re-pinged needlessly. Thanks!


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

Comment thread providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py Outdated

@seanghaeli seanghaeli left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified end-to-end this fixes remote logging. Thanks @korex-f this issue has been floating around for a while.

@korex-f korex-f force-pushed the fix/remote-log-handler-premature-close branch from 904f1ea to ff723d1 Compare June 6, 2026 05:51
korex-f added 8 commits June 6, 2026 09:58
…etime bugs

Three related bugs all produce WatchtowerWarning: "Received message
after
logging system shutdown", causing CloudWatch log streams to be truncated
or never created.

Bug 1 (task-sdk/src/airflow/sdk/log.py):
In configure_logging(), remote.processors was accessed before the inner
configure_logging() call ran dictConfig(). dictConfig() calls
_clearExistingHandlers() -> logging.shutdown() on all existing handlers,
killing the just-created watchtower handler (shutting_down=True) before
any task log is emitted. Fix: move getattr(remote, "processors") to
after
dictConfig runs, injecting via a second structlog.configure() call.

Bug 2 (providers/amazon/.../cloudwatch_task_handler.py):
CloudWatchRemoteLogIO.handler was a cached_property — once killed by
logging.shutdown() the dead instance was never replaced. The processors
cached_property also captured _handler in a closure, pinning the dead
instance. Fix: convert handler to a regular property that recreates when
shutting_down=True; access self.handler dynamically in the processors
closure.

Bug 3 (task-sdk/src/airflow/sdk/execution_time/supervisor.py):
_configure_logging() returned a plain tuple with no cleanup path. The
only code that ever closed the remote handler was logging.shutdown() at
process exit, which fired after supervise_task() returned while final
task messages were still in flight. Fix: convert _configure_logging() to
a contextmanager that explicitly flushes and closes the handler after
process.wait() drains all task log messages.

Fixes: apache#66475
…chRemoteLogIO to satisfy updated RemoteLogIO protocol
korex-f added 2 commits June 6, 2026 09:58
…e errors

- Restore original _configure_logging call site ordering so secrets
  masker reset happens after logging is configured, not before
- Simplify _configure_logging to yield only logger, not a tuple —
  file descriptor cleanup is internal to the context manager
- Replace contextlib.suppress(Exception) in _close_remote_log_handler
  with a logged warning so close errors are visible for debugging
- Revert f-string formatting change in _fork_main
- Move import structlog.typing back into TYPE_CHECKING block
- Revert handler to cached_property in CloudWatchRemoteLogIO,
  removing the self-healing property complexity introduced earlier
- Keep only the processors closure fix: access self.handler
  dynamically instead of captured _handler variable
- Remove tests for the self-healing property that is being reverted

@1fanwang 1fanwang left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing the comments, lgtm!

@korex-f korex-f force-pushed the fix/remote-log-handler-premature-close branch from ff723d1 to 93a67a7 Compare June 7, 2026 00:44
@potiuk potiuk removed the ready for maintainer review Set after triaging when all criteria pass. label Jun 9, 2026
@potiuk

potiuk commented Jun 9, 2026

Copy link
Copy Markdown
Member

@korex-f — A reviewer (@ashb) has requested changes on this PR, so I've removed the ready for maintainer review label — the next step is on your side. Could you address the review comments (push a fix, or reply in-thread explaining why the feedback doesn't apply)? Once addressed, re-request review from @ashb or re-mark the PR ready and it returns to the maintainer queue. Thank you.

Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

- Remove duplicate GetXComCount handler in _handler_request
- Fix with-statement syntax by wrapping conditional context manager
- Eliminate leftover merge-conflict artifacts
@korex-f korex-f requested a review from ashb June 10, 2026 08:24
@korex-f

korex-f commented Jun 10, 2026

Copy link
Copy Markdown
Contributor Author

Thanks for the review, @ashb. I have addressed the requested changes, pushed the fixes, and re-requested review. Please do take another look when you have the chance.

@sarvesh371

Copy link
Copy Markdown

Hey, running into this exact issue on Airflow 3.2.2 with KubernetesExecutor — task pods emit WatchtowerWarning: Received message after logging system shutdown and logs never make it to CloudWatch.

Any idea on ETA for merge and which provider version will ship the fix? Also is there any interim workaround while we wait?

Happy to test if there's a pre-release available.

@jason810496 jason810496 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this and I verify the root case as well. The overall direction LGTM.
However, this PR touches both Task-SDK and provider.

IIRC, the current convention is separating PRs of Task-SDK and provider. So we need to make this PR Task-SDK only and raise another one for provider or vice versa.

Any idea on ETA for merge and which provider version will ship the fix? Also is there any interim workaround while we wait?

If this PR doesn't make it to the 3.3 release (since it change the Task-SDK). Then I will merge the #68779 which solve the exact issue in different direction with provider-only changes (and I verify on k8s executor setup) -> the next AWS provider version should solve this problem.

If this PR catches 3.3 release -> using Airflow 3.3 plus the next AWS provider version could solve.

if this is not done before process exit.
"""
try:
handler.close()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to have more defensive compatible guard against the handler.close call.
Since not all remote logging handler support close method when using the Task-SDK version including this patch.

Suggested change
handler.close()
if hasattr(handler, "close", None):
handler.close()

Comment on lines 858 to -1778
@@ -1707,6 +1704,8 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id:
resp, dump_opts = handle_get_variable_keys(self.client, msg)
elif isinstance(msg, GetXCom):
resp, dump_opts = handle_get_xcom(self.client, msg)
elif isinstance(msg, GetXComCount):
resp, dump_opts = handle_get_xcom_count(self.client, msg)
elif isinstance(msg, GetXComSequenceItem):
resp, dump_opts = handle_get_xcom_sequence_item(self.client, msg)
elif isinstance(msg, GetXComSequenceSlice):
@@ -1774,8 +1773,6 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id:
dump_opts = {"exclude_unset": True}
elif isinstance(msg, GetPrevSuccessfulDagRun):
resp, dump_opts = handle_get_prev_successful_dag_run(self.client, self.id)
elif isinstance(msg, GetXComCount):
resp, dump_opts = handle_get_xcom_count(self.client, msg)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we revert these non-related changes? (the parameters style of def send_msg( and the order of elif isinstance(msg, GetXComCount): ).

# logger will emit:

# WatchtowerWarning: "Received message after logging system shutdown"
remote_handler = load_remote_log_handler()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep the TODO instead of dropping it silently.

Suggested change
remote_handler = load_remote_log_handler()
# TODO: Use logging providers to handle the chunked upload for us etc.
remote_handler = load_remote_log_handler()

Comment on lines +2442 to +2443
remote_handler = load_remote_log_handler()
if remote_handler is not None:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
remote_handler = load_remote_log_handler()
if remote_handler is not None:
if (remote_handler := load_remote_log_handler()) is not None:

Comment on lines +67 to +69
def close(self) -> None:
"""Flush and close the remote log handler."""
...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to introduce the close method at RemoteLogIO protocol in this PR.
Since it's still nop for ES and OS remote IO implementation.

Comment on lines +33 to +34
import structlog
import structlog.typing

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import structlog
import structlog.typing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:logging area:providers area:task-sdk provider:amazon AWS/Amazon - related issues ready for maintainer review Set after triaging when all criteria pass.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Cloudwatch remote logging does not work for ECS Executor

10 participants