-
Notifications
You must be signed in to change notification settings - Fork 17.3k
Fix: CloudWatch/Watchtower logs dropped in Task SDK due to handler lifetime bugs #66633
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ad9c3aa
dd641f4
5f89059
2e504ff
142192e
0b60d29
84597f2
2506df0
c2e6283
93a67a7
e4262d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,6 +64,10 @@ def read(self, relative_path: str, ti: RuntimeTI) -> LogResponse: | |
| """Read logs from the given remote log path.""" | ||
| ... | ||
|
|
||
| def close(self) -> None: | ||
| """Flush and close the remote log handler.""" | ||
| ... | ||
|
Comment on lines
+67
to
+69
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think we need to introduce the |
||
|
|
||
|
|
||
| @runtime_checkable | ||
| class RemoteLogStreamIO(RemoteLogIO, Protocol): | ||
|
|
||
|
korex-f marked this conversation as resolved.
|
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -170,6 +170,7 @@ | |||||||
| from typing_extensions import Self | ||||||||
|
|
||||||||
| from airflow.executors.workloads import BundleInfo | ||||||||
| from airflow.sdk._shared.logging.remote import RemoteLogIO | ||||||||
| from airflow.sdk.bases.secrets_backend import BaseSecretsBackend | ||||||||
| from airflow.sdk.definitions.connection import Connection | ||||||||
| from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO | ||||||||
|
|
@@ -855,11 +856,7 @@ def _serialize_response(self, msg: BaseModel | ErrorResponse, **dump_opts) -> di | |||||||
| return msg.model_dump(**dump_opts) | ||||||||
|
|
||||||||
| def send_msg( | ||||||||
| self, | ||||||||
| msg: BaseModel | None, | ||||||||
| request_id: int, | ||||||||
| error: ErrorResponse | None = None, | ||||||||
| **dump_opts, | ||||||||
| self, msg: BaseModel | None, request_id: int, error: ErrorResponse | None = None, **dump_opts | ||||||||
| ): | ||||||||
| """ | ||||||||
| Send the msg as a length-prefixed response frame. | ||||||||
|
|
@@ -1707,6 +1704,8 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: | |||||||
| resp, dump_opts = handle_get_variable_keys(self.client, msg) | ||||||||
| elif isinstance(msg, GetXCom): | ||||||||
| resp, dump_opts = handle_get_xcom(self.client, msg) | ||||||||
| elif isinstance(msg, GetXComCount): | ||||||||
| resp, dump_opts = handle_get_xcom_count(self.client, msg) | ||||||||
| elif isinstance(msg, GetXComSequenceItem): | ||||||||
| resp, dump_opts = handle_get_xcom_sequence_item(self.client, msg) | ||||||||
| elif isinstance(msg, GetXComSequenceSlice): | ||||||||
|
|
@@ -1774,8 +1773,6 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: | |||||||
| dump_opts = {"exclude_unset": True} | ||||||||
| elif isinstance(msg, GetPrevSuccessfulDagRun): | ||||||||
| resp, dump_opts = handle_get_prev_successful_dag_run(self.client, self.id) | ||||||||
| elif isinstance(msg, GetXComCount): | ||||||||
| resp, dump_opts = handle_get_xcom_count(self.client, msg) | ||||||||
|
Comment on lines
858
to
-1778
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could we revert these unrelated changes? (the parameter style of |
||||||||
| elif isinstance(msg, TriggerDagRun): | ||||||||
| resp = self.client.dag_runs.trigger( | ||||||||
| msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.run_after, msg.reset_dag_run, msg.note | ||||||||
|
|
@@ -2385,11 +2382,31 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]: | |||||||
| return ensure_secrets_loaded(default_backends=fallback_backends) | ||||||||
|
|
||||||||
|
|
||||||||
| def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLogger, BinaryIO | TextIO]: | ||||||||
| def _close_remote_log_handler(handler: RemoteLogIO) -> None: | ||||||||
| """ | ||||||||
| Close the remote log handler explicitly after all task log messages have been drained from the subprocess pipe. | ||||||||
|
|
||||||||
| Called after process.wait() returns, before process exit triggers | ||||||||
| logging.shutdown(). This ensures the remote handler's internal batch | ||||||||
| queue is flushed before the process tears down. For example, the AWS | ||||||||
| CloudWatch logger will emit: | ||||||||
|
|
||||||||
| WatchtowerWarning: "Received message after logging system shutdown" | ||||||||
|
|
||||||||
| if this is not done before process exit. | ||||||||
| """ | ||||||||
| try: | ||||||||
| handler.close() | ||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be better to have a more defensive, compatible guard against the
Suggested change
|
||||||||
| except Exception: | ||||||||
| log.warning("Failed to close remote log handler", exc_info=True) | ||||||||
|
|
||||||||
|
|
||||||||
| @contextlib.contextmanager | ||||||||
| def _configure_logging(log_path: str, client: Client) -> Generator[FilteringBoundLogger, None, None]: | ||||||||
| # If we are told to write logs to a file, redirect the task logger to it. Make sure we append to the | ||||||||
| # file though, otherwise when we resume we would lose the logs from the start->deferral segment if it | ||||||||
| # lands on the same node as before. | ||||||||
| from airflow.sdk.log import init_log_file, logging_processors | ||||||||
| from airflow.sdk.log import init_log_file, load_remote_log_handler, logging_processors | ||||||||
|
|
||||||||
| log_file_descriptor: BinaryIO | TextIO | None = None | ||||||||
|
|
||||||||
|
|
@@ -2405,9 +2422,30 @@ def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLog | |||||||
|
|
||||||||
| with _remote_logging_conn(client): | ||||||||
| processors = logging_processors(json_output=json_logs) | ||||||||
|
|
||||||||
| logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind() | ||||||||
|
|
||||||||
| return logger, log_file_descriptor | ||||||||
| try: | ||||||||
| yield logger | ||||||||
| finally: | ||||||||
| # Flush and close the remote handler now — AFTER the supervisor has | ||||||||
| # drained all task log messages from the subprocess pipe (i.e. after | ||||||||
| # process.wait() has returned). | ||||||||
| # | ||||||||
| # Without this, the only thing that ever closes the handler is | ||||||||
| # Python's logging.shutdown() at process exit, which fires after | ||||||||
| # supervise_task() returns. Any messages still queued in the handler | ||||||||
| # at that point are silently dropped. For example, the AWS CloudWatch | ||||||||
| # logger will emit: | ||||||||
|
|
||||||||
| # WatchtowerWarning: "Received message after logging system shutdown" | ||||||||
| remote_handler = load_remote_log_handler() | ||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please keep the TODO instead of dropping it silently.
Suggested change
|
||||||||
| if remote_handler is not None: | ||||||||
|
Comment on lines
+2442
to
+2443
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit:
Suggested change
|
||||||||
| _close_remote_log_handler(remote_handler) | ||||||||
|
|
||||||||
| if log_file_descriptor is not None: | ||||||||
| with contextlib.suppress(Exception): | ||||||||
| log_file_descriptor.close() | ||||||||
|
|
||||||||
|
|
||||||||
| def supervise_task( | ||||||||
|
|
@@ -2495,12 +2533,6 @@ def supervise_task( | |||||||
| with _ensure_client(server, token, client=client, dry_run=dry_run) as client: | ||||||||
| start = time.monotonic() | ||||||||
|
|
||||||||
| # TODO: Use logging providers to handle the chunked upload for us etc. | ||||||||
| logger: FilteringBoundLogger | None = None | ||||||||
| log_file_descriptor: BinaryIO | TextIO | None = None | ||||||||
| if log_path: | ||||||||
| logger, log_file_descriptor = _configure_logging(log_path, client) | ||||||||
|
|
||||||||
| backends = ensure_secrets_backend_loaded() | ||||||||
| log.info( | ||||||||
| "Secrets backends loaded for worker", | ||||||||
|
|
@@ -2511,15 +2543,18 @@ def supervise_task( | |||||||
| reset_secrets_masker() | ||||||||
|
|
||||||||
| try: | ||||||||
| result = coordinator.execute_task( | ||||||||
| what=ti, | ||||||||
| dag_rel_path=dag_rel_path, | ||||||||
| bundle_info=bundle_info, | ||||||||
| client=client, | ||||||||
| logger=logger, | ||||||||
| sentry_integration=sentry_integration, | ||||||||
| subprocess_logs_to_stdout=subprocess_logs_to_stdout, | ||||||||
| ) | ||||||||
| with _configure_logging(log_path, client) if log_path else contextlib.nullcontext(None) as logger: | ||||||||
| result = coordinator.execute_task( | ||||||||
| what=ti, | ||||||||
| dag_rel_path=dag_rel_path, | ||||||||
| bundle_info=bundle_info, | ||||||||
| client=client, | ||||||||
| logger=logger, | ||||||||
| sentry_integration=sentry_integration, | ||||||||
| subprocess_logs_to_stdout=subprocess_logs_to_stdout, | ||||||||
| ) | ||||||||
| # _configure_logging.__exit__ fires here, remote handler is | ||||||||
| # flushed and closed after all task log messages are drained. | ||||||||
| end = time.monotonic() | ||||||||
| log.info( | ||||||||
| "Workload finished", | ||||||||
|
|
@@ -2531,8 +2566,6 @@ def supervise_task( | |||||||
| ) | ||||||||
| return result.exit_code | ||||||||
| finally: | ||||||||
| if log_path and log_file_descriptor: | ||||||||
| log_file_descriptor.close() | ||||||||
| provider = trace.get_tracer_provider() | ||||||||
| if hasattr(provider, "force_flush"): | ||||||||
| provider.force_flush(timeout_millis=5000) # upper bound, not a fixed wait | ||||||||
|
|
||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.