Skip to content

Commit

Permalink
change in get_task_instance_try_details
Browse files Browse the repository at this point in the history
  • Loading branch information
kandharvishnuu committed Nov 5, 2024
1 parent 3369cde commit 7a55eb9
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 27 deletions.
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,13 @@ paths:
schema:
type: integer
title: Task Try Number
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
responses:
'200':
description: Successful Response
Expand Down
34 changes: 20 additions & 14 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

from fastapi import Depends, HTTPException, status
from sqlalchemy.exc import MultipleResultsFound
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.sql import select
from typing_extensions import Annotated
Expand All @@ -27,6 +28,7 @@
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.task_instances import TaskInstanceResponse
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH

task_instances_router = AirflowRouter(
tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
Expand Down Expand Up @@ -104,27 +106,31 @@ async def get_task_instance_try_details(
task_id: str,
task_try_number: int,
session: Annotated[Session, Depends(get_session)],
map_index: int = -1,
) -> TaskInstanceResponse:
"""Get task instance details by try number."""
query = (
select(TI)
.where(

def _query(TI):
query = select(TI).where(
TI.dag_id == dag_id,
TI.run_id == dag_run_id,
TI.task_id == task_id,
TI.try_number == task_try_number,
TI.map_index == map_index,
)
.join(TI.dag_run)
.options(joinedload(TI.rendered_task_instance_fields))
)
task_instance = session.scalar(query)

if task_instance is None:
try:
task_instance = session.scalar(query)
except MultipleResultsFound:
raise HTTPException(
404,
f"Multiple Task Instances with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and try_number: `{task_try_number}` were found",
)
return task_instance

result = _query(TI) or _query(TIH)
if result is None:
raise HTTPException(
404,
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and try_number: `{task_try_number}` was not found",
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and map_index: `{map_index}` was not found",
)
if task_instance.map_index != -1:
raise HTTPException(404, "Task instance is mapped, add the map_index value to the URL")

return TaskInstanceResponse.model_validate(task_instance, from_attributes=True)
return TaskInstanceResponse.model_validate(result, from_attributes=True)
4 changes: 3 additions & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,18 +640,20 @@ export const UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn = (
{
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetTaskInstanceTryDetailsKey,
...(queryKey ?? [{ dagId, dagRunId, taskId, taskTryNumber }]),
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
];
export type VariableServiceGetVariableDefaultResponse = Awaited<
ReturnType<typeof VariableService.getVariable>
Expand Down
5 changes: 5 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = (
* @param data.dagRunId
* @param data.taskId
* @param data.taskTryNumber
* @param data.mapIndex
* @returns TaskInstanceResponse Successful Response
* @throws ApiError
*/
Expand All @@ -830,11 +831,13 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = (
{
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
},
Expand All @@ -843,13 +846,15 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = (
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn({
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}),
queryFn: () =>
TaskInstanceService.getTaskInstanceTryDetails({
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}),
Expand Down
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,7 @@ export const useTaskInstanceServiceGetMappedTaskInstance = <
* @param data.dagRunId
* @param data.taskId
* @param data.taskTryNumber
* @param data.mapIndex
* @returns TaskInstanceResponse Successful Response
* @throws ApiError
*/
Expand All @@ -1027,11 +1028,13 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetails = <
{
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
},
Expand All @@ -1040,13 +1043,14 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetails = <
) =>
useQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn(
{ dagId, dagRunId, taskId, taskTryNumber },
{ dagId, dagRunId, mapIndex, taskId, taskTryNumber },
queryKey,
),
queryFn: () =>
TaskInstanceService.getTaskInstanceTryDetails({
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}) as TData,
Expand Down
6 changes: 5 additions & 1 deletion airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,7 @@ export const useTaskInstanceServiceGetMappedTaskInstanceSuspense = <
* @param data.dagRunId
* @param data.taskId
* @param data.taskTryNumber
* @param data.mapIndex
* @returns TaskInstanceResponse Successful Response
* @throws ApiError
*/
Expand All @@ -1013,11 +1014,13 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetailsSuspense = <
{
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
},
Expand All @@ -1026,13 +1029,14 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetailsSuspense = <
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn(
{ dagId, dagRunId, taskId, taskTryNumber },
{ dagId, dagRunId, mapIndex, taskId, taskTryNumber },
queryKey,
),
queryFn: () =>
TaskInstanceService.getTaskInstanceTryDetails({
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}) as TData,
Expand Down
4 changes: 4 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,7 @@ export class TaskInstanceService {
* @param data.dagRunId
* @param data.taskId
* @param data.taskTryNumber
* @param data.mapIndex
* @returns TaskInstanceResponse Successful Response
* @throws ApiError
*/
Expand All @@ -1257,6 +1258,9 @@ export class TaskInstanceService {
task_id: data.taskId,
task_try_number: data.taskTryNumber,
},
query: {
map_index: data.mapIndex,
},
errors: {
401: "Unauthorized",
403: "Forbidden",
Expand Down
1 change: 1 addition & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,7 @@ export type GetMappedTaskInstanceResponse = TaskInstanceResponse;
export type GetTaskInstanceTryDetailsData = {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
};
Expand Down
12 changes: 2 additions & 10 deletions tests/api_fastapi/core_api/routes/public/test_task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ def test_should_respond_404_wrong_map_index(self, test_client, session):

class TestGetTaskInstanceTryDetails(TestTaskInstanceEndpoint):
def test_should_respond_200_get_task_instance_try_details(self, test_client, session):
self.create_task_instances(session, task_instances=[{"state": State.RUNNING, "try_number": 1}])
self.create_task_instances(session, task_instances=[{"try_number": 1}])
response = test_client.get(
"/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1"
)
Expand Down Expand Up @@ -501,13 +501,5 @@ def test_should_respond_404_wrong_task_instance_try_details(self, test_client, s
assert response.status_code == 404

assert response.json() == {
"detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context` and try_number: `1` was not found"
"detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `print_the_context`, try_number: `1` and map_index: `-1` was not found"
}

def test_should_respond_404_mapped_task_instance_try_details(self, test_client, session):
self.create_task_instances(session, task_instances=[{"map_index": 0}])
response = test_client.get(
"/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/0"
)
assert response.status_code == 404
assert response.json() == {"detail": "Task instance is mapped, add the map_index value to the URL"}

0 comments on commit 7a55eb9

Please sign in to comment.