Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Get Task Instance Try Details #43675

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ def get_mapped_task_instance_dependencies(
)


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_task_instance_try_details(
Expand Down
36 changes: 36 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,39 @@ class TaskInstancesBatchBody(BaseModel):
page_offset: NonNegativeInt = 0
page_limit: NonNegativeInt = 100
order_by: str | None = None


class TaskInstanceHistoryResponse(BaseModel):
"""TaskInstanceHistory serializer for responses."""

model_config = ConfigDict(populate_by_name=True)

task_id: str
dag_id: str
run_id: str = Field(alias="dag_run_id")
map_index: int
start_date: datetime | None
end_date: datetime | None
duration: float | None
state: TaskInstanceState | None
try_number: int
max_tries: int
task_display_name: str
hostname: str | None
unixname: str | None
pool: str
pool_slots: int
queue: str | None
priority_weight: int | None
operator: str | None
queued_dttm: datetime | None = Field(alias="queued_when")
pid: int | None
executor: str | None
executor_config: Annotated[str, BeforeValidator(str)]


class TaskInstanceHistoryCollectionResponse(BaseModel):
"""TaskInstanceHistory Collection serializer for responses."""

task_instances: list[TaskInstanceHistoryResponse]
total_entries: int
190 changes: 190 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3530,6 +3530,76 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}:
get:
tags:
- Task Instance
summary: Get Task Instance Try Details
description: Get task instance details by try number.
operationId: get_task_instance_try_details
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: task_try_number
in: path
required: true
schema:
type: integer
title: Task Try Number
- name: map_index
in: query
required: false
schema:
type: integer
default: -1
title: Map Index
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstanceHistoryResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/tasks/:
get:
tags:
Expand Down Expand Up @@ -6041,6 +6111,126 @@ components:
- total_entries
title: TaskInstanceCollectionResponse
description: Task Instance Collection serializer for responses.
TaskInstanceHistoryResponse:
properties:
task_id:
type: string
title: Task Id
dag_id:
type: string
title: Dag Id
dag_run_id:
type: string
title: Dag Run Id
map_index:
type: integer
title: Map Index
start_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date
duration:
anyOf:
- type: number
- type: 'null'
title: Duration
state:
anyOf:
- $ref: '#/components/schemas/TaskInstanceState'
- type: 'null'
try_number:
type: integer
title: Try Number
max_tries:
type: integer
title: Max Tries
task_display_name:
type: string
title: Task Display Name
hostname:
anyOf:
- type: string
- type: 'null'
title: Hostname
unixname:
anyOf:
- type: string
- type: 'null'
title: Unixname
pool:
type: string
title: Pool
pool_slots:
type: integer
title: Pool Slots
queue:
anyOf:
- type: string
- type: 'null'
title: Queue
priority_weight:
anyOf:
- type: integer
- type: 'null'
title: Priority Weight
operator:
anyOf:
- type: string
- type: 'null'
title: Operator
queued_when:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Queued When
pid:
anyOf:
- type: integer
- type: 'null'
title: Pid
executor:
anyOf:
- type: string
- type: 'null'
title: Executor
executor_config:
type: string
title: Executor Config
type: object
required:
- task_id
- dag_id
- dag_run_id
- map_index
- start_date
- end_date
- duration
- state
- try_number
- max_tries
- task_display_name
- hostname
- unixname
- pool
- pool_slots
- queue
- priority_weight
- operator
- queued_when
- pid
- executor
- executor_config
title: TaskInstanceHistoryResponse
description: TaskInstanceHistory serializer for responses.
TaskInstanceResponse:
properties:
id:
Expand Down
38 changes: 38 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@
from airflow.api_fastapi.core_api.datamodels.task_instances import (
TaskDependencyCollectionResponse,
TaskInstanceCollectionResponse,
TaskInstanceHistoryResponse,
TaskInstanceResponse,
TaskInstancesBatchBody,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.exceptions import TaskNotFound
from airflow.models import Base
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.utils.db import get_query_count
Expand Down Expand Up @@ -410,3 +413,38 @@ def get_task_instances_batch(
],
total_entries=total_entries,
)


@task_instances_router.get(
"/{task_id}/tries/{task_try_number}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
)
def get_task_instance_try_details(
dag_id: str,
dag_run_id: str,
task_id: str,
task_try_number: int,
session: Annotated[Session, Depends(get_session)],
map_index: int = -1,
) -> TaskInstanceHistoryResponse:
"""Get task instance details by try number."""

def _query(orm_object: Base) -> TI | TIH | None:
query = select(orm_object).where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.try_number == task_try_number,
orm_object.map_index == map_index,
)

task_instance = session.scalar(query)
return task_instance

result = _query(TI) or _query(TIH)
if result is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and map_index: `{map_index}` was not found",
)
return TaskInstanceHistoryResponse.model_validate(result, from_attributes=True)
27 changes: 27 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,33 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = (
},
]),
];
export type TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse =
Awaited<ReturnType<typeof TaskInstanceService.getTaskInstanceTryDetails>>;
export type TaskInstanceServiceGetTaskInstanceTryDetailsQueryResult<
TData = TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetTaskInstanceTryDetailsKey =
"TaskInstanceServiceGetTaskInstanceTryDetails";
export const UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn = (
{
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetTaskInstanceTryDetailsKey,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
];
export type TaskServiceGetTasksDefaultResponse = Awaited<
ReturnType<typeof TaskService.getTasks>
>;
Expand Down
45 changes: 45 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,51 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (
updatedAtLte,
}),
});
/**
* Get Task Instance Try Details
* Get task instance details by try number.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @param data.taskTryNumber
* @param data.mapIndex
* @returns TaskInstanceHistoryResponse Successful Response
* @throws ApiError
*/
export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = (
queryClient: QueryClient,
{
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}: {
dagId: string;
dagRunId: string;
mapIndex?: number;
taskId: string;
taskTryNumber: number;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn({
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}),
queryFn: () =>
TaskInstanceService.getTaskInstanceTryDetails({
dagId,
dagRunId,
mapIndex,
taskId,
taskTryNumber,
}),
});
/**
* Get Tasks
* Get tasks for DAG.
Expand Down
Loading