From 38cefe013d0ec55b692e703ea131426d2ac3fd86 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Thu, 28 Nov 2024 22:20:57 +0100 Subject: [PATCH] Remove internal-api command This PR removes the CLI command and the OpenAPI specification for the Internal API. Part of #44436 --- .pre-commit-config.yaml | 1 - airflow/api_internal/__init__.py | 16 -- airflow/api_internal/endpoints/__init__.py | 16 -- .../api_internal/endpoints/health_endpoint.py | 22 -- .../endpoints/rpc_api_endpoint.py | 246 ----------------- airflow/api_internal/gunicorn_config.py | 33 --- airflow/api_internal/internal_api_call.py | 13 - .../api_internal/openapi/internal_api_v1.yaml | 101 ------- airflow/cli/cli_config.py | 71 ----- airflow/cli/commands/internal_api_command.py | 255 ------------------ docs/conf.py | 1 + scripts/cov/cli_coverage.py | 1 - 12 files changed, 1 insertion(+), 775 deletions(-) delete mode 100644 airflow/api_internal/__init__.py delete mode 100644 airflow/api_internal/endpoints/__init__.py delete mode 100644 airflow/api_internal/endpoints/health_endpoint.py delete mode 100644 airflow/api_internal/endpoints/rpc_api_endpoint.py delete mode 100644 airflow/api_internal/gunicorn_config.py delete mode 100644 airflow/api_internal/openapi/internal_api_v1.yaml delete mode 100644 airflow/cli/commands/internal_api_command.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 326ba2b1cb8f8..63fa31a35e8cc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -607,7 +607,6 @@ repos: (?x) ^airflow/api_connexion/openapi/v1.yaml$| ^airflow/ui/openapi-gen/| - ^airflow/cli/commands/internal_api_command.py$| ^airflow/cli/commands/fastapi_api_command.py$| ^airflow/cli/commands/webserver_command.py$| ^airflow/config_templates/| diff --git a/airflow/api_internal/__init__.py b/airflow/api_internal/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/airflow/api_internal/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow/api_internal/endpoints/__init__.py b/airflow/api_internal/endpoints/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/airflow/api_internal/endpoints/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow/api_internal/endpoints/health_endpoint.py b/airflow/api_internal/endpoints/health_endpoint.py deleted file mode 100644 index a6c8a9c7950da..0000000000000 --- a/airflow/api_internal/endpoints/health_endpoint.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from __future__ import annotations - - -def health(): - return {} diff --git a/airflow/api_internal/endpoints/rpc_api_endpoint.py b/airflow/api_internal/endpoints/rpc_api_endpoint.py deleted file mode 100644 index 2db39c345f9f7..0000000000000 --- a/airflow/api_internal/endpoints/rpc_api_endpoint.py +++ /dev/null @@ -1,246 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -from __future__ import annotations - -import functools -import json -import logging -from typing import TYPE_CHECKING, Any, Callable -from uuid import uuid4 - -from flask import Response, request -from itsdangerous import BadSignature -from jwt import ( - ExpiredSignatureError, - ImmatureSignatureError, - InvalidAudienceError, - InvalidIssuedAtError, - InvalidSignatureError, -) - -from airflow.api_connexion.exceptions import PermissionDenied -from airflow.configuration import conf -from airflow.exceptions import AirflowException -from airflow.jobs.job import Job, most_recent_job -from airflow.models.taskinstance import _record_task_map_for_downstreams -from airflow.models.xcom_arg import _get_task_map_length -from airflow.sensors.base import _orig_start_date -from airflow.serialization.serialized_objects import BaseSerialization -from airflow.utils.jwt_signer import JWTSigner -from airflow.utils.session import create_session - -if TYPE_CHECKING: - from airflow.api_connexion.types import APIResponse - -log = logging.getLogger(__name__) - - -@functools.lru_cache -def initialize_method_map() -> dict[str, Callable]: - from airflow.api.common.trigger_dag import trigger_dag - from airflow.assets.manager import AssetManager - from airflow.cli.commands.task_command import _get_ti_db_access - from airflow.dag_processing.manager import DagFileProcessorManager - from airflow.dag_processing.processor import DagFileProcessor - from airflow.models import Trigger, Variable, XCom - from airflow.models.dag import DAG, DagModel - from airflow.models.dagrun import DagRun - from airflow.models.dagwarning import DagWarning - from airflow.models.renderedtifields import RenderedTaskInstanceFields - from airflow.models.serialized_dag import SerializedDagModel - from airflow.models.skipmixin import SkipMixin - from airflow.models.taskinstance import ( - TaskInstance, - _add_log, - _defer_task, - _get_template_context, - _handle_failure, - _handle_reschedule, - _update_rtif, - _update_ti_heartbeat, - _xcom_pull, - ) - from airflow.sdk.definitions.asset import expand_alias_to_assets - from airflow.secrets.metastore import MetastoreBackend - from airflow.utils.cli_action_loggers import _default_action_log_internal - from airflow.utils.log.file_task_handler import FileTaskHandler - - functions: list[Callable] = [ - _default_action_log_internal, - _defer_task, - _get_template_context, - _get_ti_db_access, - _get_task_map_length, - _update_rtif, - _update_ti_heartbeat, - _orig_start_date, - _handle_failure, - _handle_reschedule, - _add_log, - _xcom_pull, - _record_task_map_for_downstreams, - trigger_dag, - DagModel.deactivate_deleted_dags, - DagModel.get_paused_dag_ids, - DagModel.get_current, - DagFileProcessor._execute_task_callbacks, - DagFileProcessor.execute_callbacks, - DagFileProcessor.execute_callbacks_without_dag, - DagFileProcessor.save_dag_to_db, - DagFileProcessor.update_import_errors, - DagFileProcessor._validate_task_pools_and_update_dag_warnings, - DagFileProcessorManager._fetch_callbacks, - DagFileProcessorManager._get_priority_filelocs, - DagFileProcessorManager.clear_nonexistent_import_errors, - DagFileProcessorManager.deactivate_stale_dags, - DagWarning.purge_inactive_dag_warnings, - expand_alias_to_assets, - AssetManager.register_asset_change, - FileTaskHandler._render_filename_db_access, - Job._add_to_db, - Job._fetch_from_db, - Job._kill, - Job._update_heartbeat, - Job._update_in_db, - most_recent_job, - MetastoreBackend._fetch_connection, - MetastoreBackend._fetch_variable, - XCom.get_value, - 
XCom.get_one, - # XCom.get_many, # Not supported because it returns query - XCom.clear, - XCom.set, - Variable._set, - Variable._update, - Variable._delete, - DAG.fetch_callback, - DAG.fetch_dagrun, - DagRun.fetch_task_instances, - DagRun.get_previous_dagrun, - DagRun.get_previous_scheduled_dagrun, - DagRun.get_task_instances, - DagRun.fetch_task_instance, - DagRun._get_log_template, - RenderedTaskInstanceFields._update_runtime_evaluated_template_fields, - SerializedDagModel.get_serialized_dag, - SkipMixin._skip, - SkipMixin._skip_all_except, - TaskInstance._check_and_change_state_before_execution, - TaskInstance.get_task_instance, - TaskInstance._get_dagrun, - TaskInstance._set_state, - TaskInstance.save_to_db, - TaskInstance._clear_xcom_data, - TaskInstance._register_asset_changes_int, - Trigger.from_object, - Trigger.bulk_fetch, - Trigger.clean_unused, - Trigger.submit_event, - Trigger.submit_failure, - Trigger.ids_for_triggerer, - Trigger.assign_unassigned, - ] - return {f"{func.__module__}.{func.__qualname__}": func for func in functions} - - -def log_and_build_error_response(message, status): - error_id = uuid4() - server_message = message + f" error_id={error_id}" - log.exception(server_message) - client_message = message + f" The server side traceback may be identified with error_id={error_id}" - return Response(response=client_message, status=status) - - -def internal_airflow_api(body: dict[str, Any]) -> APIResponse: - """Handle Internal API /internal_api/v1/rpcapi endpoint.""" - content_type = request.headers.get("Content-Type") - if content_type != "application/json": - raise PermissionDenied("Expected Content-Type: application/json") - accept = request.headers.get("Accept") - if accept != "application/json": - raise PermissionDenied("Expected Accept: application/json") - auth = request.headers.get("Authorization", "") - clock_grace = conf.getint("core", "internal_api_clock_grace", fallback=30) - signer = JWTSigner( - secret_key=conf.get("core", "internal_api_secret_key"), - expiration_time_in_seconds=clock_grace, - leeway_in_seconds=clock_grace, - audience="api", - ) - try: - payload = signer.verify_token(auth) - signed_method = payload.get("method") - if not signed_method or signed_method != body.get("method"): - raise BadSignature("Invalid method in token authorization.") - except BadSignature: - raise PermissionDenied("Bad Signature. Please use only the tokens provided by the API.") - except InvalidAudienceError: - raise PermissionDenied("Invalid audience for the request") - except InvalidSignatureError: - raise PermissionDenied("The signature of the request was wrong") - except ImmatureSignatureError: - raise PermissionDenied("The signature of the request was sent from the future") - except ExpiredSignatureError: - raise PermissionDenied( - "The signature of the request has expired. Make sure that all components " - "in your system have synchronized clocks.", - ) - except InvalidIssuedAtError: - raise PermissionDenied( - "The request was issues in the future. 
Make sure that all components " - "in your system have synchronized clocks.", - ) - except Exception: - raise PermissionDenied("Unable to authenticate API via token.") - - log.debug("Got request") - json_rpc = body.get("jsonrpc") - if json_rpc != "2.0": - return log_and_build_error_response(message="Expected jsonrpc 2.0 request.", status=400) - - methods_map = initialize_method_map() - method_name = body.get("method") - if method_name not in methods_map: - return log_and_build_error_response(message=f"Unrecognized method: {method_name}.", status=400) - - handler = methods_map[method_name] - params = {} - try: - if body.get("params"): - params_json = body.get("params") - params = BaseSerialization.deserialize(params_json, use_pydantic_models=True) - except Exception: - return log_and_build_error_response(message="Error deserializing parameters.", status=400) - - log.info("Calling method %s\nparams: %s", method_name, params) - try: - # Session must be created there as it may be needed by serializer for lazy-loaded fields. - with create_session() as session: - output = handler(**params, session=session) - output_json = BaseSerialization.serialize(output, use_pydantic_models=True) - response = json.dumps(output_json) if output_json is not None else None - log.info("Sending response: %s", response) - return Response(response=response, headers={"Content-Type": "application/json"}) - # In case of AirflowException or other selective known types, transport the exception class back to caller - except (KeyError, AttributeError, AirflowException) as e: - exception_json = BaseSerialization.serialize(e, use_pydantic_models=True) - response = json.dumps(exception_json) - log.info("Sending exception response: %s", response) - return Response(response=response, headers={"Content-Type": "application/json"}) - except Exception: - return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500) diff --git a/airflow/api_internal/gunicorn_config.py b/airflow/api_internal/gunicorn_config.py deleted file mode 100644 index 1ed8e1f941e5b..0000000000000 --- a/airflow/api_internal/gunicorn_config.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import setproctitle - -from airflow import settings - - -def post_worker_init(_): - """ - Set process title. - - This is used by airflow.cli.commands.internal_api_command to track the status of the worker. 
- """ - old_title = setproctitle.getproctitle() - setproctitle.setproctitle(settings.GUNICORN_WORKER_READY_PREFIX + old_title) diff --git a/airflow/api_internal/internal_api_call.py b/airflow/api_internal/internal_api_call.py index 4c0b613b78b4c..881ff9924a0f9 100644 --- a/airflow/api_internal/internal_api_call.py +++ b/airflow/api_internal/internal_api_call.py @@ -17,27 +17,14 @@ from __future__ import annotations -import logging from functools import wraps -from http import HTTPStatus from typing import Callable, TypeVar -from airflow.exceptions import AirflowException from airflow.typing_compat import ParamSpec PS = ParamSpec("PS") RT = TypeVar("RT") -logger = logging.getLogger(__name__) - - -class AirflowHttpException(AirflowException): - """Raise when there is a problem during an http request on the internal API decorator.""" - - def __init__(self, message: str, status_code: HTTPStatus): - super().__init__(message) - self.status_code = status_code - def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]: @wraps(func) diff --git a/airflow/api_internal/openapi/internal_api_v1.yaml b/airflow/api_internal/openapi/internal_api_v1.yaml deleted file mode 100644 index 15995a954a683..0000000000000 --- a/airflow/api_internal/openapi/internal_api_v1.yaml +++ /dev/null @@ -1,101 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - ---- -openapi: 3.0.2 -info: - title: Airflow Internal API - version: 1.0.0 - description: | - This is Airflow Internal API - which is a proxy for components running - customer code for connecting to Airflow Database. - - It is not intended to be used by any external code. 
- - You can find more information in AIP-44 - https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API - - -servers: - - url: /internal_api/v1 - description: Airflow Internal API -paths: - "/rpcapi": - post: - deprecated: false - x-openapi-router-controller: airflow.api_internal.endpoints.rpc_api_endpoint - operationId: internal_airflow_api - tags: - - JSONRPC - parameters: [] - responses: - '200': - description: Successful response - requestBody: - x-body-name: body - required: true - content: - application/json: - schema: - type: object - required: - - method - - jsonrpc - - params - properties: - jsonrpc: - type: string - default: '2.0' - description: JSON-RPC Version (2.0) - method: - type: string - description: Method name - params: - title: Parameters - type: object - "/health": - get: - operationId: health - deprecated: false - x-openapi-router-controller: airflow.api_internal.endpoints.health_endpoint - tags: - - JSONRPC - parameters: [] - responses: - '200': - description: Successful response -x-headers: [] -x-explorer-enabled: true -x-proxy-enabled: true -components: - schemas: - JsonRpcRequired: - type: object - required: - - method - - jsonrpc - properties: - method: - type: string - description: Method name - jsonrpc: - type: string - default: '2.0' - description: JSON-RPC Version (2.0) - discriminator: - propertyName: method_name -tags: [] diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 21d09d0bca811..0c9703ec7801e 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -696,51 +696,6 @@ def string_lower_type(val): help="The access log format for gunicorn logs", ) - -# internal-api -ARG_INTERNAL_API_PORT = Arg( - ("-p", "--port"), - default=9080, - type=int, - help="The port on which to run the server", -) -ARG_INTERNAL_API_WORKERS = Arg( - ("-w", "--workers"), - default=4, - type=int, - help="Number of workers to run the Internal API-on", -) -ARG_INTERNAL_API_WORKERCLASS = Arg( - ("-k", "--workerclass"), - default="sync", - choices=["sync", "eventlet", "gevent", "tornado"], - help="The worker class to use for Gunicorn", -) -ARG_INTERNAL_API_WORKER_TIMEOUT = Arg( - ("-t", "--worker-timeout"), - default=120, - type=int, - help="The timeout for waiting on Internal API workers", -) -ARG_INTERNAL_API_HOSTNAME = Arg( - ("-H", "--hostname"), - default="0.0.0.0", # nosec - help="Set the hostname on which to run the web server", -) -ARG_INTERNAL_API_ACCESS_LOGFILE = Arg( - ("-A", "--access-logfile"), - help="The logfile to store the access log. Use '-' to print to stdout", -) -ARG_INTERNAL_API_ERROR_LOGFILE = Arg( - ("-E", "--error-logfile"), - help="The logfile to store the error log. 
Use '-' to print to stderr", -) -ARG_INTERNAL_API_ACCESS_LOGFORMAT = Arg( - ("-L", "--access-logformat"), - help="The access log format for gunicorn logs", -) - - # fastapi-api ARG_FASTAPI_API_PORT = Arg( ("-p", "--port"), @@ -2070,32 +2025,6 @@ class GroupCommand(NamedTuple): ), ] -core_commands.append( - ActionCommand( - name="internal-api", - help="Start an Airflow Internal API instance", - func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"), - args=( - ARG_INTERNAL_API_PORT, - ARG_INTERNAL_API_WORKERS, - ARG_INTERNAL_API_WORKERCLASS, - ARG_INTERNAL_API_WORKER_TIMEOUT, - ARG_INTERNAL_API_HOSTNAME, - ARG_PID, - ARG_DAEMON, - ARG_STDOUT, - ARG_STDERR, - ARG_INTERNAL_API_ACCESS_LOGFILE, - ARG_INTERNAL_API_ERROR_LOGFILE, - ARG_INTERNAL_API_ACCESS_LOGFORMAT, - ARG_LOG_FILE, - ARG_SSL_CERT, - ARG_SSL_KEY, - ARG_DEBUG, - ), - ), -) - def _remove_dag_id_opt(command: ActionCommand): cmd = command._asdict() diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py deleted file mode 100644 index 45c930b47fc0f..0000000000000 --- a/airflow/cli/commands/internal_api_command.py +++ /dev/null @@ -1,255 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-"""Internal API command.""" - -from __future__ import annotations - -import logging -import os -import signal -import subprocess -import sys -import textwrap -from contextlib import suppress -from pathlib import Path -from tempfile import gettempdir -from time import sleep - -import psutil -from flask import Flask -from flask_appbuilder import SQLA -from flask_caching import Cache -from flask_wtf.csrf import CSRFProtect -from lockfile.pidlockfile import read_pid_from_pidfile -from sqlalchemy.engine.url import make_url - -from airflow import settings -from airflow.cli.commands.daemon_utils import run_command_with_daemon_option -from airflow.cli.commands.webserver_command import GunicornMonitor -from airflow.configuration import conf -from airflow.exceptions import AirflowConfigException -from airflow.logging_config import configure_logging -from airflow.models import import_all_models -from airflow.utils import cli as cli_utils -from airflow.utils.cli import setup_locations -from airflow.utils.providers_configuration_loader import providers_configuration_loaded -from airflow.www.extensions.init_dagbag import init_dagbag -from airflow.www.extensions.init_jinja_globals import init_jinja_globals -from airflow.www.extensions.init_manifest_files import configure_manifest_files -from airflow.www.extensions.init_security import init_xframe_protection -from airflow.www.extensions.init_views import init_api_internal, init_error_handlers - -log = logging.getLogger(__name__) -app: Flask | None = None - - -@cli_utils.action_cli -@providers_configuration_loaded -def internal_api(args): - """Start Airflow Internal API.""" - print(settings.HEADER) - - access_logfile = args.access_logfile or "-" - error_logfile = args.error_logfile or "-" - access_logformat = args.access_logformat - num_workers = args.workers - worker_timeout = args.worker_timeout - - if args.debug: - log.info("Starting the Internal API server on port %s and host %s.", args.port, args.hostname) - app = create_app(testing=conf.getboolean("core", "unit_test_mode")) - app.run( - debug=True, # nosec - use_reloader=not app.config["TESTING"], - port=args.port, - host=args.hostname, - ) - else: - log.info( - textwrap.dedent( - f"""\ - Running the Gunicorn Server with: - Workers: {num_workers} {args.workerclass} - Host: {args.hostname}:{args.port} - Timeout: {worker_timeout} - Logfiles: {access_logfile} {error_logfile} - Access Logformat: {access_logformat} - =================================================================""" - ) - ) - - pid_file, _, _, _ = setup_locations("internal-api", pid=args.pid) - - run_args = [ - sys.executable, - "-m", - "gunicorn", - "--workers", - str(num_workers), - "--worker-class", - str(args.workerclass), - "--timeout", - str(worker_timeout), - "--bind", - args.hostname + ":" + str(args.port), - "--name", - "airflow-internal-api", - "--pid", - pid_file, - "--access-logfile", - str(access_logfile), - "--error-logfile", - str(error_logfile), - "--config", - "python:airflow.api_internal.gunicorn_config", - ] - - if args.access_logformat and args.access_logformat.strip(): - run_args += ["--access-logformat", str(args.access_logformat)] - - if args.daemon: - run_args += ["--daemon"] - - run_args += ["airflow.cli.commands.internal_api_command:cached_app()"] - - # To prevent different workers creating the web app and - # all writing to the database at the same time, we use the --preload option. 
- # With the preload option, the app is loaded before the workers are forked, and each worker will - # then have a copy of the app - run_args += ["--preload"] - - def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen): - log.info("Received signal: %s. Closing gunicorn.", signum) - gunicorn_master_proc.terminate() - with suppress(TimeoutError): - gunicorn_master_proc.wait(timeout=30) - if isinstance(gunicorn_master_proc, subprocess.Popen): - still_running = gunicorn_master_proc.poll() is not None - else: - still_running = gunicorn_master_proc.is_running() - if still_running: - gunicorn_master_proc.kill() - sys.exit(0) - - def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen): - # Register signal handlers - signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc)) - signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc)) - - # These run forever until SIG{INT, TERM, KILL, ...} signal is sent - GunicornMonitor( - gunicorn_master_pid=gunicorn_master_proc.pid, - num_workers_expected=num_workers, - master_timeout=120, - worker_refresh_interval=30, - worker_refresh_batch_size=1, - reload_on_plugin_change=False, - ).start() - - def start_and_monitor_gunicorn(args): - if args.daemon: - subprocess.Popen(run_args, close_fds=True) - - # Reading pid of gunicorn master as it will be different that - # the one of process spawned above. - gunicorn_master_proc_pid = None - while not gunicorn_master_proc_pid: - sleep(0.1) - gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file) - - # Run Gunicorn monitor - gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid) - monitor_gunicorn(gunicorn_master_proc) - else: - with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc: - monitor_gunicorn(gunicorn_master_proc) - - if args.daemon: - # This makes possible errors get reported before daemonization - os.environ["SKIP_DAGS_PARSING"] = "True" - create_app(None) - os.environ.pop("SKIP_DAGS_PARSING") - - pid_file_path = Path(pid_file) - monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}")) - run_command_with_daemon_option( - args=args, - process_name="internal-api", - callback=lambda: start_and_monitor_gunicorn(args), - should_setup_logging=True, - pid_file=monitor_pid_file, - ) - - -def create_app(config=None, testing=False): - """Create a new instance of Airflow Internal API app.""" - flask_app = Flask(__name__) - - flask_app.config["APP_NAME"] = "Airflow Internal API" - flask_app.config["TESTING"] = testing - flask_app.config["SQLALCHEMY_DATABASE_URI"] = conf.get("database", "SQL_ALCHEMY_CONN") - - url = make_url(flask_app.config["SQLALCHEMY_DATABASE_URI"]) - if url.drivername == "sqlite" and url.database and not url.database.startswith("/"): - raise AirflowConfigException( - f'Cannot use relative path: `{conf.get("database", "SQL_ALCHEMY_CONN")}` to connect to sqlite. ' - "Please use absolute path such as `sqlite:////tmp/airflow.db`." 
- ) - - flask_app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False - - flask_app.config["SESSION_COOKIE_HTTPONLY"] = True - flask_app.config["SESSION_COOKIE_SAMESITE"] = "Lax" - - if config: - flask_app.config.from_mapping(config) - - if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config: - flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args() - - csrf = CSRFProtect() - csrf.init_app(flask_app) - - db = SQLA() - db.session = settings.Session - db.init_app(flask_app) - - init_dagbag(flask_app) - - cache_config = {"CACHE_TYPE": "flask_caching.backends.filesystem", "CACHE_DIR": gettempdir()} - Cache(app=flask_app, config=cache_config) - - configure_logging() - configure_manifest_files(flask_app) - - import_all_models() - - with flask_app.app_context(): - init_error_handlers(flask_app) - init_api_internal(flask_app, standalone_api=True) - - init_jinja_globals(flask_app) - init_xframe_protection(flask_app) - return flask_app - - -def cached_app(config=None, testing=False): - """Return cached instance of Airflow Internal API app.""" - global app - if not app: - app = create_app(config=config, testing=testing) - return app diff --git a/docs/conf.py b/docs/conf.py index f2cebd8c89de9..5426818232534 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -202,6 +202,7 @@ exclude_patterns = [ # We only link to selected subpackages. "_api/airflow/index.rst", + "_api/internal_api_call/index.rst", # Included in the cluster-policies doc "_api/airflow/policies/index.rst", "README.rst", diff --git a/scripts/cov/cli_coverage.py b/scripts/cov/cli_coverage.py index 043010676ff45..ead4873cb7299 100644 --- a/scripts/cov/cli_coverage.py +++ b/scripts/cov/cli_coverage.py @@ -37,7 +37,6 @@ "airflow/cli/commands/dag_processor_command.py", "airflow/cli/commands/db_command.py", "airflow/cli/commands/info_command.py", - "airflow/cli/commands/internal_api_command.py", "airflow/cli/commands/jobs_command.py", "airflow/cli/commands/kubernetes_command.py", "airflow/cli/commands/plugins_command.py",
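
A note for reviewers on what the deleted pieces did together: `internal-api` started a gunicorn/Flask app whose only substantive route, `/internal_api/v1/rpcapi`, was a JWT-authenticated JSON-RPC 2.0 dispatcher over the allow-list built in `initialize_method_map()` (methods keyed as `f"{func.__module__}.{func.__qualname__}"`). For the archives, here is a minimal sketch of how a component called that endpoint, reconstructed from the deleted handler and OpenAPI spec above. The `call_internal_api` helper and the use of `requests` are illustrative assumptions for this sketch, not code that ever existed in the tree:

    from __future__ import annotations

    import json

    import requests

    from airflow.configuration import conf
    from airflow.serialization.serialized_objects import BaseSerialization
    from airflow.utils.jwt_signer import JWTSigner


    def call_internal_api(base_url: str, method: str, params: dict) -> requests.Response:
        """Hypothetical client for the endpoint removed in this patch."""
        # The deleted handler used the same clock-grace setting for both token
        # expiration and verification leeway, so the client mirrors it here.
        clock_grace = conf.getint("core", "internal_api_clock_grace", fallback=30)
        signer = JWTSigner(
            secret_key=conf.get("core", "internal_api_secret_key"),
            expiration_time_in_seconds=clock_grace,
            audience="api",
        )
        body = {
            "jsonrpc": "2.0",  # the handler rejected anything but "2.0"
            "method": method,
            # Parameters were round-tripped through BaseSerialization with
            # use_pydantic_models=True on both sides of the wire.
            "params": BaseSerialization.serialize(params, use_pydantic_models=True),
        }
        # The handler verified that the signed "method" claim matched the
        # method named in the body, so the token must carry it.
        token = signer.generate_signed_token({"method": method})
        return requests.post(
            f"{base_url}/internal_api/v1/rpcapi",
            data=json.dumps(body),
            headers={
                "Content-Type": "application/json",  # both checked by the handler
                "Accept": "application/json",
                "Authorization": token,
            },
        )

Note that `airflow/api_internal/internal_api_call.py` is trimmed rather than deleted: the `internal_api_call` decorator remains for now, but the HTTP transport, `AirflowHttpException`, and the gunicorn server behind it are all gone.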