From 6356f1043cc96b41d12b94a2fc1ffc0f7f39a7fc Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Sat, 20 Jun 2026 15:19:37 +0900 Subject: [PATCH] Fix Cloudwatch remote logging crash API server if ResourceNotFoundException --- .../amazon/aws/log/cloudwatch_task_handler.py | 26 ++++++++++++++++- .../aws/log/test_cloudwatch_task_handler.py | 28 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index 5bc286cb9631d..a476491b31786 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -31,6 +31,7 @@ import attrs import watchtower +from botocore.exceptions import ClientError from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook from airflow.providers.amazon.aws.utils import datetime_to_epoch_utc_ms @@ -229,12 +230,35 @@ def get_cloudwatch_logs( if (end_date := getattr(task_instance, "end_date", None)) is None else datetime_to_epoch_utc_ms(end_date + timedelta(seconds=30)) ) - return self.hook.get_log_events( + events = self.hook.get_log_events( log_group=self.log_group, log_stream_name=stream_name, end_time=end_time, ) + def _iter_events() -> Generator[CloudWatchLogEvent, None, None]: + try: + yield from events + except ClientError as e: + # A missing log stream means no logs were written for this stream + # (e.g. the task logged to stdout instead of remote storage, or has + # not produced any logs). Surface a hint instead of a 500 error, and + # instead of an empty view that looks like remote logging silently failed. + if e.response.get("Error", {}).get("Code") != "ResourceNotFoundException": + raise + notice_ts = end_time or datetime_to_epoch_utc_ms(datetime.now(tz=timezone.utc)) + yield { + "timestamp": notice_ts, + "ingestionTime": notice_ts, + "message": ( + f"No log stream found in CloudWatch (log_group={self.log_group}, " + f"log_stream={stream_name}). The task may have logged to stdout only, " + f"not produced any logs yet, or remote logging may be misconfigured." + ), + } + + return _iter_events() + def _parse_log_event_as_dumped_json(self, event: CloudWatchLogEvent) -> str: event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc).isoformat() event_msg = event["message"] diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index c13515e044039..7a62e4caaf501 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -30,6 +30,7 @@ import pendulum import pytest import time_machine +from botocore.exceptions import ClientError from moto import mock_aws from pydantic import TypeAdapter from watchtower import CloudWatchLogHandler @@ -423,6 +424,33 @@ def test_get_cloudwatch_logs(self, mock_get_log_events, end_date, expected_end_t end_time=expected_end_time, ) + @mock.patch.object(AwsLogsHook, "get_log_events") + def test_get_cloudwatch_logs_missing_stream_yields_hint(self, mock_get_log_events): + # A missing log stream (no logs written for this try -- e.g. the task logged + # to stdout instead of remote storage) must not raise (so the log reader does + # not surface a 500) and must yield a hint instead of nothing, so the reader + # does not show a blank view that looks like remote logging silently failed. + def _raise_not_found(*args, **kwargs): + raise ClientError({"Error": {"Code": "ResourceNotFoundException"}}, "GetLogEvents") + yield # pragma: no cover -- makes this a generator function + + mock_get_log_events.side_effect = _raise_not_found + events = list(self.cloudwatch_task_handler.io.get_cloudwatch_logs(self.remote_log_stream, self.ti)) + assert len(events) == 1 + assert "No log stream found in CloudWatch" in events[0]["message"] + assert self.remote_log_stream in events[0]["message"] + + @mock.patch.object(AwsLogsHook, "get_log_events") + def test_get_cloudwatch_logs_other_client_error_propagates(self, mock_get_log_events): + # Errors other than a missing stream must still surface. + def _raise_access_denied(*args, **kwargs): + raise ClientError({"Error": {"Code": "AccessDeniedException"}}, "GetLogEvents") + yield # pragma: no cover -- makes this a generator function + + mock_get_log_events.side_effect = _raise_access_denied + with pytest.raises(ClientError): + list(self.cloudwatch_task_handler.io.get_cloudwatch_logs(self.remote_log_stream, self.ti)) + @pytest.mark.parametrize( ("conf_json_serialize", "expected_serialized_output"), [