diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 0fccd6ad1b61c..7740b6c72f45d 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -321,8 +321,9 @@ def ti_run( "/{task_instance_id}/state", status_code=status.HTTP_204_NO_CONTENT, responses={ + status.HTTP_200_OK: {"description": "The TI was already in the requested state"}, status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"}, - status.HTTP_409_CONFLICT: {"description": "The TI is already in the requested state"}, + status.HTTP_409_CONFLICT: {"description": "The TI is not in a valid state for this transition"}, HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for the state transition"}, }, ) @@ -389,6 +390,18 @@ def ti_update_state( }, ) + # TIStateUpdate can include terminal and intermediate states. This idempotency check handles + # duplicate updates when the requested state is already persisted (for example SUCCESS -> + # SUCCESS or DEFERRED -> DEFERRED), including duplicates that would not pass the RUNNING + # transition check below. + if ti_patch_payload.state.value == previous_state: + log.info( + "Duplicate state update request received; state already set", + requested_state=ti_patch_payload.state.value, + previous_state=previous_state, + ) + return Response(status_code=status.HTTP_200_OK) + if previous_state != TaskInstanceState.RUNNING: log.warning( "Cannot update Task Instance in invalid state", diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 94b3fed0e071b..62c221dd0e933 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1826,6 +1826,62 @@ def test_ti_update_state_not_running(self, client, session, create_task_instance session.refresh(ti) assert ti.state == State.SUCCESS + def test_ti_update_state_same_state_is_idempotent(self, client, session, create_task_instance): + """Test that setting a TI to its current state is treated as an idempotent no-op.""" + ti = create_task_instance( + task_id="test_ti_update_state_same_state_is_idempotent", + state=State.SUCCESS, + session=session, + start_date=DEFAULT_START_DATE, + ) + ti.end_date = DEFAULT_END_DATE + session.commit() + + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": "success", + "end_date": timezone.parse("2024-10-31T13:00:00Z").isoformat(), + }, + ) + assert response.status_code == 200 + assert response.content == b"" + + session.refresh(ti) + assert ti.state == State.SUCCESS + assert ti.end_date == DEFAULT_END_DATE + + def test_ti_update_state_terminal_state_mismatch_returns_conflict( + self, client, session, create_task_instance + ): + """A completed TI cannot be updated to a different state.""" + ti = create_task_instance( + task_id="test_ti_update_state_terminal_state_mismatch_returns_conflict", + state=State.SUCCESS, + session=session, + start_date=DEFAULT_START_DATE, + ) + ti.end_date = DEFAULT_END_DATE + session.commit() + + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": "failed", + "end_date": timezone.parse("2024-10-31T13:00:00Z").isoformat(), + }, + ) + assert response.status_code == 409 + assert response.json()["detail"] == { + "reason": "invalid_state", + "message": "TI was not in the running state so it cannot be updated", + "previous_state": State.SUCCESS, + } + + session.refresh(ti) + assert ti.state == State.SUCCESS + assert ti.end_date == DEFAULT_END_DATE + def test_ti_update_state_to_failed_without_fail_fast(self, client, session, dag_maker): """Test that SerializedDAG is NOT loaded when fail_fast=False (default).""" with dag_maker(dag_id="test_dag_no_fail_fast", serialized=True):