diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 83456960753d6..7f43c160f3273 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -728,6 +728,7 @@ def get_mapped_task_instance_dependencies( ) +@mark_fastapi_migration_done @security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE) @provide_session def get_task_instance_try_details( diff --git a/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow/api_fastapi/core_api/datamodels/task_instances.py index cd4caf1b6119a..4712df3273a4e 100644 --- a/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -114,3 +114,39 @@ class TaskInstancesBatchBody(BaseModel): page_offset: NonNegativeInt = 0 page_limit: NonNegativeInt = 100 order_by: str | None = None + + +class TaskInstanceHistoryResponse(BaseModel): + """TaskInstanceHistory serializer for responses.""" + + model_config = ConfigDict(populate_by_name=True) + + task_id: str + dag_id: str + run_id: str = Field(alias="dag_run_id") + map_index: int + start_date: datetime | None + end_date: datetime | None + duration: float | None + state: TaskInstanceState | None + try_number: int + max_tries: int + task_display_name: str + hostname: str | None + unixname: str | None + pool: str + pool_slots: int + queue: str | None + priority_weight: int | None + operator: str | None + queued_dttm: datetime | None = Field(alias="queued_when") + pid: int | None + executor: str | None + executor_config: Annotated[str, BeforeValidator(str)] + + +class TaskInstanceHistoryCollectionResponse(BaseModel): + """TaskInstanceHistory Collection serializer for responses.""" + + task_instances: list[TaskInstanceHistoryResponse] + total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 9b42d5c3a8145..6fe18f74f9bf5 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -3530,6 +3530,76 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}: + get: + tags: + - Task Instance + summary: Get Task Instance Try Details + description: Get task instance details by try number. + operationId: get_task_instance_try_details + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: task_try_number + in: path + required: true + schema: + type: integer + title: Task Try Number + - name: map_index + in: query + required: false + schema: + type: integer + default: -1 + title: Map Index + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/TaskInstanceHistoryResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/tasks/: get: tags: @@ -6041,6 +6111,126 @@ components: - total_entries title: TaskInstanceCollectionResponse description: Task Instance Collection serializer for responses. + TaskInstanceHistoryResponse: + properties: + task_id: + type: string + title: Task Id + dag_id: + type: string + title: Dag Id + dag_run_id: + type: string + title: Dag Run Id + map_index: + type: integer + title: Map Index + start_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Start Date + end_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: End Date + duration: + anyOf: + - type: number + - type: 'null' + title: Duration + state: + anyOf: + - $ref: '#/components/schemas/TaskInstanceState' + - type: 'null' + try_number: + type: integer + title: Try Number + max_tries: + type: integer + title: Max Tries + task_display_name: + type: string + title: Task Display Name + hostname: + anyOf: + - type: string + - type: 'null' + title: Hostname + unixname: + anyOf: + - type: string + - type: 'null' + title: Unixname + pool: + type: string + title: Pool + pool_slots: + type: integer + title: Pool Slots + queue: + anyOf: + - type: string + - type: 'null' + title: Queue + priority_weight: + anyOf: + - type: integer + - type: 'null' + title: Priority Weight + operator: + anyOf: + - type: string + - type: 'null' + title: Operator + queued_when: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Queued When + pid: + anyOf: + - type: integer + - type: 'null' + title: Pid + executor: + anyOf: + - type: string + - type: 'null' + title: Executor + executor_config: + type: string + title: Executor Config + type: object + required: + - task_id + - dag_id + - dag_run_id + - map_index + - start_date + - end_date + - duration + - state + - try_number + - max_tries + - task_display_name + - hostname + - unixname + - pool + - pool_slots + - queue + - priority_weight + - operator + - queued_when + - pid + - executor + - executor_config + title: TaskInstanceHistoryResponse + description: TaskInstanceHistory serializer for responses. TaskInstanceResponse: properties: id: diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index c805095f58208..f0c51b9606f51 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -50,12 +50,15 @@ from airflow.api_fastapi.core_api.datamodels.task_instances import ( TaskDependencyCollectionResponse, TaskInstanceCollectionResponse, + TaskInstanceHistoryResponse, TaskInstanceResponse, TaskInstancesBatchBody, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.exceptions import TaskNotFound +from airflow.models import Base from airflow.models.taskinstance import TaskInstance as TI +from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS from airflow.utils.db import get_query_count @@ -410,3 +413,38 @@ def get_task_instances_batch( ], total_entries=total_entries, ) + + +@task_instances_router.get( + "/{task_id}/tries/{task_try_number}", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +) +def get_task_instance_try_details( + dag_id: str, + dag_run_id: str, + task_id: str, + task_try_number: int, + session: Annotated[Session, Depends(get_session)], + map_index: int = -1, +) -> TaskInstanceHistoryResponse: + """Get task instance details by try number.""" + + def _query(orm_object: Base) -> TI | TIH | None: + query = select(orm_object).where( + orm_object.dag_id == dag_id, + orm_object.run_id == dag_run_id, + orm_object.task_id == task_id, + orm_object.try_number == task_try_number, + orm_object.map_index == map_index, + ) + + task_instance = session.scalar(query) + return task_instance + + result = _query(TI) or _query(TIH) + if result is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and map_index: `{map_index}` was not found", + ) + return TaskInstanceHistoryResponse.model_validate(result, from_attributes=True) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 14377a81a968c..35881c75255b7 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -975,6 +975,33 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ( }, ]), ]; +export type TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse = + Awaited>; +export type TaskInstanceServiceGetTaskInstanceTryDetailsQueryResult< + TData = TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useTaskInstanceServiceGetTaskInstanceTryDetailsKey = + "TaskInstanceServiceGetTaskInstanceTryDetails"; +export const UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn = ( + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + taskTryNumber: number; + }, + queryKey?: Array, +) => [ + useTaskInstanceServiceGetTaskInstanceTryDetailsKey, + ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]), +]; export type TaskServiceGetTasksDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 0c522f36e4330..0eb6ebb5d5023 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1316,6 +1316,51 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = ( updatedAtLte, }), }); +/** + * Get Task Instance Try Details + * Get task instance details by try number. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ +export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = ( + queryClient: QueryClient, + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + taskTryNumber: number; + }, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }), + queryFn: () => + TaskInstanceService.getTaskInstanceTryDetails({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }), + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index c461db254de2d..29e7842c7f215 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -1576,6 +1576,54 @@ export const useTaskInstanceServiceGetTaskInstances = < }) as TData, ...options, }); +/** + * Get Task Instance Try Details + * Get task instance details by try number. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetTaskInstanceTryDetails = < + TData = Common.TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + taskTryNumber: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn( + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getTaskInstanceTryDetails({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 1b81422281535..6180587fd8929 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -1558,6 +1558,54 @@ export const useTaskInstanceServiceGetTaskInstancesSuspense = < }) as TData, ...options, }); +/** + * Get Task Instance Try Details + * Get task instance details by try number. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ +export const useTaskInstanceServiceGetTaskInstanceTryDetailsSuspense = < + TData = Common.TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }: { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + taskTryNumber: number; + }, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn( + { dagId, dagRunId, mapIndex, taskId, taskTryNumber }, + queryKey, + ), + queryFn: () => + TaskInstanceService.getTaskInstanceTryDetails({ + dagId, + dagRunId, + mapIndex, + taskId, + taskTryNumber, + }) as TData, + ...options, + }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 1f83e434286b8..61a17ac8371bf 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3036,6 +3036,212 @@ export const $TaskInstanceCollectionResponse = { description: "Task Instance Collection serializer for responses.", } as const; +export const $TaskInstanceHistoryResponse = { + properties: { + task_id: { + type: "string", + title: "Task Id", + }, + dag_id: { + type: "string", + title: "Dag Id", + }, + dag_run_id: { + type: "string", + title: "Dag Run Id", + }, + map_index: { + type: "integer", + title: "Map Index", + }, + start_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Start Date", + }, + end_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "End Date", + }, + duration: { + anyOf: [ + { + type: "number", + }, + { + type: "null", + }, + ], + title: "Duration", + }, + state: { + anyOf: [ + { + $ref: "#/components/schemas/TaskInstanceState", + }, + { + type: "null", + }, + ], + }, + try_number: { + type: "integer", + title: "Try Number", + }, + max_tries: { + type: "integer", + title: "Max Tries", + }, + task_display_name: { + type: "string", + title: "Task Display Name", + }, + hostname: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Hostname", + }, + unixname: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Unixname", + }, + pool: { + type: "string", + title: "Pool", + }, + pool_slots: { + type: "integer", + title: "Pool Slots", + }, + queue: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Queue", + }, + priority_weight: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "Priority Weight", + }, + operator: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Operator", + }, + queued_when: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Queued When", + }, + pid: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "Pid", + }, + executor: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Executor", + }, + executor_config: { + type: "string", + title: "Executor Config", + }, + }, + type: "object", + required: [ + "task_id", + "dag_id", + "dag_run_id", + "map_index", + "start_date", + "end_date", + "duration", + "state", + "try_number", + "max_tries", + "task_display_name", + "hostname", + "unixname", + "pool", + "pool_slots", + "queue", + "priority_weight", + "operator", + "queued_when", + "pid", + "executor", + "executor_config", + ], + title: "TaskInstanceHistoryResponse", + description: "TaskInstanceHistory serializer for responses.", +} as const; + export const $TaskInstanceResponse = { properties: { id: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 2d75d9811b127..15c8bd3de5404 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -113,6 +113,8 @@ import type { GetTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, + GetTaskInstanceTryDetailsData, + GetTaskInstanceTryDetailsResponse, GetTasksData, GetTasksResponse, GetTaskData, @@ -1878,6 +1880,42 @@ export class TaskInstanceService { }, }); } + + /** + * Get Task Instance Try Details + * Get task instance details by try number. + * @param data The data for the request. + * @param data.dagId + * @param data.dagRunId + * @param data.taskId + * @param data.taskTryNumber + * @param data.mapIndex + * @returns TaskInstanceHistoryResponse Successful Response + * @throws ApiError + */ + public static getTaskInstanceTryDetails( + data: GetTaskInstanceTryDetailsData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}", + path: { + dag_id: data.dagId, + dag_run_id: data.dagRunId, + task_id: data.taskId, + task_try_number: data.taskTryNumber, + }, + query: { + map_index: data.mapIndex, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class TaskService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 85a8a7140874e..ac20eefdd1d27 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -775,6 +775,34 @@ export type TaskInstanceCollectionResponse = { total_entries: number; }; +/** + * TaskInstanceHistory serializer for responses. + */ +export type TaskInstanceHistoryResponse = { + task_id: string; + dag_id: string; + dag_run_id: string; + map_index: number; + start_date: string | null; + end_date: string | null; + duration: number | null; + state: TaskInstanceState | null; + try_number: number; + max_tries: number; + task_display_name: string; + hostname: string | null; + unixname: string | null; + pool: string; + pool_slots: number; + queue: string | null; + priority_weight: number | null; + operator: string | null; + queued_when: string | null; + pid: number | null; + executor: string | null; + executor_config: string; +}; + /** * TaskInstance serializer for responses. */ @@ -1491,6 +1519,16 @@ export type GetTaskInstancesBatchData = { export type GetTaskInstancesBatchResponse = TaskInstanceCollectionResponse; +export type GetTaskInstanceTryDetailsData = { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + taskTryNumber: number; +}; + +export type GetTaskInstanceTryDetailsResponse = TaskInstanceHistoryResponse; + export type GetTasksData = { dagId: string; orderBy?: string; @@ -3015,6 +3053,33 @@ export type $OpenApiTs = { }; }; }; + "/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number}": { + get: { + req: GetTaskInstanceTryDetailsData; + res: { + /** + * Successful Response + */ + 200: TaskInstanceHistoryResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/dags/{dag_id}/tasks/": { get: { req: GetTasksData; diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 48c4b5d1814d2..1b2a0c873f12b 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -31,6 +31,7 @@ from airflow.models.baseoperator import BaseOperator from airflow.models.dagbag import DagBag from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF +from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.models.taskmap import TaskMap from airflow.models.trigger import Trigger from airflow.utils.platform import getuser @@ -1407,3 +1408,174 @@ def test_should_respond_200_for_pagination(self, test_client, session): num_entries_batch3 = len(response_batch3.json()["task_instances"]) assert num_entries_batch3 == ti_count assert len(response_batch3.json()["task_instances"]) == ti_count + + +class TestGetTaskInstanceTry(TestTaskInstanceEndpoint): + def test_should_respond_200(self, test_client, session): + self.create_task_instances(session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True) + response = test_client.get( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1" + ) + assert response.status_code == 200 + assert response.json() == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "success", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + + @pytest.mark.parametrize("try_number", [1, 2]) + def test_should_respond_200_with_different_try_numbers(self, test_client, try_number, session): + self.create_task_instances(session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True) + response = test_client.get( + f"/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/{try_number}", + ) + + assert response.status_code == 200 + assert response.json() == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0 if try_number == 1 else 1, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "success" if try_number == 1 else None, + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": try_number, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + + def test_should_respond_200_with_task_state_in_deferred(self, test_client, session): + now = pendulum.now("UTC") + ti = self.create_task_instances( + session, + task_instances=[{"state": State.DEFERRED}], + update_extras=True, + )[0] + ti.trigger = Trigger("none", {}) + ti.trigger.created_date = now + ti.triggerer_job = Job() + TriggererJobRunner(job=ti.triggerer_job) + ti.triggerer_job.state = "running" + ti.try_number = 1 + session.merge(ti) + session.flush() + # Record the TaskInstanceHistory + TaskInstanceHistory.record_ti(ti, session=session) + session.flush() + # Change TaskInstance try_number to 2, ensuring api checks TIHistory + ti = session.query(TaskInstance).one_or_none() + ti.try_number = 2 + session.merge(ti) + # Set duration and end_date in TaskInstanceHistory for easy testing + tih = session.query(TaskInstanceHistory).all()[0] + tih.duration = 10000 + tih.end_date = self.default_time + dt.timedelta(days=2) + session.merge(tih) + session.commit() + # Get the task instance details from TIHistory: + response = test_client.get( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", + ) + assert response.status_code == 200 + data = response.json() + + assert data == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "failed", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + + def test_should_respond_200_with_task_state_in_removed(self, test_client, session): + self.create_task_instances( + session, task_instances=[{"state": State.REMOVED}], update_extras=True, with_ti_history=True + ) + response = test_client.get( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries/1", + ) + assert response.status_code == 200 + + assert response.json() == { + "dag_id": "example_python_operator", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "removed", + "task_id": "print_the_context", + "task_display_name": "print_the_context", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", + } + + def test_raises_404_for_nonexistent_task_instance(self, test_client, session): + self.create_task_instances(session) + response = test_client.get( + "/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/nonexistent_task/tries/0" + ) + assert response.status_code == 404 + + assert response.json() == { + "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `nonexistent_task`, try_number: `0` and map_index: `-1` was not found" + }