From d3aa55c9038540f5c9fcf9ece845d52f23065541 Mon Sep 17 00:00:00 2001 From: Sameer Mesiah Date: Tue, 5 May 2026 22:06:37 +0100 Subject: [PATCH] Refactor deferrable execution in DbtCloudRunJobOperator to align with PR #64051 patterns. Simplify trigger polling/control flow and propagate execution_timeout via defer. Add tests for best-effort cancellation semantics in execute_complete and on_kill. --- .../providers/dbt/cloud/operators/dbt.py | 38 ++- .../providers/dbt/cloud/triggers/dbt.py | 72 +++-- .../unit/dbt/cloud/operators/test_dbt.py | 133 ++++++++- .../tests/unit/dbt/cloud/triggers/test_dbt.py | 282 +++++------------- scripts/ci/prek/known_airflow_exceptions.txt | 1 - 5 files changed, 273 insertions(+), 253 deletions(-) diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py index 2fff2728abe6f..eec9d7fcc3a9c 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py @@ -23,7 +23,7 @@ from pathlib import Path from typing import TYPE_CHECKING, Any -from airflow.providers.common.compat.sdk import AirflowException, BaseOperator, BaseOperatorLink, XCom, conf +from airflow.providers.common.compat.sdk import BaseOperator, BaseOperatorLink, XCom, conf from airflow.providers.dbt.cloud.hooks.dbt import ( DbtCloudHook, DbtCloudJobRunException, @@ -250,16 +250,16 @@ def execute(self, context: Context): # execution_timeout is a hard task-level limit (cancels the job), # while timeout only limits how long we wait for the job to finish. # If both are set, the earliest deadline wins. 
- end_time = time.time() + self.timeout + end_time = time.monotonic() + self.timeout execution_deadline = None - if self.execution_timeout: - execution_deadline = time.time() + self.execution_timeout.total_seconds() + if self.execution_timeout is not None: + execution_deadline = time.monotonic() + self.execution_timeout.total_seconds() job_run_info = JobRunInfo(account_id=self.account_id, run_id=self.run_id) job_run_status = self.hook.get_job_run_status(**job_run_info) if not DbtCloudJobRunStatus.is_terminal(job_run_status): self.defer( - timeout=None, + timeout=self.execution_timeout, trigger=DbtCloudRunJobTrigger( conn_id=self.dbt_cloud_conn_id, run_id=self.run_id, @@ -293,8 +293,22 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> int: # Enforce execution_timeout semantics in deferrable mode by cancelling the job. if event["status"] == "timeout": - self.hook.cancel_job_run(account_id=self.account_id, run_id=self.run_id) - raise AirflowException(f"Job run {self.run_id} has timed out.") + if self.run_id is not None: + self.log.info("Cancelling DBT job run %s due to execution timeout", self.run_id) + + # Attempt best-effort job run cancellation. + try: + self.hook.cancel_job_run(account_id=self.account_id, run_id=self.run_id) + except Exception: + self.log.warning( + "Failed to cancel DBT job run %s after timeout", + self.run_id, + exc_info=True, + ) + else: + self.log.warning("No run_id found; skipping cancellation") + + raise DbtCloudJobRunException(f"Job run {self.run_id} has timed out.") self.log.info(event["message"]) return int(event["run_id"]) @@ -303,7 +317,15 @@ def on_kill(self) -> None: if not self.run_id: return - self.hook.cancel_job_run(account_id=self.account_id, run_id=self.run_id) + # Attempt best-effort job run cancellation. 
+ try: + self.hook.cancel_job_run(account_id=self.account_id, run_id=self.run_id) + except Exception: + self.log.warning( + "Failed to cancel DBT job run %s during on_kill", + self.run_id, + exc_info=True, + ) # Attempt best-effort confirmation of cancellation. try: diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py index a0bb91861a8ea..8efd918780bc7 100644 --- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py +++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py @@ -79,9 +79,43 @@ async def run(self) -> AsyncIterator[TriggerEvent]: """Make async connection to Dbt, polls for the pipeline run status.""" hook = DbtCloudHook(self.conn_id, **self.hook_params) try: - while await self.is_still_running(hook): + while True: + now = time.monotonic() + + job_run_status = await hook.get_job_status(self.run_id, self.account_id) + + if job_run_status == DbtCloudJobRunStatus.SUCCESS.value: + yield TriggerEvent( + { + "status": "success", + "message": f"Job run {self.run_id} has completed successfully.", + "run_id": self.run_id, + } + ) + return + + elif job_run_status == DbtCloudJobRunStatus.CANCELLED.value: + yield TriggerEvent( + { + "status": "cancelled", + "message": f"Job run {self.run_id} has been cancelled.", + "run_id": self.run_id, + } + ) + return + + elif job_run_status == DbtCloudJobRunStatus.ERROR.value: + yield TriggerEvent( + { + "status": "error", + "message": f"Job run {self.run_id} has failed.", + "run_id": self.run_id, + } + ) + return + if self.execution_deadline is not None: - if self.execution_deadline < time.time(): + if self.execution_deadline <= now: yield TriggerEvent( { "status": "timeout", @@ -91,11 +125,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: ) return - if self.end_time < time.time(): - # Perform a final status check before declaring timeout, in case the - # job completed between the last poll 
and the timeout expiry. - if not await self.is_still_running(hook): - break + if self.end_time <= now: yield TriggerEvent( { "status": "error", @@ -105,34 +135,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]: } ) return + await asyncio.sleep(self.poll_interval) - job_run_status = await hook.get_job_status(self.run_id, self.account_id) - if job_run_status == DbtCloudJobRunStatus.SUCCESS.value: - yield TriggerEvent( - { - "status": "success", - "message": f"Job run {self.run_id} has completed successfully.", - "run_id": self.run_id, - } - ) - elif job_run_status == DbtCloudJobRunStatus.CANCELLED.value: - yield TriggerEvent( - { - "status": "cancelled", - "message": f"Job run {self.run_id} has been cancelled.", - "run_id": self.run_id, - } - ) - else: - yield TriggerEvent( - { - "status": "error", - "message": f"Job run {self.run_id} has failed.", - "run_id": self.run_id, - } - ) + except Exception as e: yield TriggerEvent({"status": "error", "message": str(e), "run_id": self.run_id}) + return async def is_still_running(self, hook: DbtCloudHook) -> bool: """Check whether the submitted job is running.""" diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py index 784cb62695e0b..55aa3f234029c 100644 --- a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py +++ b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py @@ -23,7 +23,7 @@ import pytest from airflow.models import DAG, Connection -from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred, timezone +from airflow.providers.common.compat.sdk import TaskDeferred, timezone from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus from airflow.providers.dbt.cloud.operators.dbt import ( DbtCloudGetJobRunArtifactOperator, @@ -198,7 +198,7 @@ def test_execute_failed_before_getting_deferred( def test_dbt_run_job_op_async(self, 
mock_trigger_job_run, mock_dbt_hook, mock_job_run_status, status): """ Asserts that a task is deferred and an DbtCloudRunJobTrigger will be fired - when the DbtCloudRunJobOperator has deferrable param set to True + when the DbtCloudRunJobOperator has deferrable param set to True. """ mock_job_run_status.return_value = status dbt_op = DbtCloudRunJobOperator( @@ -214,6 +214,40 @@ def test_dbt_run_job_op_async(self, mock_trigger_job_run, mock_dbt_hook, mock_jo dbt_op.execute(MagicMock()) assert isinstance(exc.value.trigger, DbtCloudRunJobTrigger), "Trigger is not a DbtCloudRunJobTrigger" + def test_execute_complete_timeout_without_run_id(self): + """ + Verify that when a deferrable dbt job emits a timeout event with no run_id, + the operator skips cancellation and fails. + """ + + operator = DbtCloudRunJobOperator( + task_id=TASK_ID, + dbt_cloud_conn_id=ACCOUNT_ID_CONN, + job_id=JOB_ID, + dag=self.dag, + deferrable=True, + ) + + # Pretend the job was already triggered. + operator.run_id = None + + # Mock the hook so we can assert that cancellation is skipped. 
+ operator.hook = MagicMock() + + timeout_event = { + "status": "timeout", + "run_id": None, + "message": "Job run timed out.", + } + + with pytest.raises(DbtCloudJobRunException): + operator.execute_complete( + context=self.mock_context, + event=timeout_event, + ) + + operator.hook.cancel_job_run.assert_not_called() + def test_execute_complete_timeout_cancels_job(self): """ Verify that when a deferrable dbt job emits a timeout event, @@ -239,7 +273,45 @@ def test_execute_complete_timeout_cancels_job(self): "message": "Job run timed out.", } - with pytest.raises(AirflowException, match="has timed out"): + with pytest.raises(DbtCloudJobRunException, match="has timed out"): + operator.execute_complete( + context=self.mock_context, + event=timeout_event, + ) + + operator.hook.cancel_job_run.assert_called_once_with( + account_id=operator.account_id, + run_id=RUN_ID, + ) + + def test_execute_complete_timeout_cancel_job_does_not_mask_original_error(self): + """ + Verify that when a deferrable dbt job is cancelled after a timeout event is received, + the original error is not masked. + """ + operator = DbtCloudRunJobOperator( + task_id=TASK_ID, + dbt_cloud_conn_id=ACCOUNT_ID_CONN, + job_id=JOB_ID, + dag=self.dag, + deferrable=True, + ) + + # Pretend the job was already triggered. + operator.run_id = RUN_ID + + # Mock the hook so we can assert cancellation. 
+ operator.hook = MagicMock() + + operator.hook.cancel_job_run.side_effect = Exception("Cancellation failed") + + timeout_event = { + "status": "timeout", + "run_id": RUN_ID, + "message": "Job run timed out.", + } + + with pytest.raises(DbtCloudJobRunException, match="has timed out"): operator.execute_complete( context=self.mock_context, event=timeout_event, @@ -689,6 +761,61 @@ def test_custom_trigger_reason(self, mock_run_job, conn_id, account_id): additional_run_config=self.config["additional_run_config"], ) + def test_on_kill_cancels_job_and_confirms_success(self): + operator = DbtCloudRunJobOperator( + task_id=TASK_ID, + dbt_cloud_conn_id=ACCOUNT_ID_CONN, + job_id=JOB_ID, + dag=self.dag, + ) + + operator.run_id = RUN_ID + operator.hook = MagicMock() + + # Simulate successful cancellation confirmation. + operator.hook.wait_for_job_run_status.return_value = True + + operator.on_kill() + + operator.hook.cancel_job_run.assert_called_once_with( + account_id=operator.account_id, + run_id=RUN_ID, + ) + + operator.hook.wait_for_job_run_status.assert_called_once_with( + run_id=RUN_ID, + account_id=operator.account_id, + expected_statuses=DbtCloudJobRunStatus.CANCELLED.value, + check_interval=operator.check_interval, + timeout=operator.timeout, + ) + + def test_on_kill_best_effort_cancellation_does_not_raise(self): + operator = DbtCloudRunJobOperator( + task_id=TASK_ID, + dbt_cloud_conn_id=ACCOUNT_ID_CONN, + job_id=JOB_ID, + dag=self.dag, + ) + + operator.run_id = RUN_ID + operator.hook = MagicMock() + + # Simulate cancellation failure. + operator.hook.cancel_job_run.side_effect = Exception("Cancellation failed") + + # Simulate confirmation also failing (normal path). 
+ operator.hook.wait_for_job_run_status.side_effect = DbtCloudJobRunException("Still running") + + operator.on_kill() + + operator.hook.cancel_job_run.assert_called_once_with( + account_id=operator.account_id, + run_id=RUN_ID, + ) + + operator.hook.wait_for_job_run_status.assert_called_once() + @pytest.mark.parametrize( ("conn_id", "account_id"), [(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)], diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py index 4d1d971f60d0c..12c94d19b9e38 100644 --- a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py +++ b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py @@ -34,17 +34,23 @@ class TestDbtCloudRunJobTrigger: RUN_ID = 1234 CONN_ID = "dbt_cloud_default" ACCOUNT_ID = 12340 - END_TIME = time.time() + 60 * 60 * 24 * 7 - EXECUTION_DEADLINE = time.time() + 60 * 60 * 24 * 7 POLL_INTERVAL = 3.0 - def test_serialization(self): + @pytest.fixture + def end_time(self): + return time.monotonic() + 60 * 60 * 24 * 7 + + @pytest.fixture + def execution_deadline(self): + return time.monotonic() + 60 * 60 * 24 * 7 + + def test_serialization(self, end_time, execution_deadline): """Assert DbtCloudRunJobTrigger correctly serializes its arguments and classpath.""" trigger = DbtCloudRunJobTrigger( conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, - end_time=self.END_TIME, - execution_deadline=self.EXECUTION_DEADLINE, + end_time=end_time, + execution_deadline=execution_deadline, run_id=self.RUN_ID, account_id=self.ACCOUNT_ID, hook_params={"retry_delay": 10}, @@ -55,28 +61,29 @@ def test_serialization(self): "run_id": self.RUN_ID, "account_id": self.ACCOUNT_ID, "conn_id": self.CONN_ID, - "end_time": self.END_TIME, - "execution_deadline": self.EXECUTION_DEADLINE, + "end_time": end_time, + "execution_deadline": execution_deadline, "poll_interval": self.POLL_INTERVAL, "hook_params": {"retry_delay": 10}, } @pytest.mark.asyncio 
- @mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running") - async def test_dbt_run_job_trigger(self, mocked_is_still_running): + @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") + async def test_dbt_run_job_trigger(self, mock_get_job_status, end_time): """Test DbtCloudRunJobTrigger is triggered with mocked details and run successfully.""" - mocked_is_still_running.return_value = True + + mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value trigger = DbtCloudRunJobTrigger( conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, - end_time=self.END_TIME, + end_time=end_time, run_id=self.RUN_ID, account_id=self.ACCOUNT_ID, ) task = asyncio.create_task(trigger.run().__anext__()) await asyncio.sleep(0.5) - # TriggerEvent was not returned + # TriggerEvent was not returned. assert task.done() is False asyncio.get_event_loop().stop() @@ -85,20 +92,21 @@ async def test_dbt_run_job_trigger(self, mocked_is_still_running): ("mock_value", "mock_status", "mock_message"), [ (DbtCloudJobRunStatus.SUCCESS.value, "success", "Job run 1234 has completed successfully."), + (DbtCloudJobRunStatus.CANCELLED.value, "cancelled", "Job run 1234 has been cancelled."), + (DbtCloudJobRunStatus.ERROR.value, "error", "Job run 1234 has failed."), ], ) - @mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running") @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") - async def test_dbt_job_run_for_terminal_status_success( - self, mock_get_job_status, mocked_is_still_running, mock_value, mock_status, mock_message + async def test_dbt_job_run_for_terminal_status( + self, mock_get_job_status, mock_value, mock_status, mock_message, end_time ): - """Assert that run trigger success message in case of job success""" - mocked_is_still_running.return_value = False + """Assert run trigger messages when job reaches terminal status.""" + 
mock_get_job_status.return_value = mock_value trigger = DbtCloudRunJobTrigger( conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, - end_time=self.END_TIME, + end_time=end_time, run_id=self.RUN_ID, account_id=self.ACCOUNT_ID, ) @@ -107,254 +115,110 @@ async def test_dbt_job_run_for_terminal_status_success( "message": mock_message, "run_id": self.RUN_ID, } - task = asyncio.create_task(trigger.run().__anext__()) - await asyncio.sleep(0.5) - assert TriggerEvent(expected_result) == task.result() - asyncio.get_event_loop().stop() + + events = [e async for e in trigger.run()] + assert len(events) == 1 + assert TriggerEvent(expected_result) == events[0] @pytest.mark.asyncio - @pytest.mark.parametrize( - ("mock_value", "mock_status", "mock_message"), - [ - (DbtCloudJobRunStatus.CANCELLED.value, "cancelled", "Job run 1234 has been cancelled."), - ], - ) - @mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running") @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") - async def test_dbt_job_run_for_terminal_status_cancelled( - self, mock_get_job_status, mocked_is_still_running, mock_value, mock_status, mock_message - ): - """Assert that run trigger success message in case of job success""" - mocked_is_still_running.return_value = False - mock_get_job_status.return_value = mock_value + async def test_dbt_job_run_exception(self, mock_get_job_status, end_time): + """Assert that run catch exception if dbt cloud job API throw exception.""" + + mock_get_job_status.side_effect = Exception("Test exception") trigger = DbtCloudRunJobTrigger( conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, - end_time=self.END_TIME, + end_time=end_time, run_id=self.RUN_ID, account_id=self.ACCOUNT_ID, ) + expected_result = { - "status": mock_status, - "message": mock_message, + "status": "error", + "message": "Test exception", "run_id": self.RUN_ID, } - task = asyncio.create_task(trigger.run().__anext__()) - await 
asyncio.sleep(0.5) - assert TriggerEvent(expected_result) == task.result() - asyncio.get_event_loop().stop() + + events = [e async for e in trigger.run()] + assert len(events) == 1 + assert TriggerEvent(expected_result) == events[0] @pytest.mark.asyncio - @pytest.mark.parametrize( - ("mock_value", "mock_status", "mock_message"), - [ - (DbtCloudJobRunStatus.ERROR.value, "error", "Job run 1234 has failed."), - ], - ) - @mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running") @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") - async def test_dbt_job_run_for_terminal_status_error( - self, mock_get_job_status, mocked_is_still_running, mock_value, mock_status, mock_message - ): - """Assert that run trigger success message in case of job success""" - mocked_is_still_running.return_value = False - mock_get_job_status.return_value = mock_value + async def test_dbt_job_run_timeout(self, mock_get_job_status): + """Assert that run timeout after end_time elapsed.""" + + mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value + + end_time = time.monotonic() - 1 trigger = DbtCloudRunJobTrigger( conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, - end_time=self.END_TIME, + end_time=end_time, run_id=self.RUN_ID, account_id=self.ACCOUNT_ID, ) + expected_result = { - "status": mock_status, - "message": mock_message, + "status": "error", + "message": f"Job run {self.RUN_ID} has not reached a terminal status " + f"within the configured timeout.", "run_id": self.RUN_ID, } - task = asyncio.create_task(trigger.run().__anext__()) - await asyncio.sleep(0.5) - assert TriggerEvent(expected_result) == task.result() - asyncio.get_event_loop().stop() - @pytest.mark.asyncio - @mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running") - @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") - async def test_dbt_job_run_exception(self, 
mock_get_job_status, mocked_is_still_running): - """Assert that run catch exception if dbt cloud job API throw exception""" - mocked_is_still_running.return_value = False - mock_get_job_status.side_effect = Exception("Test exception") - trigger = DbtCloudRunJobTrigger( - conn_id=self.CONN_ID, - poll_interval=self.POLL_INTERVAL, - end_time=self.END_TIME, - run_id=self.RUN_ID, - account_id=self.ACCOUNT_ID, - ) - task = [i async for i in trigger.run()] - response = TriggerEvent( - { - "status": "error", - "message": "Test exception", - "run_id": self.RUN_ID, - } - ) - assert len(task) == 1 - assert response in task + events = [e async for e in trigger.run()] + assert len(events) == 1 + assert TriggerEvent(expected_result) == events[0] @pytest.mark.asyncio - @mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running") @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") - async def test_dbt_job_run_timeout(self, mock_get_job_status, mocked_is_still_running): - """Assert that run timeout after end_time elapsed""" - mocked_is_still_running.return_value = True - mock_get_job_status.side_effect = Exception("Test exception") - end_time = time.time() - trigger = DbtCloudRunJobTrigger( - conn_id=self.CONN_ID, - poll_interval=self.POLL_INTERVAL, - end_time=end_time, - run_id=self.RUN_ID, - account_id=self.ACCOUNT_ID, - ) - generator = trigger.run() - actual = await generator.asend(None) - expected = TriggerEvent( - { - "status": "error", - "message": f"Job run {self.RUN_ID} has not reached a terminal status " - f"within the configured timeout.", - "run_id": self.RUN_ID, - } - ) - assert expected == actual - - @pytest.mark.asyncio - @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") - async def test_dbt_job_run_timeout_with_final_status_check(self, mock_get_job_status): - """Assert that a final status check prevents false timeout when job completes near timeout.""" - 
mock_get_job_status.return_value = DbtCloudJobRunStatus.SUCCESS.value - # Simulate: first is_still_running call returns True (job running), - # then the timeout check fires, but the final is_still_running call returns False - # (job just completed). The trigger should yield success, not a timeout error. - end_time = time.time() # Already expired - trigger = DbtCloudRunJobTrigger( - conn_id=self.CONN_ID, - poll_interval=self.POLL_INTERVAL, - end_time=end_time, - run_id=self.RUN_ID, - account_id=self.ACCOUNT_ID, - ) - with mock.patch.object(trigger, "is_still_running", new_callable=AsyncMock) as mock_running: - # First call: still running; second call (final check): no longer running - mock_running.side_effect = [True, False] - generator = trigger.run() - actual = await generator.asend(None) - expected = TriggerEvent( - { - "status": "success", - "message": f"Job run {self.RUN_ID} has completed successfully.", - "run_id": self.RUN_ID, - } - ) - assert expected == actual + async def test_dbt_job_run_execution_timeout(self, mock_get_job_status, end_time): + """Assert that run emits timeout event after execution_deadline elapsed.""" - @pytest.mark.asyncio - @mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running") - async def test_dbt_job_run_execution_timeout(self, mocked_is_still_running): - """Assert that run emits timeout event after execution_deadline elapsed""" - mocked_is_still_running.return_value = True + mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value - execution_deadline = time.time() + execution_deadline = time.monotonic() - 1 trigger = DbtCloudRunJobTrigger( conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, - end_time=time.time() + 60, + end_time=end_time, execution_deadline=execution_deadline, run_id=self.RUN_ID, account_id=self.ACCOUNT_ID, ) - generator = trigger.run() - actual = await generator.asend(None) - - expected = TriggerEvent( - { - "status": "timeout", - "message": f"Job run 
{self.RUN_ID} has timed out.", - "run_id": self.RUN_ID, - } - ) + expected_result = { + "status": "timeout", + "message": f"Job run {self.RUN_ID} has timed out.", + "run_id": self.RUN_ID, + } - assert expected == actual + events = [e async for e in trigger.run()] + assert len(events) == 1 + assert TriggerEvent(expected_result) == events[0] @pytest.mark.asyncio @pytest.mark.parametrize( ("mock_response", "expected_status"), [ (DbtCloudJobRunStatus.SUCCESS.value, False), - ], - ) - @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") - async def test_dbt_job_run_is_still_running_success( - self, mock_get_job_status, mock_response, expected_status - ): - """Test is_still_running with mocked response job status and assert - the return response with expected value""" - hook = AsyncMock(DbtCloudHook) - hook.get_job_status.return_value = mock_response - trigger = DbtCloudRunJobTrigger( - conn_id=self.CONN_ID, - poll_interval=self.POLL_INTERVAL, - end_time=self.END_TIME, - run_id=self.RUN_ID, - account_id=self.ACCOUNT_ID, - ) - response = await trigger.is_still_running(hook) - assert response == expected_status - - @pytest.mark.asyncio - @pytest.mark.parametrize( - ("mock_response", "expected_status"), - [ (DbtCloudJobRunStatus.RUNNING.value, True), ], ) @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") - async def test_dbt_job_run_is_still_running(self, mock_get_job_status, mock_response, expected_status): - """Test is_still_running with mocked response job status and assert - the return response with expected value""" - hook = AsyncMock(DbtCloudHook) - hook.get_job_status.return_value = mock_response - trigger = DbtCloudRunJobTrigger( - conn_id=self.CONN_ID, - poll_interval=self.POLL_INTERVAL, - end_time=self.END_TIME, - run_id=self.RUN_ID, - account_id=self.ACCOUNT_ID, - ) - response = await trigger.is_still_running(hook) - assert response == expected_status - - @pytest.mark.asyncio - @pytest.mark.parametrize( - 
("mock_response", "expected_status"), - [ - (DbtCloudJobRunStatus.QUEUED.value, True), - ], - ) - @mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status") - async def test_dbt_job_run_is_still_running_queued( - self, mock_get_job_status, mock_response, expected_status + async def test_dbt_job_run_is_still_running( + self, mock_get_job_status, mock_response, expected_status, end_time ): """Test is_still_running with mocked response job status and assert - the return response with expected value""" + the return response with expected value.""" hook = AsyncMock(DbtCloudHook) hook.get_job_status.return_value = mock_response trigger = DbtCloudRunJobTrigger( conn_id=self.CONN_ID, poll_interval=self.POLL_INTERVAL, - end_time=self.END_TIME, + end_time=end_time, run_id=self.RUN_ID, account_id=self.ACCOUNT_ID, ) diff --git a/scripts/ci/prek/known_airflow_exceptions.txt b/scripts/ci/prek/known_airflow_exceptions.txt index 6175e6ad00c00..a1b72c804c195 100644 --- a/scripts/ci/prek/known_airflow_exceptions.txt +++ b/scripts/ci/prek/known_airflow_exceptions.txt @@ -190,7 +190,6 @@ providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py::1 providers/datadog/src/airflow/providers/datadog/hooks/datadog.py::2 providers/datadog/src/airflow/providers/datadog/sensors/datadog.py::1 providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py::3 -providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py::1 providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py::1 providers/dingding/src/airflow/providers/dingding/hooks/dingding.py::2 providers/discord/src/airflow/providers/discord/operators/discord_webhook.py::1