diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 00eb51bae10b2..f4f7ac23cb859 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -756,6 +756,7 @@ def _query(orm_object): return task_instance_history_schema.dump(result[0]) +@mark_fastapi_migration_done @provide_session def get_mapped_task_instance_try_details( *, diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index c1aadeb6b6dac..c58bf1c6be433 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3961,6 +3961,74 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}: + get: + tags: + - Task Instance + summary: Get Mapped Task Instance Try Details + operationId: get_mapped_task_instance_try_details + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: task_try_number + in: path + required: true + schema: + type: integer + title: Task Try Number + - name: map_index + in: path + required: true + schema: + type: integer + title: Map Index + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskInstanceHistoryResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/tasks/: get: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index 9e4ea49e08890..f4769a981b882 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -460,3 +460,25 @@ def _query(orm_object: Base) -> TI | TIH | None: f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and map_index: `{map_index}` was not found", ) return TaskInstanceHistoryResponse.model_validate(result, from_attributes=True) + + +@task_instances_router.get( + "/{task_id}/{map_index}/tries/{task_try_number}", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +) +def get_mapped_task_instance_try_details( + dag_id: str, + dag_run_id: str, + task_id: str, + task_try_number: int, + session: Annotated[Session, Depends(get_session)], + map_index: int, +) -> TaskInstanceHistoryResponse: + return get_task_instance_try_details( + dag_id=dag_id, + dag_run_id=dag_run_id, + task_id=task_id, + task_try_number=task_try_number, + map_index=map_index, + session=session, + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index ea4e65d44c91d..fe281c5640f9e 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1092,6 +1092,35 @@ export const UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn = ( useTaskInstanceServiceGetTaskInstanceTryDetailsKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]), ]; +export type TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse = + Awaited< + ReturnType + >; +export type TaskInstanceServiceGetMappedTaskInstanceTryDetailsQueryResult< + TData = TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey = + "TaskInstanceServiceGetMappedTaskInstanceTryDetails"; +export const UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn = ( + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; + }, + queryKey?: Array, +) => [ + useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey, + ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]), +]; export type TaskServiceGetTasksDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 273882c787dcb..f7872fcc7f8cc 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1471,6 +1471,46 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = ( taskTryNumber, }), }); +/** + * Get Mapped Task Instance Try Details + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ +export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTryDetails = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn( + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, + ), + queryFn: () => + TaskInstanceService.getMappedTaskInstanceTryDetails({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }), + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 386575204aa0b..74e25c0258a25 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1762,6 +1762,53 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetails = < }) as TData, ...options, }); +/** + * Get Mapped Task Instance Try Details + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetMappedTaskInstanceTryDetails = < + TData = Common.TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn( + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getMappedTaskInstanceTryDetails({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 3d57a08a69e64..87b1a7aa6a2ba 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1744,6 +1744,53 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetailsSuspense = < }) as TData, ...options, }); +/** + * Get Mapped Task Instance Try Details + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsSuspense = < + TData = Common.TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn( + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getMappedTaskInstanceTryDetails({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 7c0c348c753b8..908a715ab6933 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -125,6 +125,8 @@ import type { GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, + GetMappedTaskInstanceTryDetailsData, + GetMappedTaskInstanceTryDetailsResponse, GetTasksData, GetTasksResponse, GetTaskData, @@ -2095,6 +2097,39 @@ export class TaskInstanceService { }, }); } + + /** + * Get Mapped Task Instance Try Details + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ + public static getMappedTaskInstanceTryDetails( + data: GetMappedTaskInstanceTryDetailsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + task_try_number: data.taskTryNumber, + map_index: data.mapIndex, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class TaskService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index e78e3a186855c..4a9d46d3c214a 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1595,6 +1595,17 @@ export type GetTaskInstanceTryDetailsData = { export type GetTaskInstanceTryDetailsResponse = TaskInstanceHistoryResponse; +export type GetMappedTaskInstanceTryDetailsData = { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; +}; + +export type GetMappedTaskInstanceTryDetailsResponse = + TaskInstanceHistoryResponse; + export type GetTasksData = { dagId: string; orderBy?: string; @@ -3289,6 +3300,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}": { + get: { + req: GetMappedTaskInstanceTryDetailsData; + res: { + /** + * Successful Response + */ + 200: TaskInstanceHistoryResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/{dag_id}/tasks/": { get: { req: GetTasksData; diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 56e2ee5e0e178..f8e75600171b3 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1497,6 +1497,68 @@ def test_should_respond_200_with_different_try_numbers(self, test_client, try_nu "dag_run_id": "TEST_DAG_RUN_ID", } + @pytest.mark.parametrize("try_number", [1, 2]) + def test_should_respond_200_with_mapped_task_at_different_try_numbers( + self, test_client, try_number, session + ): + tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) + old_ti = tis[0] + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + ti.try_number = 1 + for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "note"]: + setattr(ti, attr, getattr(old_ti, attr)) + session.add(ti) + session.commit() + tis = session.query(TaskInstance).all() + # Record the task instance history + from airflow.models.taskinstance import clear_task_instances + + clear_task_instances(tis, session) + # Simulate the try_number increasing to new values in TI + for ti in tis: + if ti.map_index > 0: + ti.try_number += 1 + ti.queue = "default_queue" + session.merge(ti) + session.commit() + tis = session.query(TaskInstance).all() + # in each loop, we should get the right mapped TI back + for map_index in (1, 2): + # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) + # TODO: Add "REMOTE_USER": "test" as per legacy code after adding Authentication + response = test_client.get( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" + f"/print_the_context/{map_index}/tries/{try_number}", + ) + assert response.status_code == 200 + + assert response.json() == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": map_index, + "max_tries": 0 if try_number == 1 else 1, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "failed" if try_number == 1 else None, + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": try_number, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + def test_should_respond_200_with_task_state_in_deferred(self, test_client, session): now = pendulum.now("UTC") ti = self.create_task_instances(