From 2354075c9f1ffb8273568231afe44364af9bc381 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:26:13 -0700 Subject: [PATCH] Add basic endpoints for managing backfill entities (#42455) More logic will be added for `create` and `cancel`. We'll need to create dag runs and fail them accordingly. But I'll add that logic separately to make it easier to scrutinize it more closely. Will also follow up with some changes to the security implementation. --------- Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- .../endpoints/backfill_endpoint.py | 181 +++++++ airflow/api_connexion/openapi/v1.yaml | 291 ++++++++++++ airflow/models/backfill.py | 7 - airflow/www/static/js/types/api-generated.ts | 240 ++++++++++ .../endpoints/test_backfill_endpoint.py | 440 ++++++++++++++++++ tests/test_utils/db.py | 7 + 6 files changed, 1159 insertions(+), 7 deletions(-) create mode 100644 airflow/api_connexion/endpoints/backfill_endpoint.py create mode 100644 tests/api_connexion/endpoints/test_backfill_endpoint.py diff --git a/airflow/api_connexion/endpoints/backfill_endpoint.py b/airflow/api_connexion/endpoints/backfill_endpoint.py new file mode 100644 index 0000000000000..f974be4d75d82 --- /dev/null +++ b/airflow/api_connexion/endpoints/backfill_endpoint.py @@ -0,0 +1,181 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import logging +from functools import wraps +from typing import TYPE_CHECKING + +import pendulum +from sqlalchemy import select + +from airflow.api_connexion import security +from airflow.api_connexion.exceptions import Conflict, NotFound +from airflow.api_connexion.schemas.backfill_schema import ( + BackfillCollection, + backfill_collection_schema, + backfill_schema, +) +from airflow.models.backfill import Backfill +from airflow.models.serialized_dag import SerializedDagModel +from airflow.utils import timezone +from airflow.utils.session import NEW_SESSION, provide_session +from airflow.www.decorators import action_logging + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + from airflow.api_connexion.types import APIResponse + +log = logging.getLogger(__name__) + +RESOURCE_EVENT_PREFIX = "dag" + + +def backfill_to_dag(func): + """ + Enrich the request with dag_id. 
+ + :meta private: + """ + + @wraps(func) + def wrapper(*, backfill_id, session, **kwargs): + backfill = session.get(Backfill, backfill_id) + if not backfill: + raise NotFound("Backfill not found") + return func(dag_id=backfill.dag_id, backfill_id=backfill_id, session=session, **kwargs) + + return wrapper + + +@provide_session +def _create_backfill( + *, + dag_id: str, + from_date: str, + to_date: str, + max_active_runs: int, + reverse: bool, + dag_run_conf: dict | None, + session: Session = NEW_SESSION, +) -> Backfill: + serdag = session.get(SerializedDagModel, dag_id) + if not serdag: + raise NotFound(f"Could not find dag {dag_id}") + + br = Backfill( + dag_id=dag_id, + from_date=pendulum.parse(from_date), + to_date=pendulum.parse(to_date), + max_active_runs=max_active_runs, + dag_run_conf=dag_run_conf, + ) + session.add(br) + session.commit() + return br + + +@security.requires_access_dag("GET") +@action_logging +@provide_session +def list_backfills(dag_id, session): + backfills = session.scalars(select(Backfill).where(Backfill.dag_id == dag_id)).all() + obj = BackfillCollection( + backfills=backfills, + total_entries=len(backfills), + ) + return backfill_collection_schema.dump(obj) + + +@provide_session +@backfill_to_dag +@security.requires_access_dag("PUT") +@action_logging +def pause_backfill(*, backfill_id, session, **kwargs): + br = session.get(Backfill, backfill_id) + if br.completed_at: + raise Conflict("Backfill is already completed.") + if br.is_paused is False: + br.is_paused = True + session.commit() + return backfill_schema.dump(br) + + +@provide_session +@backfill_to_dag +@security.requires_access_dag("PUT") +@action_logging +def unpause_backfill(*, backfill_id, session, **kwargs): + br = session.get(Backfill, backfill_id) + if br.completed_at: + raise Conflict("Backfill is already completed.") + if br.is_paused: + br.is_paused = False + session.commit() + return backfill_schema.dump(br) + + +@provide_session +@backfill_to_dag 
+@security.requires_access_dag("PUT") +@action_logging +def cancel_backfill(*, backfill_id, session, **kwargs): + br: Backfill = session.get(Backfill, backfill_id) + if br.completed_at is not None: + raise Conflict("Backfill is already completed.") + + br.completed_at = timezone.utcnow() + + # first, pause + if not br.is_paused: + br.is_paused = True + session.commit() + return backfill_schema.dump(br) + + +@provide_session +@backfill_to_dag +@security.requires_access_dag("GET") +@action_logging +def get_backfill(*, backfill_id: int, session: Session = NEW_SESSION, **kwargs): + backfill = session.get(Backfill, backfill_id) + if backfill: + return backfill_schema.dump(backfill) + raise NotFound("Backfill not found") + + +@security.requires_access_dag("PUT") +@action_logging +def create_backfill( + dag_id: str, + from_date: str, + to_date: str, + max_active_runs: int = 10, + reverse: bool = False, + dag_run_conf: dict | None = None, +) -> APIResponse: + backfill_obj = _create_backfill( + dag_id=dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=max_active_runs, + reverse=reverse, + dag_run_conf=dag_run_conf, + ) + return backfill_schema.dump(backfill_obj) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 0c4b0414775f1..15ad6fd8a4f63 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -245,6 +245,200 @@ servers: description: Apache Airflow Stable API. paths: + # Database entities + /backfills: + get: + summary: List backfills + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: list_backfills + tags: [Backfill] + parameters: + - name: dag_id + in: query + schema: + type: string + required: true + description: | + List backfills for this dag. + responses: + "200": + description: Success. 
+ content: + application/json: + schema: + $ref: "#/components/schemas/BackfillCollection" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + + post: + summary: Create a backfill job. + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: create_backfill + tags: [Backfill] + parameters: + - name: dag_id + in: query + schema: + type: string + required: true + description: | + Create dag runs for this dag. + + - name: from_date + in: query + schema: + type: string + format: date-time + required: true + description: | + Create dag runs with logical dates from this date onward, including this date. + + - name: to_date + in: query + schema: + type: string + format: date-time + required: true + description: | + Create dag runs for logical dates up to but not including this date. + + - name: max_active_runs + in: query + schema: + type: integer + required: false + description: | + Maximum number of active DAG runs for the the backfill. + + - name: reverse + in: query + schema: + type: boolean + required: false + description: | + If true, run the dag runs in descending order of logical date. + + - name: config + in: query + schema: + # todo: AIP-78 make this object + type: string + required: false + description: | + If true, run the dag runs in descending order of logical date. + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "400": + $ref: "#/components/responses/BadRequest" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + + /backfills/{backfill_id}: + parameters: + - $ref: "#/components/parameters/BackfillIdPath" + get: + summary: Get a backfill + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: get_backfill + tags: [Backfill] + responses: + "200": + description: Success. 
+ content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + + /backfills/{backfill_id}/pause: + parameters: + - $ref: "#/components/parameters/BackfillIdPath" + post: + summary: Pause a backfill + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: pause_backfill + tags: [Backfill] + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + "409": + $ref: "#/components/responses/Conflict" + + /backfills/{backfill_id}/unpause: + parameters: + - $ref: "#/components/parameters/BackfillIdPath" + post: + summary: Pause a backfill + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: unpause_backfill + tags: [Backfill] + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + "409": + $ref: "#/components/responses/Conflict" + + /backfills/{backfill_id}/cancel: + parameters: + - $ref: "#/components/parameters/BackfillIdPath" + post: + summary: Cancel a backfill + description: | + When a backfill is cancelled, all queued dag runs will be marked as failed. + Running dag runs will be allowed to continue. + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: cancel_backfill + tags: [Backfill] + responses: + "200": + description: Success. 
+ content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + "409": + $ref: "#/components/responses/Conflict" + # Database entities /connections: get: @@ -2704,6 +2898,66 @@ components: $ref: "#/components/schemas/UserCollectionItem" - $ref: "#/components/schemas/CollectionInfo" + Backfill: + description: > + Backfill entity object. + + Represents one backfill run / request. + type: object + properties: + id: + type: integer + description: id + dag_id: + type: string + description: The dag_id for the backfill. + from_date: + type: string + nullable: true + description: From date of the backfill (inclusive). + to_date: + type: string + nullable: true + description: To date of the backfill (exclusive). + dag_run_conf: + type: string + nullable: true + description: Dag run conf to be forwarded to the dag runs. + is_paused: + type: boolean + nullable: true + description: is_paused + max_active_runs: + type: integer + nullable: true + description: max_active_runs + created_at: + type: string + nullable: true + description: created_at + completed_at: + type: string + nullable: true + description: completed_at + updated_at: + type: string + nullable: true + description: updated_at + + + BackfillCollection: + type: object + description: | + Collection of backfill entities. + allOf: + - type: object + properties: + backfills: + type: array + items: + $ref: "#/components/schemas/Backfill" + - $ref: "#/components/schemas/CollectionInfo" + ConnectionCollectionItem: description: > Connection collection item. @@ -5125,6 +5379,36 @@ components: # Reusable path, query, header and cookie parameters parameters: + + BackfillIdPath: + in: path + name: backfill_id + schema: + type: integer + required: true + description: | + The integer id identifying the backfill entity. 
+ + FromDate: + in: query + name: from_date + schema: + type: string + format: date-time + required: false + description: | + From date. + + ToDate: + in: query + name: to_date + schema: + type: string + format: date-time + required: false + description: | + To date. + # Pagination parameters PageOffset: in: query @@ -5691,6 +5975,13 @@ components: schema: $ref: "#/components/schemas/Error" # 409 + "Conflict": + description: There is some kind of conflict with the request. + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + # 409 "AlreadyExists": description: An existing resource conflicts with the request. content: diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index cefe16d863f52..8ff2541353688 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -41,7 +41,6 @@ class Backfill(Base): Controls whether new dag runs will be created for this backfill. Does not pause existing dag runs. - todo: AIP-78 Add test """ max_active_runs = Column(Integer, default=10, nullable=False) created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) @@ -49,12 +48,6 @@ class Backfill(Base): updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) -# todo: AIP-78 implement clear_failed_tasks?` -# todo: AIP-78 implement clear_dag_run? - -# todo: (AIP-78) should backfill be supported for things with no schedule, or statically partitioned assets? 
- - class BackfillDagRun(Base): """Mapping table between backfill run and dag run.""" diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 60fd384df00a7..3616be30a1fac 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -6,6 +6,50 @@ import type { CamelCasedPropertiesDeep } from "type-fest"; */ export interface paths { + "/backfills": { + get: operations["list_backfills"]; + post: operations["create_backfill"]; + }; + "/backfills/{backfill_id}": { + get: operations["get_backfill"]; + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + }; + "/backfills/{backfill_id}/pause": { + post: operations["pause_backfill"]; + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + }; + "/backfills/{backfill_id}/unpause": { + post: operations["unpause_backfill"]; + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + }; + "/backfills/{backfill_id}/cancel": { + /** + * When a backfill is cancelled, all queued dag runs will be marked as failed. + * Running dag runs will be allowed to continue. + */ + post: operations["cancel_backfill"]; + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + }; "/connections": { get: operations["get_connections"]; post: operations["post_connection"]; @@ -861,6 +905,36 @@ export interface components { UserCollection: { users?: components["schemas"]["UserCollectionItem"][]; } & components["schemas"]["CollectionInfo"]; + /** + * @description Backfill entity object. + * Represents one backfill run / request. 
+ */ + Backfill: { + /** @description id */ + id?: number; + /** @description The dag_id for the backfill. */ + dag_id?: string; + /** @description From date of the backfill (inclusive). */ + from_date?: string | null; + /** @description To date of the backfill (exclusive). */ + to_date?: string | null; + /** @description Dag run conf to be forwarded to the dag runs. */ + dag_run_conf?: string | null; + /** @description is_paused */ + is_paused?: boolean | null; + /** @description max_active_runs */ + max_active_runs?: number | null; + /** @description created_at */ + created_at?: string | null; + /** @description completed_at */ + completed_at?: string | null; + /** @description updated_at */ + updated_at?: string | null; + }; + /** @description Collection of backfill entities. */ + BackfillCollection: { + backfills?: components["schemas"]["Backfill"][]; + } & components["schemas"]["CollectionInfo"]; /** * @description Connection collection item. * The password and extra fields are only available when retrieving a single object due to the sensitivity of this data. @@ -2405,6 +2479,12 @@ export interface components { "application/json": components["schemas"]["Error"]; }; }; + /** There is some kind of conflict with the request. */ + Conflict: { + content: { + "application/json": components["schemas"]["Error"]; + }; + }; /** An existing resource conflicts with the request. */ AlreadyExists: { content: { @@ -2419,6 +2499,12 @@ export interface components { }; }; parameters: { + /** @description The integer id identifying the backfill entity. */ + BackfillIdPath: number; + /** @description From date. */ + FromDate: string; + /** @description To date. */ + ToDate: string; /** @description The number of items to skip before starting to collect the result set. */ PageOffset: number; /** @description The numbers of items to return. 
*/ @@ -2621,6 +2707,136 @@ export interface components { } export interface operations { + list_backfills: { + parameters: { + query: { + /** List backfills for this dag. */ + dag_id: string; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["BackfillCollection"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + }; + }; + create_backfill: { + parameters: { + query: { + /** Create dag runs for this dag. */ + dag_id: string; + /** Create dag runs with logical dates from this date onward, including this date. */ + from_date: string; + /** Create dag runs for logical dates up to but not including this date. */ + to_date: string; + /** Maximum number of active DAG runs for the the backfill. */ + max_active_runs?: number; + /** If true, run the dag runs in descending order of logical date. */ + reverse?: boolean; + /** If true, run the dag runs in descending order of logical date. */ + config?: string; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 400: components["responses"]["BadRequest"]; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + }; + }; + get_backfill: { + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + }; + }; + pause_backfill: { + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + responses: { + /** Success. 
*/ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + 409: components["responses"]["Conflict"]; + }; + }; + unpause_backfill: { + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + 409: components["responses"]["Conflict"]; + }; + }; + /** + * When a backfill is cancelled, all queued dag runs will be marked as failed. + * Running dag runs will be allowed to continue. + */ + cancel_backfill: { + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + responses: { + /** Success. 
*/ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + 409: components["responses"]["Conflict"]; + }; + }; get_connections: { parameters: { query: { @@ -5048,6 +5264,12 @@ export type User = CamelCasedPropertiesDeep; export type UserCollection = CamelCasedPropertiesDeep< components["schemas"]["UserCollection"] >; +export type Backfill = CamelCasedPropertiesDeep< + components["schemas"]["Backfill"] +>; +export type BackfillCollection = CamelCasedPropertiesDeep< + components["schemas"]["BackfillCollection"] +>; export type ConnectionCollectionItem = CamelCasedPropertiesDeep< components["schemas"]["ConnectionCollectionItem"] >; @@ -5305,6 +5527,24 @@ export type HealthStatus = CamelCasedPropertiesDeep< export type Operations = operations; /* Types for operation variables */ +export type ListBackfillsVariables = CamelCasedPropertiesDeep< + operations["list_backfills"]["parameters"]["query"] +>; +export type CreateBackfillVariables = CamelCasedPropertiesDeep< + operations["create_backfill"]["parameters"]["query"] +>; +export type GetBackfillVariables = CamelCasedPropertiesDeep< + operations["get_backfill"]["parameters"]["path"] +>; +export type PauseBackfillVariables = CamelCasedPropertiesDeep< + operations["pause_backfill"]["parameters"]["path"] +>; +export type UnpauseBackfillVariables = CamelCasedPropertiesDeep< + operations["unpause_backfill"]["parameters"]["path"] +>; +export type CancelBackfillVariables = CamelCasedPropertiesDeep< + operations["cancel_backfill"]["parameters"]["path"] +>; export type GetConnectionsVariables = CamelCasedPropertiesDeep< operations["get_connections"]["parameters"]["query"] >; diff --git a/tests/api_connexion/endpoints/test_backfill_endpoint.py b/tests/api_connexion/endpoints/test_backfill_endpoint.py new file mode 100644 index 
0000000000000..51a4faf40055c --- /dev/null +++ b/tests/api_connexion/endpoints/test_backfill_endpoint.py @@ -0,0 +1,440 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from datetime import datetime +from unittest import mock +from urllib.parse import urlencode + +import pendulum +import pytest + +from airflow.models import DagBag, DagModel +from airflow.models.backfill import Backfill +from airflow.models.dag import DAG +from airflow.models.serialized_dag import SerializedDagModel +from airflow.operators.empty import EmptyOperator +from airflow.security import permissions +from airflow.utils import timezone +from airflow.utils.session import provide_session +from tests.test_utils.api_connexion_utils import create_user, delete_user +from tests.test_utils.db import clear_db_backfills, clear_db_dags, clear_db_runs, clear_db_serialized_dags + +pytestmark = [pytest.mark.db_test] + + +DAG_ID = "test_dag" +TASK_ID = "op1" +DAG2_ID = "test_dag2" +DAG3_ID = "test_dag3" +UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')" + + +@pytest.fixture(scope="module") +def configured_app(minimal_app_for_api): + app = minimal_app_for_api + + create_user( + app, # type: 
ignore + username="test", + role_name="Test", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG), + ], + ) + create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore + create_user(app, username="test_granular_permissions", role_name="TestGranularDag") # type: ignore + app.appbuilder.sm.sync_perm_for_dag( # type: ignore + "TEST_DAG_1", + access_control={ + "TestGranularDag": { + permissions.RESOURCE_DAG: {permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ} + }, + }, + ) + + with DAG( + DAG_ID, + schedule=None, + start_date=datetime(2020, 6, 15), + doc_md="details", + params={"foo": 1}, + tags=["example"], + ) as dag: + EmptyOperator(task_id=TASK_ID) + + with DAG(DAG2_ID, schedule=None, start_date=datetime(2020, 6, 15)) as dag2: # no doc_md + EmptyOperator(task_id=TASK_ID) + + with DAG(DAG3_ID, schedule=None) as dag3: # DAG start_date set to None + EmptyOperator(task_id=TASK_ID, start_date=datetime(2019, 6, 12)) + + dag_bag = DagBag(os.devnull, include_examples=False) + dag_bag.dags = {dag.dag_id: dag, dag2.dag_id: dag2, dag3.dag_id: dag3} + + app.dag_bag = dag_bag + + yield app + + delete_user(app, username="test") # type: ignore + delete_user(app, username="test_no_permissions") # type: ignore + delete_user(app, username="test_granular_permissions") # type: ignore + + +class TestBackfillEndpoint: + @staticmethod + def clean_db(): + clear_db_backfills() + clear_db_runs() + clear_db_dags() + clear_db_serialized_dags() + + @pytest.fixture(autouse=True) + def setup_attrs(self, configured_app) -> None: + self.clean_db() + self.app = configured_app + self.client = self.app.test_client() # type:ignore + self.dag_id = DAG_ID + self.dag2_id = DAG2_ID + self.dag3_id = DAG3_ID + + def teardown_method(self) -> None: + self.clean_db() + + @provide_session + def _create_dag_models(self, *, 
count=1, dag_id_prefix="TEST_DAG", is_paused=False, session=None): + dags = [] + for num in range(1, count + 1): + dag_model = DagModel( + dag_id=f"{dag_id_prefix}_{num}", + fileloc=f"/tmp/dag_{num}.py", + is_active=True, + timetable_summary="0 0 * * *", + is_paused=is_paused, + ) + session.add(dag_model) + dags.append(dag_model) + return dags + + @provide_session + def _create_deactivated_dag(self, session=None): + dag_model = DagModel( + dag_id="TEST_DAG_DELETED_1", + fileloc="/tmp/dag_del_1.py", + schedule_interval="2 2 * * *", + is_active=False, + ) + session.add(dag_model) + + +class TestListBackfills(TestBackfillEndpoint): + def test_should_respond_200(self, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + b = Backfill(dag_id=dag.dag_id, from_date=from_date, to_date=to_date) + session.add(b) + session.commit() + response = self.client.get( + f"/api/v1/backfills?dag_id={dag.dag_id}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "backfills": [ + { + "completed_at": mock.ANY, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": None, + "from_date": from_date.isoformat(), + "id": b.id, + "is_paused": False, + "max_active_runs": 10, + "to_date": to_date.isoformat(), + "updated_at": mock.ANY, + } + ], + "total_entries": 1, + } + + @pytest.mark.parametrize( + "user, expected", + [ + ("test_granular_permissions", 200), + ("test_no_permissions", 403), + ("test", 200), + (None, 401), + ], + ) + def test_should_respond_200_with_granular_dag_access(self, user, expected, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + b = Backfill( + dag_id=dag.dag_id, + from_date=from_date, + to_date=to_date, + ) + + session.add(b) + session.commit() + kwargs = {} + if user: + kwargs.update(environ_overrides={"REMOTE_USER": user}) + response = 
self.client.get("/api/v1/backfills?dag_id=TEST_DAG_1", **kwargs) + assert response.status_code == expected + + +class TestGetBackfill(TestBackfillEndpoint): + def test_should_respond_200(self, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + backfill = Backfill(dag_id=dag.dag_id, from_date=from_date, to_date=to_date) + session.add(backfill) + session.commit() + response = self.client.get( + f"/api/v1/backfills/{backfill.id}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "completed_at": mock.ANY, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": None, + "from_date": from_date.isoformat(), + "id": backfill.id, + "is_paused": False, + "max_active_runs": 10, + "to_date": to_date.isoformat(), + "updated_at": mock.ANY, + } + + def test_no_exist(self, session): + response = self.client.get( + f"/api/v1/backfills/{23198409834208}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 404 + assert response.json.get("title") == "Backfill not found" + + @pytest.mark.parametrize( + "user, expected", + [ + ("test_granular_permissions", 200), + ("test_no_permissions", 403), + ("test", 200), + (None, 401), + ], + ) + def test_should_respond_200_with_granular_dag_access(self, user, expected, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + backfill = Backfill( + dag_id=dag.dag_id, + from_date=from_date, + to_date=to_date, + ) + session.add(backfill) + session.commit() + kwargs = {} + if user: + kwargs.update(environ_overrides={"REMOTE_USER": user}) + response = self.client.get(f"/api/v1/backfills/{backfill.id}", **kwargs) + assert response.status_code == expected + + +class TestCreateBackfill(TestBackfillEndpoint): + @pytest.mark.parametrize( + "user, expected", + [ + ("test_granular_permissions", 200), + ("test_no_permissions", 403), + 
("test", 200), + (None, 401), + ], + ) + def test_create_backfill(self, user, expected, session, dag_maker): + with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *") as dag: + EmptyOperator(task_id="mytask") + session.add(SerializedDagModel(dag)) + session.commit() + session.query(DagModel).all() + from_date = pendulum.parse("2024-01-01") + from_date_iso = from_date.isoformat() + to_date = pendulum.parse("2024-02-01") + to_date_iso = to_date.isoformat() + max_active_runs = 5 + query = urlencode( + query={ + "dag_id": dag.dag_id, + "from_date": f"{from_date_iso}", + "to_date": f"{to_date_iso}", + "max_active_runs": max_active_runs, + "reverse": False, + } + ) + kwargs = {} + if user: + kwargs.update(environ_overrides={"REMOTE_USER": user}) + + response = self.client.post( + f"/api/v1/backfills?{query}", + **kwargs, + ) + assert response.status_code == expected + if expected < 300: + assert response.json == { + "completed_at": mock.ANY, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": None, + "from_date": from_date_iso, + "id": mock.ANY, + "is_paused": False, + "max_active_runs": 5, + "to_date": to_date_iso, + "updated_at": mock.ANY, + } + + +class TestPauseBackfill(TestBackfillEndpoint): + def test_should_respond_200(self, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + backfill = Backfill(dag_id=dag.dag_id, from_date=from_date, to_date=to_date) + session.add(backfill) + session.commit() + response = self.client.post( + f"/api/v1/backfills/{backfill.id}/pause", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "completed_at": mock.ANY, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": None, + "from_date": from_date.isoformat(), + "id": backfill.id, + "is_paused": True, + "max_active_runs": 10, + "to_date": to_date.isoformat(), + "updated_at": mock.ANY, + } + + 
    @pytest.mark.parametrize(
        "user, expected",
        [
            ("test_granular_permissions", 200),
            ("test_no_permissions", 403),
            ("test", 200),
            (None, 401),
        ],
    )
    def test_should_respond_200_with_granular_dag_access(self, user, expected, session):
        """Auth matrix for the pause endpoint: per-dag and full perms → 200,
        no perms → 403, anonymous → 401."""
        (dag,) = self._create_dag_models()
        from_date = timezone.utcnow()
        to_date = timezone.utcnow()
        backfill = Backfill(
            dag_id=dag.dag_id,
            from_date=from_date,
            to_date=to_date,
        )
        session.add(backfill)
        session.commit()
        kwargs = {}
        if user:
            # only authenticated requests carry REMOTE_USER
            kwargs.update(environ_overrides={"REMOTE_USER": user})
        response = self.client.post(f"/api/v1/backfills/{backfill.id}/pause", **kwargs)
        assert response.status_code == expected


class TestCancelBackfill(TestBackfillEndpoint):
    """Tests for POST /api/v1/backfills/{backfill_id}/cancel."""

    def test_should_respond_200(self, session):
        """Cancelling marks the backfill paused/completed; a second cancel is a 409."""
        (dag,) = self._create_dag_models()
        from_date = timezone.utcnow()
        to_date = timezone.utcnow()
        backfill = Backfill(dag_id=dag.dag_id, from_date=from_date, to_date=to_date)
        session.add(backfill)
        session.commit()
        response = self.client.post(
            f"/api/v1/backfills/{backfill.id}/cancel",
            environ_overrides={"REMOTE_USER": "test"},
        )
        assert response.status_code == 200
        # Cancel also pauses the backfill (is_paused=True in the response).
        assert response.json == {
            "completed_at": mock.ANY,
            "created_at": mock.ANY,
            "dag_id": "TEST_DAG_1",
            "dag_run_conf": None,
            "from_date": from_date.isoformat(),
            "id": backfill.id,
            "is_paused": True,
            "max_active_runs": 10,
            "to_date": to_date.isoformat(),
            "updated_at": mock.ANY,
        }
        # NOTE(review): the next two asserts are identical — one parseability
        # check of completed_at suffices; looks like a copy/paste duplicate.
        assert pendulum.parse(response.json["completed_at"])
        # now it is marked as completed
        assert pendulum.parse(response.json["completed_at"])

        # get conflict when canceling already-canceled backfill
        response = self.client.post(
            f"/api/v1/backfills/{backfill.id}/cancel", environ_overrides={"REMOTE_USER": "test"}
        )
        assert response.status_code == 409

    @pytest.mark.parametrize(
        "user, expected",
        [
            ("test_granular_permissions", 200),
            ("test_no_permissions", 403),
            ("test", 200),
            (None, 401),
        ],
    )
    # NOTE(review): the decorated method continues in the next chunk
    def test_should_respond_200_with_granular_dag_access(self, user, expected, session):
        """Auth matrix for the cancel endpoint; on success, also verify that a
        repeat cancel of the now-completed backfill conflicts (409)."""
        (dag,) = self._create_dag_models()
        from_date = timezone.utcnow()
        to_date = timezone.utcnow()
        backfill = Backfill(
            dag_id=dag.dag_id,
            from_date=from_date,
            to_date=to_date,
        )
        session.add(backfill)
        session.commit()
        kwargs = {}
        if user:
            # only authenticated requests carry REMOTE_USER
            kwargs.update(environ_overrides={"REMOTE_USER": user})
        response = self.client.post(f"/api/v1/backfills/{backfill.id}/cancel", **kwargs)
        assert response.status_code == expected
        if response.status_code < 300:
            # now it is marked as completed
            assert pendulum.parse(response.json["completed_at"])

            # get conflict when canceling already-canceled backfill
            response = self.client.post(f"/api/v1/backfills/{backfill.id}/cancel", **kwargs)
            assert response.status_code == 409
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index ceb6bc94b8dce..77875bb03ec51 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -35,6 +35,7 @@
     Variable,
     XCom,
 )
+from airflow.models.backfill import Backfill, BackfillDagRun
 from airflow.models.dag import DagOwnerAttributes
 from airflow.models.dagcode import DagCode
 from airflow.models.dagwarning import DagWarning
@@ -66,6 +67,12 @@ def clear_db_runs():
         pass


+def clear_db_backfills():
+    with create_session() as session:
+        session.query(BackfillDagRun).delete()
+        session.query(Backfill).delete()
+
+
 def clear_db_datasets():
     with create_session() as session:
         session.query(DatasetEvent).delete()