From 972ff6de4fe7597528c881e75f7d6d933fcb2c4c Mon Sep 17 00:00:00 2001 From: "Ryuk.Kim" Date: Fri, 1 May 2026 22:57:27 +0900 Subject: [PATCH 1/5] Add tasks state command to airflowctl Introduce `airflowctl tasks state` to retrieve the state of a task instance by calling GET /api/v2/dags/{dag_id}/dagRuns/{run_id}/ taskInstances/{task_id}. Also add TaskInstancesOperations, help text, and an integration test for the auto-generated taskinstances get command. # Conflicts: # airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py --- .../test_airflowctl_commands.py | 2 + airflow-ctl/docs/images/command_hashes.txt | 2 +- airflow-ctl/docs/images/output_main.svg | 142 ++++++++++-------- airflow-ctl/src/airflowctl/api/client.py | 7 + airflow-ctl/src/airflowctl/api/operations.py | 13 ++ airflow-ctl/src/airflowctl/ctl/cli_config.py | 42 ++++++ .../airflowctl/ctl/commands/task_command.py | 32 ++++ .../src/airflowctl/ctl/help_texts.yaml | 3 + 8 files changed, 177 insertions(+), 66 deletions(-) create mode 100644 airflow-ctl/src/airflowctl/ctl/commands/task_command.py diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index 15595269d26c7..e92cb450e5234 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -93,6 +93,8 @@ def date_param(): "dags update --dag-id=example_bash_operator --no-is-paused", # Dag Run commands "dagrun list --dag-id example_bash_operator --state success --limit=1", + # Task instance commands - need a Dag run with completed tasks + 'taskinstances get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0', # XCom commands - need a Dag run with completed tasks 'xcom add --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key} --value=\'{{"test": "value"}}\'', 'xcom get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0 --key={xcom_key}', diff --git a/airflow-ctl/docs/images/command_hashes.txt b/airflow-ctl/docs/images/command_hashes.txt index 8e590f3b820cd..3c8b7523e7e08 100644 --- a/airflow-ctl/docs/images/command_hashes.txt +++ b/airflow-ctl/docs/images/command_hashes.txt @@ -1,4 +1,4 @@ -main:27a22c00dcf32e7a1a4f06672dc8e3c8 +main:0460d9c03248bee26207b20b05aa36b9 assets:70619a2d92bda80930cde2aefcd8e1cd auth:d79e9c7d00c432bdbcbc2a86e2e32053 backfill:74c8737b0a62a86ed3605fa9e6165874 diff --git a/airflow-ctl/docs/images/output_main.svg b/airflow-ctl/docs/images/output_main.svg index f586877bce8eb..9c8cec5269b68 100644 --- a/airflow-ctl/docs/images/output_main.svg +++ b/airflow-ctl/docs/images/output_main.svg @@ -1,4 +1,4 @@ - + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + + + + - + - + - - Usage:airflowctl [-hGROUP_OR_COMMAND... - -Positional Arguments: -GROUP_OR_COMMAND - -    Groups -assetsPerform Assets operations -authManage authentication for CLI. Either pass token from -environment variable/parameter or pass username and -password. -backfillPerform Backfill operations -configPerform Config operations -connectionsPerform Connections operations -dagrunPerform DagRun operations -dagsPerform Dags operations -jobsPerform Jobs operations -pluginsPerform Plugins operations -poolsPerform Pools operations -providersPerform Providers operations -variablesPerform Variables operations -xcomPerform XCom operations - -    Commands: -versionShow version information - -Options: --h--helpshow this help message and exit + + Usage:airflowctl [-hGROUP_OR_COMMAND... + +Positional Arguments: +GROUP_OR_COMMAND + +    Groups +assetsPerform Assets operations +authManage authentication for CLI. Either pass token from +environment variable/parameter or pass username and +password. +backfillPerform Backfill operations +configPerform Config operations +connectionsPerform Connections operations +dagrunPerform DagRun operations +dagsPerform Dags operations +jobsPerform Jobs operations +pluginsPerform Plugins operations +poolsPerform Pools operations +providersPerform Providers operations +taskinstances +Perform TaskInstances operations +tasksManage Airflow tasks +variablesPerform Variables operations +xcomPerform XCom operations + +    Commands: +versionShow version information + +Options: +-h--helpshow this help message and exit diff --git a/airflow-ctl/src/airflowctl/api/client.py b/airflow-ctl/src/airflowctl/api/client.py index b01200fac1c7f..cd957060cb1db 100644 --- a/airflow-ctl/src/airflowctl/api/client.py +++ b/airflow-ctl/src/airflowctl/api/client.py @@ -57,6 +57,7 @@ PoolsOperations, ProvidersOperations, ServerResponseError, + TaskInstancesOperations, VariablesOperations, VersionOperations, XComOperations, @@ -467,6 +468,12 @@ def xcom(self): """Operations related to XComs.""" return XComOperations(self) + @lru_cache() # type: ignore[prop-decorator] + @property + def task_instances(self): + """Operations related to task instances.""" + return TaskInstancesOperations(self) + @lru_cache() # type: ignore[prop-decorator] @property def plugins(self): diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index f52ba055c1c72..343b9dcc17567 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -68,6 +68,7 @@ ProviderCollectionResponse, QueuedEventCollectionResponse, QueuedEventResponse, + TaskInstanceResponse, TriggerDAGRunPostBody, VariableBody, VariableCollectionResponse, @@ -906,6 +907,18 @@ def delete( raise e +class TaskInstancesOperations(BaseOperations): + """Task instance operations.""" + + def get(self, dag_id: str, dag_run_id: str, task_id: str) -> TaskInstanceResponse | ServerResponseError: + """Get a task instance.""" + try: + self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") + return TaskInstanceResponse.model_validate_json(self.response.content) + except ServerResponseError as e: + raise e + + class PluginsOperations(BaseOperations): """Plugins operations.""" diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index aa96304f501fb..95ce6751936d3 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -268,6 +268,29 @@ def _load_help_texts_yaml() -> dict[str, dict[str, str]]: help="The Dag ID of the Dag to pause or unpause", ) +# Task Commands Args +ARG_TASK_DAG_ID = Arg( + flags=("--dag-id",), + type=str, + dest="dag_id", + required=True, + help="The Dag ID", +) +ARG_DAG_RUN_ID = Arg( + flags=("--dag-run-id",), + type=str, + dest="dag_run_id", + required=True, + help="The Dag Run ID", +) +ARG_TASK_ID = Arg( + flags=("--task-id",), + type=str, + dest="task_id", + required=True, + help="The Task ID", +) + ARG_ACTION_ON_EXISTING_KEY = Arg( flags=("-a", "--action-on-existing-key"), type=str, @@ -953,6 +976,20 @@ def merge_commands( ), ) +TASK_COMMANDS = ( + ActionCommand( + name="state", + help="Get the state of a task instance", + func=lazy_load_command("airflowctl.ctl.commands.task_command.task_state"), + args=( + ARG_TASK_DAG_ID, + ARG_DAG_RUN_ID, + ARG_TASK_ID, + ARG_OUTPUT, + ), + ), +) + core_commands: list[CLICommand] = [ GroupCommand( name="auth", @@ -995,6 +1032,11 @@ def merge_commands( help="Manage Airflow variables", subcommands=VARIABLE_COMMANDS, ), + GroupCommand( + name="tasks", + help="Manage Airflow tasks", + subcommands=TASK_COMMANDS, + ), ] # Add generated group commands core_commands = merge_commands( diff --git a/airflow-ctl/src/airflowctl/ctl/commands/task_command.py b/airflow-ctl/src/airflowctl/ctl/commands/task_command.py new file mode 100644 index 0000000000000..7e54363017c63 --- /dev/null +++ b/airflow-ctl/src/airflowctl/ctl/commands/task_command.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from airflowctl.api.client import NEW_API_CLIENT, ClientKind, provide_api_client +from airflowctl.ctl.console_formatting import AirflowConsole + + +@provide_api_client(kind=ClientKind.CLI) +def task_state(args, api_client=NEW_API_CLIENT) -> None: + """Get the state of a task instance.""" + ti = api_client.task_instances.get( + dag_id=args.dag_id, + dag_run_id=args.dag_run_id, + task_id=args.task_id, + ) + AirflowConsole().print_as(data=[{"state": ti.state}], output=args.output) diff --git a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml index eb566a96b1fb8..b60082bfacfa3 100644 --- a/airflow-ctl/src/airflowctl/ctl/help_texts.yaml +++ b/airflow-ctl/src/airflowctl/ctl/help_texts.yaml @@ -100,3 +100,6 @@ xcom: plugins: list: "List all installed Airflow plugins" list-import-errors: "List all plugin import errors" + +taskinstances: + get: "Retrieve a task instance by Dag ID, run ID, and task ID" From 2440e4bafba1de3579433548cf9a6dc59cc56bc1 Mon Sep 17 00:00:00 2001 From: "Ryuk.Kim" Date: Sat, 2 May 2026 12:40:03 +0900 Subject: [PATCH 2/5] Add unit tests for task_state command and TaskInstancesOperations --- .../ctl/commands/test_task_command.py | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py new file mode 100644 index 0000000000000..497554a0c7853 --- /dev/null +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import datetime +import uuid + +import pytest + +from airflowctl.api.client import ClientKind +from airflowctl.api.datamodels.generated import TaskInstanceResponse, TaskInstanceState +from airflowctl.api.operations import ServerResponseError +from airflowctl.ctl import cli_parser +from airflowctl.ctl.commands import task_command + + +class TestTaskCommands: + parser = cli_parser.get_parser() + dag_id = "example_dag" + dag_run_id = "manual__2024-01-01T00:00:00+00:00" + task_id = "my_task" + + task_instance_response = TaskInstanceResponse( + id=uuid.uuid4(), + task_id=task_id, + dag_id=dag_id, + dag_run_id=dag_run_id, + map_index=-1, + run_after=datetime.datetime(2024, 1, 1, 0, 0, 0), + try_number=1, + max_tries=1, + task_display_name=task_id, + dag_display_name=dag_id, + pool="default_pool", + pool_slots=1, + executor_config="{}", + state=TaskInstanceState.SUCCESS, + ) + + def test_task_state(self, api_client_maker): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}", + response_json=self.task_instance_response.model_dump(mode="json"), + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + task_command.task_state( + self.parser.parse_args( + [ + "tasks", + "state", + f"--dag-id={self.dag_id}", + f"--dag-run-id={self.dag_run_id}", + f"--task-id={self.task_id}", + ] + ), + api_client=api_client, + ) + + def test_task_state_not_found(self, api_client_maker): + api_client = api_client_maker( + path=f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}", + response_json={"detail": "Task instance not found"}, + expected_http_status_code=404, + kind=ClientKind.CLI, + ) + with pytest.raises(ServerResponseError): + task_command.task_state( + self.parser.parse_args( + [ + "tasks", + "state", + f"--dag-id={self.dag_id}", + f"--dag-run-id={self.dag_run_id}", + f"--task-id={self.task_id}", + ] + ), + api_client=api_client, + ) From 9f9eb16848f0fa3507b39f72cb0e856973ee3fcd Mon Sep 17 00:00:00 2001 From: jasonzb Date: Wed, 6 May 2026 23:56:37 -0400 Subject: [PATCH 3/5] Support --map-index in airflowctl tasks state Allow `airflowctl tasks state` to query mapped task instances, matching the legacy `airflow tasks state --map-index` behaviour. When the flag is non-negative the mapped task instance endpoint is called; otherwise the existing unmapped endpoint is used. The `taskinstances get` auto-generated command also gains optional mapped-instance support via the same `map_index` parameter on `TaskInstancesOperations.get`. --- airflow-ctl/src/airflowctl/api/operations.py | 20 +++++++++++--- airflow-ctl/src/airflowctl/ctl/cli_config.py | 8 ++++++ .../airflowctl/ctl/commands/task_command.py | 1 + .../ctl/commands/test_task_command.py | 26 +++++++++++++++++++ docs/spelling_wordlist.txt | 2 ++ 5 files changed, 54 insertions(+), 3 deletions(-) diff --git a/airflow-ctl/src/airflowctl/api/operations.py b/airflow-ctl/src/airflowctl/api/operations.py index 343b9dcc17567..ca3774d89022f 100644 --- a/airflow-ctl/src/airflowctl/api/operations.py +++ b/airflow-ctl/src/airflowctl/api/operations.py @@ -910,10 +910,24 @@ def delete( class TaskInstancesOperations(BaseOperations): """Task instance operations.""" - def get(self, dag_id: str, dag_run_id: str, task_id: str) -> TaskInstanceResponse | ServerResponseError: - """Get a task instance.""" + def get( + self, + dag_id: str, + dag_run_id: str, + task_id: str, + map_index: int = None, # type: ignore + ) -> TaskInstanceResponse | ServerResponseError: + """ + Get a task instance. + + When ``map_index`` is non-negative, the mapped task instance endpoint is + called; otherwise the standard (unmapped) endpoint is used. + """ + path = f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}" + if map_index is not None and map_index >= 0: + path = f"{path}/{map_index}" try: - self.response = self.client.get(f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}") + self.response = self.client.get(path) return TaskInstanceResponse.model_validate_json(self.response.content) except ServerResponseError as e: raise e diff --git a/airflow-ctl/src/airflowctl/ctl/cli_config.py b/airflow-ctl/src/airflowctl/ctl/cli_config.py index 95ce6751936d3..1b10be6292743 100755 --- a/airflow-ctl/src/airflowctl/ctl/cli_config.py +++ b/airflow-ctl/src/airflowctl/ctl/cli_config.py @@ -290,6 +290,13 @@ def _load_help_texts_yaml() -> dict[str, dict[str, str]]: required=True, help="The Task ID", ) +ARG_MAP_INDEX = Arg( + flags=("--map-index",), + type=int, + dest="map_index", + default=-1, + help="If set, query the mapped task instance with this map index (negative means non-mapped)", +) ARG_ACTION_ON_EXISTING_KEY = Arg( flags=("-a", "--action-on-existing-key"), @@ -985,6 +992,7 @@ def merge_commands( ARG_TASK_DAG_ID, ARG_DAG_RUN_ID, ARG_TASK_ID, + ARG_MAP_INDEX, ARG_OUTPUT, ), ), diff --git a/airflow-ctl/src/airflowctl/ctl/commands/task_command.py b/airflow-ctl/src/airflowctl/ctl/commands/task_command.py index 7e54363017c63..b5dd1b56b9fac 100644 --- a/airflow-ctl/src/airflowctl/ctl/commands/task_command.py +++ b/airflow-ctl/src/airflowctl/ctl/commands/task_command.py @@ -28,5 +28,6 @@ def task_state(args, api_client=NEW_API_CLIENT) -> None: dag_id=args.dag_id, dag_run_id=args.dag_run_id, task_id=args.task_id, + map_index=args.map_index, ) AirflowConsole().print_as(data=[{"state": ti.state}], output=args.output) diff --git a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py index 497554a0c7853..6cccba8dcce30 100644 --- a/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py +++ b/airflow-ctl/tests/airflow_ctl/ctl/commands/test_task_command.py @@ -91,3 +91,29 @@ def test_task_state_not_found(self, api_client_maker): ), api_client=api_client, ) + + @pytest.mark.parametrize("map_index", [0, 1, 7]) + def test_task_state_mapped(self, api_client_maker, map_index): + mapped_response = self.task_instance_response.model_copy(update={"map_index": map_index}) + api_client = api_client_maker( + path=( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}" + f"/taskInstances/{self.task_id}/{map_index}" + ), + response_json=mapped_response.model_dump(mode="json"), + expected_http_status_code=200, + kind=ClientKind.CLI, + ) + task_command.task_state( + self.parser.parse_args( + [ + "tasks", + "state", + f"--dag-id={self.dag_id}", + f"--dag-run-id={self.dag_run_id}", + f"--task-id={self.task_id}", + f"--map-index={map_index}", + ] + ), + api_client=api_client, + ) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index b4460b21e7563..ce262d8a2f736 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1627,6 +1627,8 @@ taskInstance taskinstance TaskInstanceKey taskinstancekey +TaskInstances +taskinstances taskmeta tasksDuration tasksetmeta From af5fb7fe83f46555f9ba6c2dc3e9f3b954433fa3 Mon Sep 17 00:00:00 2001 From: Jason Bian Date: Fri, 19 Jun 2026 17:50:27 -0400 Subject: [PATCH 4/5] Fix airflowctl integration tests for tasks state and taskinstances get The integration test invoked `taskinstances get` with `--dag-id`/`--dag-run-id`/ `--task-id` flags, but that auto-generated command takes positional arguments, so argparse exited with code 2 and the test failed. The flag syntax belongs to the new `tasks state` command, which had no integration coverage at all. Point the entry at `tasks state` (flags) to cover the new command, and add a correctly-formed `taskinstances get` entry using positional args so both commands are exercised. --- .../tests/airflowctl_tests/test_airflowctl_commands.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py index 2965bf1dbb03e..8e7486005d50e 100644 --- a/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py +++ b/airflow-ctl-tests/tests/airflowctl_tests/test_airflowctl_commands.py @@ -93,8 +93,10 @@ def date_param(): "dags update example_bash_operator --no-is-paused", # Dag Run commands "dagrun list --dag-id example_bash_operator --state success --limit=1", - # Task instance commands - need a Dag run with completed tasks - 'taskinstances get --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0', + # Tasks state command (manual command, uses flags) - needs a Dag run with completed tasks + 'tasks state --dag-id=example_bash_operator --dag-run-id="manual__{date_param}" --task-id=runme_0', + # Task instance get (auto-generated command, uses positional args) - needs a Dag run with completed tasks + 'taskinstances get example_bash_operator "manual__{date_param}" runme_0', # XCom commands - need a Dag run with completed tasks 'xcom add example_bash_operator "manual__{date_param}" runme_0 {xcom_key} \'{{"test": "value"}}\'', 'xcom get example_bash_operator "manual__{date_param}" runme_0 {xcom_key}', From 66f21f7400fb6324a6ac74fecce855d9141214f1 Mon Sep 17 00:00:00 2001 From: Jason Bian Date: Sat, 20 Jun 2026 17:40:45 -0400 Subject: [PATCH 5/5] Add TaskInstancesOperations.get tests for airflowctl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cover the underlying task instances get operation directly — mapped and unmapped endpoints plus 404 handling — not just the tasks state command. --- .../tests/airflow_ctl/api/test_operations.py | 100 +++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/airflow-ctl/tests/airflow_ctl/api/test_operations.py b/airflow-ctl/tests/airflow_ctl/api/test_operations.py index 52faecee73ea0..7058829034e45 100644 --- a/airflow-ctl/tests/airflow_ctl/api/test_operations.py +++ b/airflow-ctl/tests/airflow_ctl/api/test_operations.py @@ -91,6 +91,8 @@ QueuedEventCollectionResponse, QueuedEventResponse, ReprocessBehavior, + TaskInstanceResponse, + TaskInstanceState, TriggerDAGRunPostBody, VariableBody, VariableCollectionResponse, @@ -100,7 +102,7 @@ XComResponse, XComResponseNative, ) -from airflowctl.api.operations import BaseOperations +from airflowctl.api.operations import BaseOperations, ServerResponseError from airflowctl.exceptions import AirflowCtlConnectionException if TYPE_CHECKING: @@ -1907,6 +1909,102 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert response == self.key +class TestTaskInstancesOperations: + """Test suite for task instance operations.""" + + dag_id: str = "test_dag" + dag_run_id: str = "manual__2025-01-24T00:00:00+00:00" + task_id: str = "test_task" + + task_instance_response = TaskInstanceResponse( + id=uuid.uuid4(), + task_id=task_id, + dag_id=dag_id, + dag_run_id=dag_run_id, + map_index=-1, + run_after=datetime.datetime(2025, 1, 24, 0, 0, 0), + try_number=1, + max_tries=1, + task_display_name=task_id, + dag_display_name=dag_id, + pool="default_pool", + pool_slots=1, + executor_config="{}", + state=TaskInstanceState.SUCCESS, + ) + + def test_get(self): + """Test fetching an unmapped task instance hits the standard endpoint.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == ( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" + ) + return httpx.Response(200, json=json.loads(self.task_instance_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.task_instances.get( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + ) + assert response == self.task_instance_response + + @pytest.mark.parametrize("map_index", [-1, None]) + def test_get_without_map_index_uses_unmapped_endpoint(self, map_index): + """A negative or omitted ``map_index`` must not append a map index to the path.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == ( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/taskInstances/{self.task_id}" + ) + return httpx.Response(200, json=json.loads(self.task_instance_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.task_instances.get( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + map_index=map_index, + ) + assert response == self.task_instance_response + + @pytest.mark.parametrize("map_index", [0, 1, 7]) + def test_get_with_map_index_uses_mapped_endpoint(self, map_index): + """A non-negative ``map_index`` must hit the mapped task instance endpoint.""" + mapped_response = self.task_instance_response.model_copy(update={"map_index": map_index}) + + def handle_request(request: httpx.Request) -> httpx.Response: + assert request.url.path == ( + f"/api/v2/dags/{self.dag_id}/dagRuns/{self.dag_run_id}/" + f"taskInstances/{self.task_id}/{map_index}" + ) + return httpx.Response(200, json=json.loads(mapped_response.model_dump_json())) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + response = client.task_instances.get( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + map_index=map_index, + ) + assert response == mapped_response + + def test_get_not_found_raises(self): + """A 404 from the server must surface as a ServerResponseError.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + return httpx.Response(404, json={"detail": "Task instance not found"}) + + client = make_api_client(transport=httpx.MockTransport(handle_request)) + with pytest.raises(ServerResponseError): + client.task_instances.get( + dag_id=self.dag_id, + dag_run_id=self.dag_run_id, + task_id=self.task_id, + ) + + class TestPluginsOperations: plugin_response = PluginResponse( name="test-plugin",