diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 7720797b40808..9e18a2d837d6e 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -2195,6 +2195,13 @@ paths: schema: type: integer title: Task Try Number + - name: map_index + in: query + required: false + schema: + type: integer + default: -1 + title: Map Index responses: '200': description: Successful Response diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index d3393b66a928f..dbc9e9d7b69e9 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -18,6 +18,7 @@ from __future__ import annotations from fastapi import Depends, HTTPException, status +from sqlalchemy.exc import MultipleResultsFound from sqlalchemy.orm import Session, joinedload from sqlalchemy.sql import select from typing_extensions import Annotated @@ -27,6 +28,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.serializers.task_instances import TaskInstanceResponse from airflow.models.taskinstance import TaskInstance as TI +from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH task_instances_router = AirflowRouter( tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" @@ -104,27 +106,31 @@ async def get_task_instance_try_details( task_id: str, task_try_number: int, session: Annotated[Session, Depends(get_session)], + map_index: int = -1, ) -> TaskInstanceResponse: """Get task instance details by try number.""" - query = ( - select(TI) - .where( + + def _query(TI): + query = select(TI).where( TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id, TI.try_number == task_try_number, + TI.map_index == map_index, ) - .join(TI.dag_run) - .options(joinedload(TI.rendered_task_instance_fields)) - ) - task_instance = session.scalar(query) - - if task_instance is None: + try: + task_instance = session.scalar(query) + except MultipleResultsFound: + raise HTTPException( + 404, + f"Multiple Task Instances with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and try_number: `{task_try_number}` were found", + ) + return task_instance + + result = _query(TI) or _query(TIH) + if result is None: raise HTTPException( 404, - f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and try_number: `{task_try_number}` was not found", + f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and map_index: `{map_index}` was not found", ) - if task_instance.map_index != -1: - raise HTTPException(404, "Task instance is mapped, add the map_index value to the URL") - - return TaskInstanceResponse.model_validate(task_instance, from_attributes=True) + return TaskInstanceResponse.model_validate(result, from_attributes=True) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 5ff90a246d2be..9010b8de33370 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -640,18 +640,20 @@ export const UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn = ( { dagId, dagRunId, + mapIndex, taskId, taskTryNumber, }: { dagId: string; dagRunId: string; + mapIndex?: number; taskId: string; taskTryNumber: number; }, queryKey?: Array, ) => [ useTaskInstanceServiceGetTaskInstanceTryDetailsKey, - ...(queryKey ?? [{ dagId, dagRunId, taskId, taskTryNumber }]), + ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]), ]; export type VariableServiceGetVariableDefaultResponse = Awaited< ReturnType diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index e18dda4d71124..8b8abe1cabf3f 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -822,6 +822,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = ( * @param data.dagRunId * @param data.taskId * @param data.taskTryNumber + * @param data.mapIndex * @returns TaskInstanceResponse Successful Response * @throws ApiError */ @@ -830,11 +831,13 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = ( { dagId, dagRunId, + mapIndex, taskId, taskTryNumber, }: { dagId: string; dagRunId: string; + mapIndex?: number; taskId: string; taskTryNumber: number; }, @@ -843,6 +846,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = ( queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn({ dagId, dagRunId, + mapIndex, taskId, taskTryNumber, }), @@ -850,6 +854,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = ( TaskInstanceService.getTaskInstanceTryDetails({ dagId, dagRunId, + mapIndex, taskId, taskTryNumber, }), diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 51d30b7ced6a0..04677b7641d7f 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1016,6 +1016,7 @@ export const useTaskInstanceServiceGetMappedTaskInstance = < * @param data.dagRunId * @param data.taskId * @param data.taskTryNumber + * @param data.mapIndex * @returns TaskInstanceResponse Successful Response * @throws ApiError */ @@ -1027,11 +1028,13 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetails = < { dagId, dagRunId, + mapIndex, taskId, taskTryNumber, }: { dagId: string; dagRunId: string; + mapIndex?: number; taskId: string; taskTryNumber: number; }, @@ -1040,13 +1043,14 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetails = < ) => useQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn( - { dagId, dagRunId, taskId, taskTryNumber }, + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, queryKey, ), queryFn: () => TaskInstanceService.getTaskInstanceTryDetails({ dagId, dagRunId, + mapIndex, taskId, taskTryNumber, }) as TData, diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 9a91067ea1051..9ecfd76fa7b56 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1002,6 +1002,7 @@ export const useTaskInstanceServiceGetMappedTaskInstanceSuspense = < * @param data.dagRunId * @param data.taskId * @param data.taskTryNumber + * @param data.mapIndex * @returns TaskInstanceResponse Successful Response * @throws ApiError */ @@ -1013,11 +1014,13 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetailsSuspense = < { dagId, dagRunId, + mapIndex, taskId, taskTryNumber, }: { dagId: string; dagRunId: string; + mapIndex?: number; taskId: string; taskTryNumber: number; }, @@ -1026,13 +1029,14 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetailsSuspense = < ) => useSuspenseQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn( - { dagId, dagRunId, taskId, taskTryNumber }, + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, queryKey, ), queryFn: () => TaskInstanceService.getTaskInstanceTryDetails({ dagId, dagRunId, + mapIndex, taskId, taskTryNumber, }) as TData, diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index e7cc3e07dde3f..873eae2ff3b5e 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -1242,6 +1242,7 @@ export class TaskInstanceService { * @param data.dagRunId * @param data.taskId * @param data.taskTryNumber + * @param data.mapIndex * @returns TaskInstanceResponse Successful Response * @throws ApiError */ @@ -1257,6 +1258,9 @@ export class TaskInstanceService { task_id: data.taskId, task_try_number: data.taskTryNumber, }, + query: { + map_index: data.mapIndex, + }, errors: { 401: "Unauthorized", 403: "Forbidden", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 190707da0a437..a31f88b93475a 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -990,6 +990,7 @@ export type GetMappedTaskInstanceResponse = TaskInstanceResponse; export type GetTaskInstanceTryDetailsData = { dagId: string; dagRunId: string; + mapIndex?: number; taskId: string; taskTryNumber: number; }; diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 8965fb4bcacb1..2cd2076c50063 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -455,7 +455,7 @@ def test_should_respond_404_wrong_map_index(self, test_client, session): class TestGetTaskInstanceTryDetails(TestTaskInstanceEndpoint): def test_should_respond_200_get_task_instance_try_details(self, test_client, session): - self.create_task_instances(session, task_instances=[{"state": State.RUNNING, "try_number": 1}]) + self.create_task_instances(session, task_instances=[{"try_number": 1}]) response = test_client.get( "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1" ) @@ -501,13 +501,5 @@ def test_should_respond_404_wrong_task_instance_try_details(self, test_client, s assert response.status_code == 404 assert response.json() == { - "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context` and try_number: `1` was not found" + "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context`, try_number: `1` and map_index: `-1` was not found" } - - def test_should_respond_404_mapped_task_instance_try_details(self, test_client, session): - self.create_task_instances(session, task_instances=[{"map_index": 0}]) - response = test_client.get( - "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/0" - ) - assert response.status_code == 404 - assert response.json() == {"detail": "Task instance is mapped, add the map_index value to the URL"}