Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import attrs
import watchtower
from botocore.exceptions import ClientError

from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
from airflow.providers.amazon.aws.utils import datetime_to_epoch_utc_ms
Expand Down Expand Up @@ -229,12 +230,35 @@ def get_cloudwatch_logs(
if (end_date := getattr(task_instance, "end_date", None)) is None
else datetime_to_epoch_utc_ms(end_date + timedelta(seconds=30))
)
return self.hook.get_log_events(
events = self.hook.get_log_events(
log_group=self.log_group,
log_stream_name=stream_name,
end_time=end_time,
)

def _iter_events() -> Generator[CloudWatchLogEvent, None, None]:
    """Relay the fetched CloudWatch events, turning a missing-stream error into a hint.

    Events from the enclosing scope's ``events`` iterable are passed through
    unchanged. If iterating raises a ``ClientError`` whose error code is
    ``ResourceNotFoundException``, one synthetic event carrying an
    explanatory message is yielded instead. Any other ``ClientError`` is
    re-raised as-is.
    """
    try:
        yield from events
    except ClientError as err:
        error_code = err.response.get("Error", {}).get("Code")
        if error_code != "ResourceNotFoundException":
            raise
        # The log stream does not exist, i.e. nothing was written for it
        # (the task may have logged to stdout instead of remote storage, or
        # has not produced logs yet). Emit a readable notice rather than an
        # error or an empty result that looks like remote logging silently failed.
        # Stamp the notice with the query end time when one is set, otherwise
        # with the current UTC time converted by datetime_to_epoch_utc_ms.
        notice_ts = end_time or datetime_to_epoch_utc_ms(datetime.now(tz=timezone.utc))
        hint = (
            f"No log stream found in CloudWatch (log_group={self.log_group}, "
            f"log_stream={stream_name}). The task may have logged to stdout only, "
            "not produced any logs yet, or remote logging may be misconfigured."
        )
        yield {"timestamp": notice_ts, "ingestionTime": notice_ts, "message": hint}

return _iter_events()

def _parse_log_event_as_dumped_json(self, event: CloudWatchLogEvent) -> str:
event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc).isoformat()
event_msg = event["message"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import pendulum
import pytest
import time_machine
from botocore.exceptions import ClientError
from moto import mock_aws
from pydantic import TypeAdapter
from watchtower import CloudWatchLogHandler
Expand Down Expand Up @@ -423,6 +424,33 @@ def test_get_cloudwatch_logs(self, mock_get_log_events, end_date, expected_end_t
end_time=expected_end_time,
)

@mock.patch.object(AwsLogsHook, "get_log_events")
def test_get_cloudwatch_logs_missing_stream_yields_hint(self, mock_get_log_events):
    """A missing log stream must produce exactly one hint event instead of raising.

    The patched ``get_log_events`` raises ``ResourceNotFoundException`` on
    first iteration. ``get_cloudwatch_logs`` is expected to swallow that
    error and yield a single event whose message names the problem and the
    stream, so the reader sees neither an error nor a blank view.
    """

    def _missing_stream(*args, **kwargs):
        error_response = {"Error": {"Code": "ResourceNotFoundException"}}
        raise ClientError(error_response, "GetLogEvents")
        yield  # pragma: no cover -- makes this a generator function

    mock_get_log_events.side_effect = _missing_stream

    remote_io = self.cloudwatch_task_handler.io
    events = list(remote_io.get_cloudwatch_logs(self.remote_log_stream, self.ti))

    assert len(events) == 1
    hint_message = events[0]["message"]
    assert "No log stream found in CloudWatch" in hint_message
    assert self.remote_log_stream in hint_message

@mock.patch.object(AwsLogsHook, "get_log_events")
def test_get_cloudwatch_logs_other_client_error_propagates(self, mock_get_log_events):
    """A ``ClientError`` other than a missing stream must still reach the caller.

    The patched ``get_log_events`` raises ``AccessDeniedException`` on first
    iteration; consuming the result of ``get_cloudwatch_logs`` is expected
    to re-raise it.
    """

    def _access_denied(*args, **kwargs):
        error_response = {"Error": {"Code": "AccessDeniedException"}}
        raise ClientError(error_response, "GetLogEvents")
        yield  # pragma: no cover -- makes this a generator function

    mock_get_log_events.side_effect = _access_denied

    remote_io = self.cloudwatch_task_handler.io
    with pytest.raises(ClientError):
        list(remote_io.get_cloudwatch_logs(self.remote_log_stream, self.ti))

@pytest.mark.parametrize(
("conf_json_serialize", "expected_serialized_output"),
[
Expand Down
Loading