Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,9 @@ def ti_run(
"/{task_instance_id}/state",
status_code=status.HTTP_204_NO_CONTENT,
responses={
status.HTTP_200_OK: {"description": "The TI was already in the requested state"},
Comment thread
sidshas03 marked this conversation as resolved.
Comment thread
sidshas03 marked this conversation as resolved.
status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
status.HTTP_409_CONFLICT: {"description": "The TI is already in the requested state"},
status.HTTP_409_CONFLICT: {"description": "The TI is not in a valid state for this transition"},
HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for the state transition"},
},
)
Expand Down Expand Up @@ -389,6 +390,18 @@ def ti_update_state(
},
)

# TIStateUpdate can include terminal and intermediate states. This idempotency check handles
# duplicate updates when the requested state is already persisted (for example SUCCESS ->
# SUCCESS or DEFERRED -> DEFERRED), including duplicates that would not pass the RUNNING
# transition check below.
if ti_patch_payload.state.value == previous_state:
log.info(
"Duplicate state update request received; state already set",
requested_state=ti_patch_payload.state.value,
previous_state=previous_state,
)
return Response(status_code=status.HTTP_200_OK)

if previous_state != TaskInstanceState.RUNNING:
log.warning(
"Cannot update Task Instance in invalid state",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1826,6 +1826,62 @@ def test_ti_update_state_not_running(self, client, session, create_task_instance
session.refresh(ti)
assert ti.state == State.SUCCESS

Comment thread
sidshas03 marked this conversation as resolved.
def test_ti_update_state_same_state_is_idempotent(self, client, session, create_task_instance):
"""Test that setting a TI to its current state is treated as an idempotent no-op."""
ti = create_task_instance(
task_id="test_ti_update_state_same_state_is_idempotent",
state=State.SUCCESS,
session=session,
start_date=DEFAULT_START_DATE,
)
ti.end_date = DEFAULT_END_DATE
session.commit()

response = client.patch(
f"/execution/task-instances/{ti.id}/state",
json={
"state": "success",
"end_date": timezone.parse("2024-10-31T13:00:00Z").isoformat(),
},
)
assert response.status_code == 200
assert response.content == b""

session.refresh(ti)
assert ti.state == State.SUCCESS
assert ti.end_date == DEFAULT_END_DATE

def test_ti_update_state_terminal_state_mismatch_returns_conflict(
self, client, session, create_task_instance
):
"""A completed TI cannot be updated to a different state."""
ti = create_task_instance(
task_id="test_ti_update_state_terminal_state_mismatch_returns_conflict",
state=State.SUCCESS,
session=session,
start_date=DEFAULT_START_DATE,
)
ti.end_date = DEFAULT_END_DATE
session.commit()

response = client.patch(
f"/execution/task-instances/{ti.id}/state",
json={
"state": "failed",
"end_date": timezone.parse("2024-10-31T13:00:00Z").isoformat(),
},
)
assert response.status_code == 409
assert response.json()["detail"] == {
"reason": "invalid_state",
"message": "TI was not in the running state so it cannot be updated",
"previous_state": State.SUCCESS,
}

session.refresh(ti)
assert ti.state == State.SUCCESS
assert ti.end_date == DEFAULT_END_DATE

def test_ti_update_state_to_failed_without_fail_fast(self, client, session, dag_maker):
"""Test that SerializedDAG is NOT loaded when fail_fast=False (default)."""
with dag_maker(dag_id="test_dag_no_fail_fast", serialized=True):
Expand Down
Loading