Fix: CloudWatch/Watchtower logs dropped in Task SDK due to handler lifetime bugs #66633
korex-f wants to merge 11 commits into
Conversation
|
I can confirm that this works in our ECS environment. Thank you! |
|
You're very much welcome. |
Good point, I have added tests to test_cloudwatch_task_handler.py covering the self-healing handler property (verifies a new handler is created when shutting_down=True) and the dynamic self.handler access in the processors closure. |
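For readers following along, the self-healing behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the provider's code: `FakeHandler` and `SelfHealingIO` are hypothetical stand-ins, and only the `shutting_down` flag is taken from the discussion.

```python
class FakeHandler:
    """Hypothetical stand-in for a watchtower-style handler."""

    def __init__(self) -> None:
        self.shutting_down = False

    def close(self) -> None:
        # After close, the handler refuses further messages.
        self.shutting_down = True


class SelfHealingIO:
    """Hypothetical stand-in for a remote log IO object."""

    def __init__(self) -> None:
        self._handler: FakeHandler | None = None

    @property
    def handler(self) -> FakeHandler:
        # Recreate the handler when it is missing or already shut down.
        if self._handler is None or self._handler.shutting_down:
            self._handler = FakeHandler()
        return self._handler


io = SelfHealingIO()
first = io.handler
same = io.handler      # still alive, so the same instance is reused
first.close()          # simulate logging.shutdown() killing the handler
second = io.handler    # a fresh instance replaces the dead one
```

A test of this kind would check that `same is first` while the handler is alive and that `second` is a new, live instance once the first has been shut down.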
o-nikolas
left a comment
Looks reasonable to me now, but I still think @ashb or @amoghrajesh should have a look
|
@korex-f — There are 2 unresolved review threads on this PR from @o-nikolas. Could you either push a fix or reply in each thread explaining why the feedback doesn't apply? Once you believe the feedback is addressed, mark the thread as resolved so the reviewer isn't re-pinged needlessly. Thanks! Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
seanghaeli
left a comment
Verified end-to-end this fixes remote logging. Thanks @korex-f this issue has been floating around for a while.
904f1ea to ff723d1
…etime bugs

Three related bugs all produce WatchtowerWarning: "Received message after logging system shutdown", causing CloudWatch log streams to be truncated or never created.

Bug 1 (task-sdk/src/airflow/sdk/log.py): In configure_logging(), remote.processors was accessed before the inner configure_logging() call ran dictConfig(). dictConfig() calls _clearExistingHandlers() -> logging.shutdown() on all existing handlers, killing the just-created watchtower handler (shutting_down=True) before any task log is emitted. Fix: move getattr(remote, "processors") to after dictConfig runs, injecting via a second structlog.configure() call.

Bug 2 (providers/amazon/.../cloudwatch_task_handler.py): CloudWatchRemoteLogIO.handler was a cached_property; once killed by logging.shutdown() the dead instance was never replaced. The processors cached_property also captured _handler in a closure, pinning the dead instance. Fix: convert handler to a regular property that recreates when shutting_down=True; access self.handler dynamically in the processors closure.

Bug 3 (task-sdk/src/airflow/sdk/execution_time/supervisor.py): _configure_logging() returned a plain tuple with no cleanup path. The only code that ever closed the remote handler was logging.shutdown() at process exit, which fired after supervise_task() returned while final task messages were still in flight. Fix: convert _configure_logging() to a contextmanager that explicitly flushes and closes the handler after process.wait() drains all task log messages.

Fixes: apache#66475
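The mechanism behind Bug 1 can be reproduced with the standard library alone. The sketch below assumes nothing about Airflow or watchtower: `TrackingHandler` is a hypothetical handler that only counts `close()` calls, and the empty `dictConfig` dictionary is the smallest non-incremental configuration.

```python
import logging
import logging.config


class TrackingHandler(logging.Handler):
    """Hypothetical handler that counts how often close() is called."""

    def __init__(self) -> None:
        super().__init__()
        self.close_calls = 0

    def emit(self, record: logging.LogRecord) -> None:
        pass  # nothing to send in this sketch

    def close(self) -> None:
        self.close_calls += 1
        super().close()


# Created BEFORE dictConfig: a non-incremental dictConfig shuts down
# every handler that already exists, attached to a logger or not.
early = TrackingHandler()
logging.config.dictConfig({"version": 1, "disable_existing_loggers": False})

# Created AFTER dictConfig: untouched.
late = TrackingHandler()
```

After this runs, `early` has been closed by `dictConfig()` while `late` has not, which is why moving the handler creation after `dictConfig()` avoids the warning.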
…chRemoteLogIO to satisfy updated RemoteLogIO protocol
…e errors
- Restore original _configure_logging call site ordering so secrets masker reset happens after logging is configured, not before
- Simplify _configure_logging to yield only logger, not a tuple; file descriptor cleanup is internal to the context manager
- Replace contextlib.suppress(Exception) in _close_remote_log_handler with a logged warning so close errors are visible for debugging
- Revert f-string formatting change in _fork_main
- Move import structlog.typing back into TYPE_CHECKING block
- Revert handler to cached_property in CloudWatchRemoteLogIO, removing the self-healing property complexity introduced earlier
- Keep only the processors closure fix: access self.handler dynamically instead of captured _handler variable
- Remove tests for the self-healing property that is being reverted
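The difference between capturing `_handler` in the closure and reading `self.handler` on each call can be illustrated with a small sketch. `FakeHandler` and `RemoteIOSketch` are hypothetical names, and deleting the cached attribute is used here only as a way to simulate the handler being replaced.

```python
from functools import cached_property


class FakeHandler:
    """Hypothetical stand-in for the remote log handler."""

    def __init__(self) -> None:
        self.shutting_down = False


class RemoteIOSketch:
    @cached_property
    def handler(self) -> FakeHandler:
        return FakeHandler()

    def pinned_processor(self):
        _handler = self.handler  # captured once, when the closure is built

        def processor(event: dict) -> FakeHandler:
            return _handler  # always the instance captured above

        return processor

    def dynamic_processor(self):
        def processor(event: dict) -> FakeHandler:
            return self.handler  # looked up again on every call

        return processor


io = RemoteIOSketch()
pinned = io.pinned_processor()
dynamic = io.dynamic_processor()

original = io.handler
original.shutting_down = True  # simulate the handler being shut down
del io.handler                 # clear the cached value
replacement = io.handler       # next access builds a fresh handler
```

Here `pinned({})` still returns the dead `original`, whereas `dynamic({})` returns `replacement`, so only the dynamic lookup follows a handler that has been swapped out.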
1fanwang
left a comment
Thanks for addressing the comments, lgtm!
ff723d1 to 93a67a7
|
@korex-f — A reviewer (@ashb) has requested changes on this PR, so I've removed the Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
- Remove duplicate GetXComCount handler in _handle_request
- Fix with-statement syntax by wrapping conditional context manager
- Eliminate leftover merge-conflict artifacts
|
Thanks for the review, @ashb. I have addressed the requested changes, pushed the fixes, and re-requested review. Please do take another look when you have the chance. |
|
Hey, running into this exact issue on Airflow 3.2.2 with KubernetesExecutor — task pods emit WatchtowerWarning: Received message after logging system shutdown and logs never make it to CloudWatch. Any idea on ETA for merge and which provider version will ship the fix? Also is there any interim workaround while we wait? Happy to test if there's a pre-release available. |
Thanks for fixing this; I verified the root cause as well. The overall direction LGTM.
However, this PR touches both the Task-SDK and the provider.
IIRC, the current convention is to keep Task-SDK and provider changes in separate PRs, so we need to make this PR Task-SDK only and raise another one for the provider, or vice versa.
Regarding "Any idea on ETA for merge and which provider version will ship the fix? Also is there any interim workaround while we wait?":
If this PR doesn't make it into the 3.3 release (since it changes the Task-SDK), I will merge #68779, which solves the same issue from a different direction with provider-only changes (I verified it on a k8s executor setup), so the next AWS provider version should solve this problem.
If this PR does make the 3.3 release, using Airflow 3.3 plus the next AWS provider version should solve it.
| if this is not done before process exit. | ||
| """ | ||
| try: | ||
| handler.close() |
It would be better to guard the handler.close call more defensively, since not every remote logging handler supports a close method when used with a Task-SDK version that includes this patch.
| handler.close() | |
| if callable(getattr(handler, "close", None)): | |
| handler.close() |
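A defensive version of this guard might look like the sketch below. Note that `hasattr()` accepts exactly two arguments, so the default-value form has to go through `getattr()`. The function name `close_remote_log_handler` and the two small classes are hypothetical and only illustrate the idea; the warning-instead-of-suppress behaviour follows the commit message earlier in this thread.

```python
import logging

log = logging.getLogger(__name__)


def close_remote_log_handler(handler: object) -> bool:
    """Close ``handler`` if it exposes a callable ``close``.

    Returns True only when close() ran without raising.
    """
    close = getattr(handler, "close", None)
    if not callable(close):
        # Older or third-party handlers may not implement close() at all.
        return False
    try:
        close()
    except Exception:
        # Log instead of silently suppressing, so failures stay visible.
        log.warning("Failed to close remote log handler", exc_info=True)
        return False
    return True


class WithClose:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class WithoutClose:
    pass


closable = WithClose()
closed_ok = close_remote_log_handler(closable)
skipped = close_remote_log_handler(WithoutClose())
```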
| @@ -1707,6 +1704,8 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: | |||
| resp, dump_opts = handle_get_variable_keys(self.client, msg) | |||
| elif isinstance(msg, GetXCom): | |||
| resp, dump_opts = handle_get_xcom(self.client, msg) | |||
| elif isinstance(msg, GetXComCount): | |||
| resp, dump_opts = handle_get_xcom_count(self.client, msg) | |||
| elif isinstance(msg, GetXComSequenceItem): | |||
| resp, dump_opts = handle_get_xcom_sequence_item(self.client, msg) | |||
| elif isinstance(msg, GetXComSequenceSlice): | |||
| @@ -1774,8 +1773,6 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: | |||
| dump_opts = {"exclude_unset": True} | |||
| elif isinstance(msg, GetPrevSuccessfulDagRun): | |||
| resp, dump_opts = handle_get_prev_successful_dag_run(self.client, self.id) | |||
| elif isinstance(msg, GetXComCount): | |||
| resp, dump_opts = handle_get_xcom_count(self.client, msg) | |||
Could we revert these unrelated changes (the parameter style of def send_msg( and the order of elif isinstance(msg, GetXComCount):)?
| # logger will emit: | ||
| # WatchtowerWarning: "Received message after logging system shutdown" | ||
| remote_handler = load_remote_log_handler() |
Please keep the TODO instead of dropping it silently.
| remote_handler = load_remote_log_handler() | |
| # TODO: Use logging providers to handle the chunked upload for us etc. | |
| remote_handler = load_remote_log_handler() |
| remote_handler = load_remote_log_handler() | ||
| if remote_handler is not None: |
nit:
| remote_handler = load_remote_log_handler() | |
| if remote_handler is not None: | |
| if (remote_handler := load_remote_log_handler()) is not None: |
| def close(self) -> None: | ||
| """Flush and close the remote log handler.""" | ||
| ... |
I don't think we need to add the close method to the RemoteLogIO protocol in this PR, since it is still a no-op for the ES and OS remote IO implementations.
| import structlog | ||
| import structlog.typing |
| import structlog | |
| import structlog.typing |
Three related bugs all produce WatchtowerWarning: "Received message after logging system shutdown", causing CloudWatch log streams to be truncated or never created.

Bug 1 (task-sdk/src/airflow/sdk/log.py):
In configure_logging(), remote.processors was accessed before the inner configure_logging() call ran dictConfig(). dictConfig() calls _clearExistingHandlers() → logging.shutdown() on all existing handlers, killing the just-created watchtower handler (shutting_down=True) before any task log is emitted. Fix: move getattr(remote, "processors") to after dictConfig runs, injecting via a second structlog.configure() call.

Bug 2 (providers/amazon/.../cloudwatch_task_handler.py):
CloudWatchRemoteLogIO.handler was a cached_property; once killed by logging.shutdown() the dead instance was never replaced. The processors cached_property also captured _handler in a closure, pinning the dead instance. Fix: convert handler to a regular property that recreates when shutting_down=True; access self.handler dynamically in the processors closure.

Bug 3 (task-sdk/src/airflow/sdk/execution_time/supervisor.py):
_configure_logging() returned a plain tuple with no cleanup path. The only code that ever closed the remote handler was logging.shutdown() at process exit, which fired after supervise_task() returned while final task messages were still in flight. Fix: convert _configure_logging() to a contextmanager that explicitly flushes and closes the handler after process.wait() drains all task log messages.

closes: #66475
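The shape of the Bug 3 fix, a context manager that flushes and closes the handler once the body has finished, can be sketched with the standard library. This is an illustration under assumptions, not the supervisor code: `configured_logging`, `RecordingHandler`, and the logger name are hypothetical, and the `with` body stands in for waiting on the task process.

```python
import contextlib
import logging


class RecordingHandler(logging.Handler):
    """Hypothetical handler that records the order of lifecycle calls."""

    def __init__(self) -> None:
        super().__init__()
        self.events: list[tuple[str, str | None]] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.events.append(("emit", record.getMessage()))

    def flush(self) -> None:
        self.events.append(("flush", None))

    def close(self) -> None:
        self.events.append(("close", None))
        super().close()


@contextlib.contextmanager
def configured_logging(handler: logging.Handler):
    logger = logging.getLogger("sketch.supervisor")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)
    try:
        yield logger
    finally:
        # Runs only after the with-body is done, i.e. after all task
        # messages have been handed to the handler.
        logger.removeHandler(handler)
        handler.flush()
        handler.close()


handler = RecordingHandler()
with configured_logging(handler) as logger:
    logger.info("final task message")  # stands in for draining task logs
events = handler.events
```

The recorded order is emit, then flush, then close, so the last message reaches the handler before it is shut down rather than racing with `logging.shutdown()` at process exit.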