From 8bebf84f6194da98f420ab460c42a0515d07fc34 Mon Sep 17 00:00:00 2001 From: kandharvishnuu <148410552+kandharvishnuu@users.noreply.github.com> Date: Wed, 20 Nov 2024 11:40:32 +0530 Subject: [PATCH 1/4] adding get_mapped_task_instance_try_details --- .../endpoints/task_instance_endpoint.py | 1 + .../core_api/openapi/v1-generated.yaml | 68 ++++++++++++++++++ .../core_api/routes/public/task_instances.py | 22 ++++++ airflow/ui/openapi-gen/queries/common.ts | 29 ++++++++ airflow/ui/openapi-gen/queries/prefetch.ts | 40 +++++++++++ airflow/ui/openapi-gen/queries/queries.ts | 47 +++++++++++++ airflow/ui/openapi-gen/queries/suspense.ts | 47 +++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 35 ++++++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 38 ++++++++++ .../routes/public/test_task_instances.py | 69 +++++++++++++++++++ 10 files changed, 396 insertions(+) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 7f43c160f3273..b3e37fd412bea 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -766,6 +766,7 @@ def _query(orm_object): return task_instance_history_schema.dump(result[0]) +@mark_fastapi_migration_done @provide_session def get_mapped_task_instance_try_details( *, diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 11c701f953e9b..c41bb38c3d8cf 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3800,6 +3800,74 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}: + get: + tags: + - Task Instance + summary: Get Mapped Task Instance Try Details + operationId: get_mapped_task_instance_try_details + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: task_try_number + in: path + required: true + schema: + type: integer + title: Task Try Number + - name: map_index + in: path + required: true + schema: + type: integer + title: Map Index + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskInstanceHistoryResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/tasks/: get: tags: diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index 9e4ea49e08890..f4769a981b882 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -460,3 +460,25 @@ def _query(orm_object: Base) -> TI | TIH | None: f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and map_index: `{map_index}` was not found", ) return TaskInstanceHistoryResponse.model_validate(result, from_attributes=True) + + +@task_instances_router.get( + "/{task_id}/{map_index}/tries/{task_try_number}", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +) +def get_mapped_task_instance_try_details( + dag_id: str, + dag_run_id: str, + task_id: str, + task_try_number: int, + session: Annotated[Session, Depends(get_session)], + map_index: int, +) -> TaskInstanceHistoryResponse: + return get_task_instance_try_details( + dag_id=dag_id, + dag_run_id=dag_run_id, + task_id=task_id, + task_try_number=task_try_number, + map_index=map_index, + session=session, + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 3d52ef341f31e..10d63655c9687 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1050,6 +1050,35 @@ export const UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn = ( useTaskInstanceServiceGetTaskInstanceTryDetailsKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]), ]; +export type TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse = + Awaited< + ReturnType + >; +export type TaskInstanceServiceGetMappedTaskInstanceTryDetailsQueryResult< + TData = TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey = + "TaskInstanceServiceGetMappedTaskInstanceTryDetails"; +export const UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn = ( + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; + }, + queryKey?: Array, +) => [ + useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey, + ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]), +]; export type TaskServiceGetTasksDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index baf49a781b5c5..1e96e3ba5e835 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1419,6 +1419,46 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = ( taskTryNumber, }), }); +/** + * Get Mapped Task Instance Try Details + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ +export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTryDetails = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn( + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, + ), + queryFn: () => + TaskInstanceService.getMappedTaskInstanceTryDetails({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }), + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 6a228e5f520f5..c3e200f3a94e2 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1695,6 +1695,53 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetails = < }) as TData, ...options, }); +/** + * Get Mapped Task Instance Try Details + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetMappedTaskInstanceTryDetails = < + TData = Common.TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn( + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getMappedTaskInstanceTryDetails({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index d663543cfd9ca..ebacf98f44013 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1677,6 +1677,53 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetailsSuspense = < }) as TData, ...options, }); +/** + * Get Mapped Task Instance Try Details + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsSuspense = < + TData = Common.TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn( + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getMappedTaskInstanceTryDetails({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 72ee239974201..7362f3ed59458 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -121,6 +121,8 @@ import type { GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, + GetMappedTaskInstanceTryDetailsData, + GetMappedTaskInstanceTryDetailsResponse, GetTasksData, GetTasksResponse, GetTaskData, @@ -2027,6 +2029,39 @@ export class TaskInstanceService { }, }); } + + /** + * Get Mapped Task Instance Try Details + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ + public static getMappedTaskInstanceTryDetails( + data: GetMappedTaskInstanceTryDetailsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + task_try_number: data.taskTryNumber, + map_index: data.mapIndex, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class TaskService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 50fb44a057a25..64b65bf5e0dce 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1557,6 +1557,17 @@ export type GetTaskInstanceTryDetailsData = { export type GetTaskInstanceTryDetailsResponse = TaskInstanceHistoryResponse; +export type GetMappedTaskInstanceTryDetailsData = { + dagId: string; + dagRunId: string; + mapIndex: number; + taskId: string; + taskTryNumber: number; +}; + +export type GetMappedTaskInstanceTryDetailsResponse = + TaskInstanceHistoryResponse; + export type GetTasksData = { dagId: string; orderBy?: string; @@ -3189,6 +3200,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}": { + get: { + req: GetMappedTaskInstanceTryDetailsData; + res: { + /** + * Successful Response + */ + 200: TaskInstanceHistoryResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/{dag_id}/tasks/": { get: { req: GetTasksData; diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 56e2ee5e0e178..29ecdee609cc4 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1497,6 +1497,75 @@ def test_should_respond_200_with_different_try_numbers(self, test_client, try_nu "dag_run_id": "TEST_DAG_RUN_ID", } + @pytest.mark.parametrize("try_number", [1, 2]) + def test_should_respond_200_with_mapped_task_at_different_try_numbers( + self, test_client, try_number, session + ): + tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) + print(tis) + old_ti = tis[0] + print(old_ti) + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) + ti.try_number = 1 + for attr in ["duration", "end_date", "pid", "start_date", "state", "queue", "note"]: + setattr(ti, attr, getattr(old_ti, attr)) + session.add(ti) + session.commit() + tis = session.query(TaskInstance).all() + print(tis) + # Record the task instance history + from airflow.models.taskinstance import clear_task_instances + + clear_task_instances(tis, session) + # Simulate the try_number increasing to new values in TI + for ti in tis: + if ti.map_index > 0: + ti.try_number += 1 + ti.queue = "default_queue" + session.merge(ti) + session.commit() + tis = session.query(TaskInstance).all() + print(tis) + # in each loop, we should get the right mapped TI back + for map_index in (1, 2): + # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) + print( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" + f"/print_the_context/{map_index}/tries/{try_number}", + ) + response = test_client.get( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" + f"/print_the_context/{map_index}/tries/{try_number}", + ) + assert response.status_code == 200 + + assert response.json() == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": map_index, + "max_tries": 0 if try_number == 1 else 1, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "failed" if try_number == 1 else None, + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": try_number, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + def test_should_respond_200_with_task_state_in_deferred(self, test_client, session): now = pendulum.now("UTC") ti = self.create_task_instances( From 3410e6bdaa9d777fc441c6bf0dd0f1226ff5dddb Mon Sep 17 00:00:00 2001 From: kandharvishnuu <148410552+kandharvishnuu@users.noreply.github.com> Date: Wed, 20 Nov 2024 11:58:07 +0530 Subject: [PATCH 2/4] dummy change --- airflow/api_connexion/endpoints/task_instance_endpoint.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index b3e37fd412bea..7f43c160f3273 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -766,7 +766,6 @@ def _query(orm_object): return task_instance_history_schema.dump(result[0]) -@mark_fastapi_migration_done @provide_session def get_mapped_task_instance_try_details( *, From 09908c12435d1768187397e0a3e9b404367ab4fc Mon Sep 17 00:00:00 2001 From: kandharvishnuu <148410552+kandharvishnuu@users.noreply.github.com> Date: Wed, 20 Nov 2024 12:00:28 +0530 Subject: [PATCH 3/4] revert dummy change --- airflow/api_connexion/endpoints/task_instance_endpoint.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 7f43c160f3273..b3e37fd412bea 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -766,6 +766,7 @@ def _query(orm_object): return task_instance_history_schema.dump(result[0]) +@mark_fastapi_migration_done @provide_session def get_mapped_task_instance_try_details( *, From fe572999669eafc12b29a63327c29c5508c50d54 Mon Sep 17 00:00:00 2001 From: kandharvishnuu <148410552+kandharvishnuu@users.noreply.github.com> Date: Wed, 20 Nov 2024 21:16:37 +0530 Subject: [PATCH 4/4] removing print statements --- .../core_api/routes/public/test_task_instances.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 29ecdee609cc4..f8e75600171b3 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1502,9 +1502,7 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers( self, test_client, try_number, session ): tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) - print(tis) old_ti = tis[0] - print(old_ti) for idx in (1, 2): ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) ti.rendered_task_instance_fields = RTIF(ti, render_templates=False) @@ -1514,7 +1512,6 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers( session.add(ti) session.commit() tis = session.query(TaskInstance).all() - print(tis) # Record the task instance history from airflow.models.taskinstance import clear_task_instances @@ -1527,14 +1524,10 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers( session.merge(ti) session.commit() tis = session.query(TaskInstance).all() - print(tis) # in each loop, we should get the right mapped TI back for map_index in (1, 2): # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) - print( - "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" - f"/print_the_context/{map_index}/tries/{try_number}", - ) + # TODO: Add "REMOTE_USER": "test" as per legacy code after adding Authentication response = test_client.get( "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" f"/print_the_context/{map_index}/tries/{try_number}",