From 1328d1a03401c229d723697642cada78b06a00db Mon Sep 17 00:00:00 2001 From: Bugra Ozturk Date: Tue, 5 Nov 2024 16:29:17 +0100 Subject: [PATCH] AIP-84 Migrate post a connection to FastAPI API (#43396) * Migrate Create a Connection to FastAPI * Remove additional duplicate comment * Include password in connection and move dashboard.py to serializers/ui/ * Fix test for password * Include password field to response and redact it, run pre-commit after rebase * Convert redact to field_validator and fix tests * Pass field name into redact * run pre-commit after rebase --- .../endpoints/connection_endpoint.py | 1 + .../core_api/openapi/v1-generated.yaml | 102 ++++++++++++++- .../core_api/routes/public/connections.py | 23 ++++ .../core_api/routes/ui/dashboard.py | 2 +- .../core_api/serializers/connections.py | 25 ++++ .../serializers/{ => ui}/dashboard.py | 0 airflow/ui/openapi-gen/queries/common.ts | 3 + airflow/ui/openapi-gen/queries/queries.ts | 40 ++++++ .../ui/openapi-gen/requests/schemas.gen.ts | 110 +++++++++++++++- .../ui/openapi-gen/requests/services.gen.ts | 27 ++++ airflow/ui/openapi-gen/requests/types.gen.ts | 51 +++++++- .../routes/public/test_connections.py | 119 ++++++++++++++++++ 12 files changed, 496 insertions(+), 7 deletions(-) rename airflow/api_fastapi/core_api/serializers/{ => ui}/dashboard.py (100%) diff --git a/airflow/api_connexion/endpoints/connection_endpoint.py b/airflow/api_connexion/endpoints/connection_endpoint.py index 37c91c44eb698..c0c2fcbf4610a 100644 --- a/airflow/api_connexion/endpoints/connection_endpoint.py +++ b/airflow/api_connexion/endpoints/connection_endpoint.py @@ -151,6 +151,7 @@ def patch_connection( return connection_schema.dump(connection) +@mark_fastapi_migration_done @security.requires_access_connection("POST") @provide_session @action_logging( diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 28e38884803a7..06a041b55daab 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1104,6 +1104,49 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + post: + tags: + - Connection + summary: Post Connection + description: Create connection entry. + operationId: post_connection + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectionBody' + responses: + '201': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '409': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Conflict + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/dags/{dag_id}/dagRuns/{dag_run_id}: get: tags: @@ -2515,6 +2558,55 @@ components: - status title: BaseInfoSchema description: Base status field for metadatabase and scheduler. + ConnectionBody: + properties: + connection_id: + type: string + title: Connection Id + conn_type: + type: string + title: Conn Type + description: + anyOf: + - type: string + - type: 'null' + title: Description + host: + anyOf: + - type: string + - type: 'null' + title: Host + login: + anyOf: + - type: string + - type: 'null' + title: Login + schema: + anyOf: + - type: string + - type: 'null' + title: Schema + port: + anyOf: + - type: integer + - type: 'null' + title: Port + password: + anyOf: + - type: string + - type: 'null' + title: Password + extra: + anyOf: + - type: string + - type: 'null' + title: Extra + type: object + required: + - connection_id + - conn_type + title: ConnectionBody + description: Connection Serializer for requests body. ConnectionCollectionResponse: properties: connections: @@ -2564,6 +2656,11 @@ components: - type: integer - type: 'null' title: Port + password: + anyOf: + - type: string + - type: 'null' + title: Password extra: anyOf: - type: string @@ -2578,6 +2675,7 @@ components: - login - schema - port + - password - extra title: ConnectionResponse description: Connection serializer for responses. @@ -3545,7 +3643,7 @@ components: dag_run_states: $ref: '#/components/schemas/DAGRunStates' task_instance_states: - $ref: '#/components/schemas/airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState' + $ref: '#/components/schemas/airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState' type: object required: - dag_run_types @@ -4224,7 +4322,7 @@ components: - git_version title: VersionInfo description: Version information serializer for responses. - airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState: + airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState: properties: no_status: type: integer diff --git a/airflow/api_fastapi/core_api/routes/public/connections.py b/airflow/api_fastapi/core_api/routes/public/connections.py index 8d9f9ddb8ebfc..a31c97c91488f 100644 --- a/airflow/api_fastapi/core_api/routes/public/connections.py +++ b/airflow/api_fastapi/core_api/routes/public/connections.py @@ -26,10 +26,12 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.serializers.connections import ( + ConnectionBody, ConnectionCollectionResponse, ConnectionResponse, ) from airflow.models import Connection +from airflow.utils import helpers connections_router = AirflowRouter(tags=["Connection"], prefix="/connections") @@ -114,3 +116,24 @@ async def get_connections( ], total_entries=total_entries, ) + + +@connections_router.post("/", status_code=201, responses=create_openapi_http_exception_doc([401, 403, 409])) +async def post_connection( + post_body: ConnectionBody, + session: Annotated[Session, Depends(get_session)], +) -> ConnectionResponse: + """Create connection entry.""" + try: + helpers.validate_key(post_body.connection_id, max_length=200) + except Exception as e: + raise HTTPException(400, f"{e}") + + connection = session.scalar(select(Connection).filter_by(conn_id=post_body.connection_id)) + if connection is not None: + raise HTTPException(409, f"Connection with connection_id: `{post_body.connection_id}` already exists") + + connection = Connection(**post_body.model_dump(by_alias=True)) + session.add(connection) + + return ConnectionResponse.model_validate(connection, from_attributes=True) diff --git a/airflow/api_fastapi/core_api/routes/ui/dashboard.py b/airflow/api_fastapi/core_api/routes/ui/dashboard.py index e101ca78be7d9..0eeea4d0dc15d 100644 --- a/airflow/api_fastapi/core_api/routes/ui/dashboard.py +++ b/airflow/api_fastapi/core_api/routes/ui/dashboard.py @@ -25,7 +25,7 @@ from airflow.api_fastapi.common.parameters import DateTimeQuery from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.core_api.serializers.dashboard import HistoricalMetricDataResponse +from airflow.api_fastapi.core_api.serializers.ui.dashboard import HistoricalMetricDataResponse from airflow.models.dagrun import DagRun, DagRunType from airflow.models.taskinstance import TaskInstance from airflow.utils.state import DagRunState, TaskInstanceState diff --git a/airflow/api_fastapi/core_api/serializers/connections.py b/airflow/api_fastapi/core_api/serializers/connections.py index 1cc069cac0cb3..c5956b6ec517d 100644 --- a/airflow/api_fastapi/core_api/serializers/connections.py +++ b/airflow/api_fastapi/core_api/serializers/connections.py @@ -20,10 +20,12 @@ import json from pydantic import BaseModel, Field, field_validator +from pydantic_core.core_schema import ValidationInfo from airflow.utils.log.secrets_masker import redact +# Response Models class ConnectionResponse(BaseModel): """Connection serializer for responses.""" @@ -34,8 +36,16 @@ class ConnectionResponse(BaseModel): login: str | None schema_: str | None = Field(alias="schema") port: int | None + password: str | None extra: str | None + @field_validator("password", mode="after") + @classmethod + def redact_password(cls, v: str | None, field_info: ValidationInfo) -> str | None: + if v is None: + return None + return redact(v, field_info.field_name) + @field_validator("extra", mode="before") @classmethod def redact_extra(cls, v: str | None) -> str | None: @@ -55,3 +65,18 @@ class ConnectionCollectionResponse(BaseModel): connections: list[ConnectionResponse] total_entries: int + + +# Request Models +class ConnectionBody(BaseModel): + """Connection Serializer for requests body.""" + + connection_id: str = Field(serialization_alias="conn_id") + conn_type: str + description: str | None = Field(default=None) + host: str | None = Field(default=None) + login: str | None = Field(default=None) + schema_: str | None = Field(None, alias="schema") + port: int | None = Field(default=None) + password: str | None = Field(default=None) + extra: str | None = Field(default=None) diff --git a/airflow/api_fastapi/core_api/serializers/dashboard.py b/airflow/api_fastapi/core_api/serializers/ui/dashboard.py similarity index 100% rename from airflow/api_fastapi/core_api/serializers/dashboard.py rename to airflow/api_fastapi/core_api/serializers/ui/dashboard.py diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 36ea524e0138f..2ed842201c2e9 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -682,6 +682,9 @@ export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array) => [ export type BackfillServiceCreateBackfillMutationResult = Awaited< ReturnType >; +export type ConnectionServicePostConnectionMutationResult = Awaited< + ReturnType +>; export type PoolServicePostPoolMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index f4b7c41195f77..583f14f7711d9 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -28,6 +28,7 @@ import { } from "../requests/services.gen"; import { BackfillPostBody, + ConnectionBody, DAGPatchBody, DAGRunPatchBody, DagRunState, @@ -1130,6 +1131,45 @@ export const useBackfillServiceCreateBackfill = < }) as unknown as Promise, ...options, }); +/** + * Post Connection + * Create connection entry. + * @param data The data for the request. + * @param data.requestBody + * @returns ConnectionResponse Successful Response + * @throws ApiError + */ +export const useConnectionServicePostConnection = < + TData = Common.ConnectionServicePostConnectionMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + requestBody: ConnectionBody; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + requestBody: ConnectionBody; + }, + TContext + >({ + mutationFn: ({ requestBody }) => + ConnectionService.postConnection({ + requestBody, + }) as unknown as Promise, + ...options, + }); /** * Post Pool * Create a Pool. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 517743af17f6c..c1dc8cd34576b 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -151,6 +151,100 @@ export const $BaseInfoSchema = { description: "Base status field for metadatabase and scheduler.", } as const; +export const $ConnectionBody = { + properties: { + connection_id: { + type: "string", + title: "Connection Id", + }, + conn_type: { + type: "string", + title: "Conn Type", + }, + description: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Description", + }, + host: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Host", + }, + login: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Login", + }, + schema: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Schema", + }, + port: { + anyOf: [ + { + type: "integer", + }, + { + type: "null", + }, + ], + title: "Port", + }, + password: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Password", + }, + extra: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Extra", + }, + }, + type: "object", + required: ["connection_id", "conn_type"], + title: "ConnectionBody", + description: "Connection Serializer for requests body.", +} as const; + export const $ConnectionCollectionResponse = { properties: { connections: { @@ -236,6 +330,17 @@ export const $ConnectionResponse = { ], title: "Port", }, + password: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Password", + }, extra: { anyOf: [ { @@ -257,6 +362,7 @@ export const $ConnectionResponse = { "login", "schema", "port", + "password", "extra", ], title: "ConnectionResponse", @@ -1731,7 +1837,7 @@ export const $HistoricalMetricDataResponse = { $ref: "#/components/schemas/DAGRunStates", }, task_instance_states: { - $ref: "#/components/schemas/airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState", + $ref: "#/components/schemas/airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState", }, }, type: "object", @@ -2766,7 +2872,7 @@ export const $VersionInfo = { description: "Version information serializer for responses.", } as const; -export const $airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState = +export const $airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState = { properties: { no_status: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 5597b0a6a9cfe..4eecb848a57ce 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -41,6 +41,8 @@ import type { GetConnectionResponse, GetConnectionsData, GetConnectionsResponse, + PostConnectionData, + PostConnectionResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, @@ -661,6 +663,31 @@ export class ConnectionService { }, }); } + + /** + * Post Connection + * Create connection entry. + * @param data The data for the request. + * @param data.requestBody + * @returns ConnectionResponse Successful Response + * @throws ApiError + */ + public static postConnection( + data: PostConnectionData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "POST", + url: "/public/connections/", + body: data.requestBody, + mediaType: "application/json", + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 409: "Conflict", + 422: "Validation Error", + }, + }); + } } export class DagRunService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index e3071b64936a8..603a20d090032 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -43,6 +43,21 @@ export type BaseInfoSchema = { status: string | null; }; +/** + * Connection Serializer for requests body. + */ +export type ConnectionBody = { + connection_id: string; + conn_type: string; + description?: string | null; + host?: string | null; + login?: string | null; + schema?: string | null; + port?: number | null; + password?: string | null; + extra?: string | null; +}; + /** * Connection Collection serializer for responses. */ @@ -62,6 +77,7 @@ export type ConnectionResponse = { login: string | null; schema: string | null; port: number | null; + password: string | null; extra: string | null; }; @@ -413,7 +429,7 @@ export type HealthInfoSchema = { export type HistoricalMetricDataResponse = { dag_run_types: DAGRunTypes; dag_run_states: DAGRunStates; - task_instance_states: airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState; + task_instance_states: airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState; }; /** @@ -651,7 +667,7 @@ export type VersionInfo = { /** * TaskInstance serializer for responses. */ -export type airflow__api_fastapi__core_api__serializers__dashboard__TaskInstanceState = +export type airflow__api_fastapi__core_api__serializers__ui__dashboard__TaskInstanceState = { no_status: number; removed: number; @@ -841,6 +857,12 @@ export type GetConnectionsData = { export type GetConnectionsResponse = ConnectionCollectionResponse; +export type PostConnectionData = { + requestBody: ConnectionBody; +}; + +export type PostConnectionResponse = ConnectionResponse; + export type GetDagRunData = { dagId: string; dagRunId: string; @@ -1512,6 +1534,31 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; + post: { + req: PostConnectionData; + res: { + /** + * Successful Response + */ + 201: ConnectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Conflict + */ + 409: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; }; "/public/dags/{dag_id}/dagRuns/{dag_run_id}": { get: { diff --git a/tests/api_fastapi/core_api/routes/public/test_connections.py b/tests/api_fastapi/core_api/routes/public/test_connections.py index ee9c80219eb18..1dc3cf9d2cd4d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_connections.py +++ b/tests/api_fastapi/core_api/routes/public/test_connections.py @@ -169,3 +169,122 @@ def test_should_respond_200( body = response.json() assert body["total_entries"] == expected_total_entries assert [connection["connection_id"] for connection in body["connections"]] == expected_ids + + +class TestPostConnection(TestConnectionEndpoint): + @pytest.mark.parametrize( + "body", + [ + {"connection_id": TEST_CONN_ID, "conn_type": TEST_CONN_TYPE}, + {"connection_id": TEST_CONN_ID, "conn_type": TEST_CONN_TYPE, "extra": None}, + {"connection_id": TEST_CONN_ID, "conn_type": TEST_CONN_TYPE, "extra": "{}"}, + {"connection_id": TEST_CONN_ID, "conn_type": TEST_CONN_TYPE, "extra": '{"key": "value"}'}, + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "description": "test_description", + "host": "test_host", + "login": "test_login", + "schema": "test_schema", + "port": 8080, + "extra": '{"key": "value"}', + }, + ], + ) + def test_post_should_respond_200(self, test_client, session, body): + response = test_client.post("/public/connections/", json=body) + assert response.status_code == 201 + connection = session.query(Connection).all() + assert len(connection) == 1 + + @pytest.mark.parametrize( + "body", + [ + {"connection_id": "****", "conn_type": TEST_CONN_TYPE}, + {"connection_id": "test()", "conn_type": TEST_CONN_TYPE}, + {"connection_id": "this_^$#is_invalid", "conn_type": TEST_CONN_TYPE}, + {"connection_id": "iam_not@#$_connection_id", "conn_type": TEST_CONN_TYPE}, + ], + ) + def test_post_should_respond_400_for_invalid_conn_id(self, test_client, body): + response = test_client.post("/public/connections/", json=body) + assert response.status_code == 400 + connection_id = body["connection_id"] + assert response.json() == { + "detail": f"The key '{connection_id}' has to be made of " + "alphanumeric characters, dashes, dots and underscores exclusively", + } + + @pytest.mark.parametrize( + "body", + [ + {"connection_id": TEST_CONN_ID, "conn_type": TEST_CONN_TYPE}, + ], + ) + def test_post_should_respond_already_exist(self, test_client, body): + response = test_client.post("/public/connections/", json=body) + assert response.status_code == 201 + # Another request + response = test_client.post("/public/connections/", json=body) + assert response.status_code == 409 + assert response.json() == { + "detail": f"Connection with connection_id: `{TEST_CONN_ID}` already exists", + } + + @pytest.mark.enable_redact + @pytest.mark.parametrize( + "body, expected_response", + [ + ( + {"connection_id": TEST_CONN_ID, "conn_type": TEST_CONN_TYPE, "password": "test-password"}, + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "description": None, + "extra": None, + "host": None, + "login": None, + "password": "***", + "port": None, + "schema": None, + }, + ), + ( + {"connection_id": TEST_CONN_ID, "conn_type": TEST_CONN_TYPE, "password": "?>@#+!_%()#"}, + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "description": None, + "extra": None, + "host": None, + "login": None, + "password": "***", + "port": None, + "schema": None, + }, + ), + ( + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "password": "A!rF|0wi$aw3s0m3", + "extra": '{"password": "test-password"}', + }, + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "description": None, + "extra": '{"password": "***"}', + "host": None, + "login": None, + "password": "***", + "port": None, + "schema": None, + }, + ), + ], + ) + def test_post_should_response_201_redacted_password(self, test_client, body, expected_response): + response = test_client.post("/public/connections/", json=body) + assert response.status_code == 201 + assert response.json() == expected_response