From 9e4b038ef0be04ff30308f0ff0b8ef61c49d6624 Mon Sep 17 00:00:00 2001
From: Shahar Epstein <60007259+shahar1@users.noreply.github.com>
Date: Fri, 29 Nov 2024 19:49:40 +0200
Subject: [PATCH] Remove internal api call decorator

---
 airflow/cli/commands/task_command.py           |  2 --
 airflow/dag_processing/manager.py              |  5 -----
 airflow/dag_processing/processor.py            |  7 -------
 airflow/models/dagwarning.py                   |  2 --
 airflow/models/renderedtifields.py             |  2 --
 airflow/models/skipmixin.py                    |  3 ---
 airflow/models/taskinstance.py                 | 18 ------------------
 airflow/models/variable.py                     |  4 ----
 airflow/models/xcom_arg.py                     |  2 --
 airflow/sensors/base.py                        |  2 --
 airflow/utils/cli_action_loggers.py            |  2 --
 airflow/utils/log/file_task_handler.py         |  2 --
 .../airflow/providers/edge/models/edge_job.py  |  3 ---
 .../airflow/providers/edge/models/edge_logs.py |  2 --
 .../providers/edge/models/edge_worker.py       |  3 ---
 .../airflow/sdk/definitions/asset/__init__.py  |  2 --
 16 files changed, 61 deletions(-)

diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 4748aea2bbf68..d4b8638623710 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -35,7 +35,6 @@
 from sqlalchemy import select
 
 from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.cli.simple_table import AirflowConsole
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, DagRunNotFound, TaskDeferred, TaskInstanceNotFound
@@ -193,7 +192,6 @@ def _get_dag_run(
     raise ValueError(f"unknown create_if_necessary value: {create_if_necessary!r}")
 
 
-@internal_api_call
 @provide_session
 def _get_ti_db_access(
     dag: DAG,
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index bf0c41cf3b68b..1617c427f37ad 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -42,7 +42,6 @@
 from tabulate import tabulate
 
 import airflow.models
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import CallbackRequest
 from airflow.configuration import conf
 from airflow.dag_processing.processor import DagFileProcessorProcess
@@ -499,7 +498,6 @@ def _scan_stale_dags(self):
         self.last_deactivate_stale_dags_time = timezone.utcnow()
 
     @classmethod
-    @internal_api_call
     @provide_session
     def deactivate_stale_dags(
         cls,
@@ -698,7 +696,6 @@ def _run_parsing_loop(self):
                 poll_time = 0.0
 
     @classmethod
-    @internal_api_call
     @provide_session
     def _fetch_callbacks(
         cls,
@@ -765,7 +762,6 @@ def _refresh_requested_filelocs(self) -> None:
             self._file_path_queue.appendleft(fileloc)
 
     @classmethod
-    @internal_api_call
     @provide_session
     def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
         """Get filelocs from DB table."""
@@ -831,7 +827,6 @@ def _print_stat(self):
            self.last_stat_print_time = time.monotonic()
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def clear_nonexistent_import_errors(
         file_paths: list[str] | None, processor_subdir: str | None, session=NEW_SESSION
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index c44da8c857083..03324441c7f66 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -32,7 +32,6 @@
 from sqlalchemy import delete, event, select
 
 from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import (
     DagCallbackRequest,
     TaskCallbackRequest,
@@ -430,7 +429,6 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L
         self._last_num_of_db_queries = 0
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def update_import_errors(
         file_last_changed: dict[str, datetime],
@@ -507,7 +505,6 @@ def get_pools(dag) -> dict[str, set[str]]:
         return DagFileProcessor._validate_task_pools_and_update_dag_warnings(pool_dict, dag_ids)
 
     @classmethod
-    @internal_api_call
     @provide_session
     def _validate_task_pools_and_update_dag_warnings(
         cls, pool_dict: dict[str, set[str]], dag_ids: set[str], session: Session = NEW_SESSION
@@ -545,7 +542,6 @@ def _validate_task_pools_and_update_dag_warnings(
         session.commit()
 
     @classmethod
-    @internal_api_call
     @provide_session
     def execute_callbacks(
         cls,
@@ -582,7 +578,6 @@ def execute_callbacks(
         session.commit()
 
     @classmethod
-    @internal_api_call
     @provide_session
     def execute_callbacks_without_dag(
         cls, callback_requests: list[CallbackRequest], unit_test_mode: bool, session: Session = NEW_SESSION
@@ -621,7 +616,6 @@ def _execute_dag_callbacks(cls, dagbag: DagBag, request: DagCallbackRequest, ses
         DAG.execute_callback(callbacks, context, dag.dag_id)
 
     @classmethod
-    @internal_api_call
     @provide_session
     def _execute_task_callbacks(
         cls, dagbag: DagBag | None, request: TaskCallbackRequest, unit_test_mode: bool, session: Session
@@ -779,7 +773,6 @@ def _cache_last_num_of_db_queries(self, query_counter: _QueryCounter | None = No
         return self._last_num_of_db_queries
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def save_dag_to_db(
         dags: dict[str, DAG],
diff --git a/airflow/models/dagwarning.py b/airflow/models/dagwarning.py
index d41b410e1a8d7..2e48f963c13d7 100644
--- a/airflow/models/dagwarning.py
+++ b/airflow/models/dagwarning.py
@@ -22,7 +22,6 @@
 
 from sqlalchemy import Column, ForeignKeyConstraint, Index, String, Text, delete, false, select
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.models.base import Base, StringID
 from airflow.utils import timezone
 from airflow.utils.retries import retry_db_transaction
@@ -71,7 +70,6 @@ def __hash__(self) -> int:
         return hash((self.dag_id, self.warning_type))
 
     @classmethod
-    @internal_api_call
     @provide_session
     def purge_inactive_dag_warnings(cls, session: Session = NEW_SESSION) -> None:
         """
diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py
index b8ab93cf41a77..d5e3acc0fff2b 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -36,7 +36,6 @@
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.orm import relationship
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.configuration import conf
 from airflow.models.base import StringID, TaskInstanceDependencies
 from airflow.serialization.helpers import serialize_template_field
@@ -155,7 +154,6 @@ def _redact(self):
             self.rendered_fields[field] = redact(rendered, field)
 
     @classmethod
-    @internal_api_call
     @provide_session
     def _update_runtime_evaluated_template_fields(
         cls, ti: TaskInstance, session: Session = NEW_SESSION
diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py
index 87cd4b2d931e7..8dabf3548844d 100644
--- a/airflow/models/skipmixin.py
+++ b/airflow/models/skipmixin.py
@@ -23,7 +23,6 @@
 
 from sqlalchemy import update
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.exceptions import AirflowException
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
@@ -106,7 +105,6 @@ def skip(
         SkipMixin._skip(dag_run=dag_run, task_id=task_id, tasks=tasks, map_index=map_index)
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def _skip(
         dag_run: DagRun | DagRunPydantic,
@@ -163,7 +161,6 @@ def skip_all_except(
         SkipMixin._skip_all_except(ti=ti, branch_task_ids=branch_task_ids)
 
     @classmethod
-    @internal_api_call
     @provide_session
     def _skip_all_except(
         cls,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 591f3549bab39..08b5683ff865a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -72,7 +72,6 @@
 from sqlalchemy_utils import UUIDType
 
 from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.assets.manager import asset_manager
 from airflow.configuration import conf
 from airflow.exceptions import (
@@ -185,14 +184,12 @@ class TaskReturnCode(Enum):
     """When task exits with deferral to trigger."""
 
 
-@internal_api_call
 @provide_session
 def _merge_ti(ti, session: Session = NEW_SESSION):
     session.merge(ti)
     session.commit()
 
 
-@internal_api_call
 @provide_session
 def _add_log(
     event,
@@ -215,7 +212,6 @@ def _add_log(
     )
 
 
-@internal_api_call
 @provide_session
 def _update_ti_heartbeat(id: str, when: datetime, session: Session = NEW_SESSION):
     session.execute(update(TaskInstance).where(TaskInstance.id == id).values(last_heartbeat_at=when))
@@ -546,7 +542,6 @@ def clear_task_instances(
     session.flush()
 
 
-@internal_api_call
 @provide_session
 def _xcom_pull(
     *,
@@ -920,7 +915,6 @@ def _clear_next_method_args(*, task_instance: TaskInstance | TaskInstancePydanti
     task_instance.next_kwargs = None
 
 
-@internal_api_call
 def _get_template_context(
     *,
     task_instance: TaskInstance | TaskInstancePydantic,
@@ -1095,7 +1089,6 @@ def _is_eligible_to_retry(*, task_instance: TaskInstance | TaskInstancePydantic)
 
 
 @provide_session
-@internal_api_call
 def _handle_failure(
     *,
     task_instance: TaskInstance | TaskInstancePydantic,
@@ -1208,7 +1201,6 @@ def _refresh_from_task(
     task_instance_mutation_hook(task_instance)
 
 
-@internal_api_call
 @provide_session
 def _record_task_map_for_downstreams(
     *,
@@ -1546,7 +1538,6 @@ def _get_previous_ti(
     return dagrun.get_task_instance(task_instance.task_id, session=session)
 
 
-@internal_api_call
 @provide_session
 def _update_rtif(ti, rendered_fields, session: Session = NEW_SESSION):
     from airflow.models.renderedtifields import RenderedTaskInstanceFields
@@ -1577,7 +1568,6 @@ def _coalesce_to_orm_ti(*, ti: TaskInstancePydantic | TaskInstance, session: Ses
     return ti
 
 
-@internal_api_call
 @provide_session
 def _defer_task(
     ti: TaskInstance | TaskInstancePydantic,
@@ -1652,7 +1642,6 @@ def _defer_task(
     return ti
 
 
-@internal_api_call
 @provide_session
 def _handle_reschedule(
     ti,
@@ -2142,7 +2131,6 @@ def error(self, session: Session = NEW_SESSION) -> None:
         session.commit()
 
     @classmethod
-    @internal_api_call
     @provide_session
     def get_task_instance(
         cls,
@@ -2195,7 +2183,6 @@ def refresh_from_task(self, task: Operator, pool_override: str | None = None) ->
         _refresh_from_task(task_instance=self, task=task, pool_override=pool_override)
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def _clear_xcom_data(ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION) -> None:
         """
@@ -2231,7 +2218,6 @@ def key(self) -> TaskInstanceKey:
         return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index)
 
     @staticmethod
-    @internal_api_call
     def _set_state(ti: TaskInstance | TaskInstancePydantic, state, session: Session) -> bool:
         if not isinstance(ti, TaskInstance):
             ti = session.scalars(
@@ -2465,7 +2451,6 @@ def ready_for_retry(self) -> bool:
         return self.state == TaskInstanceState.UP_FOR_RETRY and self.next_retry_datetime() < timezone.utcnow()
 
     @staticmethod
-    @internal_api_call
     def _get_dagrun(dag_id, run_id, session) -> DagRun:
         from airflow.models.dagrun import DagRun  # Avoid circular import
 
@@ -2515,7 +2500,6 @@ def ensure_dag(
         return task_instance.task.dag
 
     @classmethod
-    @internal_api_call
     @provide_session
     def _check_and_change_state_before_execution(
         cls,
@@ -2791,7 +2775,6 @@ def _register_asset_changes(
         TaskInstance._register_asset_changes_int(ti=self, events=events)
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def _register_asset_changes_int(
         ti: TaskInstance, *, events: OutletEventAccessors, session: Session = NEW_SESSION
@@ -3174,7 +3157,6 @@ def fetch_handle_failure_context(
         }
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def save_to_db(ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION):
         ti = _coalesce_to_orm_ti(ti=ti, session=session)
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index 563cac46e8c84..4b3e4f77e47e1 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -25,7 +25,6 @@
 from sqlalchemy.dialects.mysql import MEDIUMTEXT
 from sqlalchemy.orm import declared_attr, reconstructor, synonym
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.configuration import ensure_secrets_loaded
 from airflow.models.base import ID_LEN, Base
 from airflow.models.crypto import get_fernet
@@ -182,7 +181,6 @@ def set(
 
     @staticmethod
     @provide_session
-    @internal_api_call
     def _set(
         key: str,
         value: Any,
@@ -238,7 +236,6 @@ def update(
 
     @staticmethod
     @provide_session
-    @internal_api_call
     def _update(
         key: str,
         value: Any,
@@ -279,7 +276,6 @@ def delete(key: str, session: Session = None) -> int:
 
     @staticmethod
     @provide_session
-    @internal_api_call
     def _delete(key: str, session: Session = None) -> int:
         """
         Delete an Airflow Variable for a given key.
diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index df643d22b2167..404bbcab6d30f 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -25,7 +25,6 @@
 
 from sqlalchemy import func, or_, select
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.exceptions import AirflowException, XComNotFound
 from airflow.models import MappedOperator, TaskInstance
 from airflow.models.abstractoperator import AbstractOperator
@@ -232,7 +231,6 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         SetupTeardownContext.set_work_task_roots_and_leaves()
 
 
-@internal_api_call
 @provide_session
 def _get_task_map_length(
     *,
diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 1c56aa42005a0..b69bf16238431 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -29,7 +29,6 @@
 from sqlalchemy import select
 
 from airflow import settings
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowException,
@@ -83,7 +82,6 @@ def __bool__(self) -> bool:
         return self.is_done
 
 
-@internal_api_call
 @provide_session
 def _orig_start_date(
     dag_id: str, task_id: str, run_id: str, map_index: int, try_number: int, session: Session = NEW_SESSION
diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py
index 568286c11179b..08ad89915eec6 100644
--- a/airflow/utils/cli_action_loggers.py
+++ b/airflow/utils/cli_action_loggers.py
@@ -28,7 +28,6 @@
 import logging
 from typing import TYPE_CHECKING, Callable
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.utils.session import NEW_SESSION, provide_session
 
 if TYPE_CHECKING:
@@ -121,7 +120,6 @@ def default_action_log(sub_command, user, task_id, dag_id, logical_date, host_na
     )
 
 
-@internal_api_call
 @provide_session
 def _default_action_log_internal(
     *,
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 38b17a9a9f11e..6483b83da86ea 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -31,7 +31,6 @@
 
 import pendulum
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.executors.executor_loader import ExecutorLoader
@@ -266,7 +265,6 @@ def close(self):
             self.handler.close()
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def _render_filename_db_access(
         *, ti: TaskInstance | TaskInstancePydantic, try_number: int, session=None
diff --git a/providers/src/airflow/providers/edge/models/edge_job.py b/providers/src/airflow/providers/edge/models/edge_job.py
index c591b6d30529e..bd4b3b191c1b9 100644
--- a/providers/src/airflow/providers/edge/models/edge_job.py
+++ b/providers/src/airflow/providers/edge/models/edge_job.py
@@ -30,7 +30,6 @@
     text,
 )
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.models.base import Base, StringID
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.serialization.serialized_objects import add_pydantic_class_type_mapping
@@ -127,7 +126,6 @@ def key(self) -> TaskInstanceKey:
         return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index)
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def reserve_task(
         worker_name: str,
@@ -170,7 +168,6 @@ def reserve_task(
         )
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def set_state(task: TaskInstanceKey | tuple, state: TaskInstanceState, session: Session = NEW_SESSION):
         if isinstance(task, tuple):
diff --git a/providers/src/airflow/providers/edge/models/edge_logs.py b/providers/src/airflow/providers/edge/models/edge_logs.py
index 29625f5be757b..e58527c18a4c3 100644
--- a/providers/src/airflow/providers/edge/models/edge_logs.py
+++ b/providers/src/airflow/providers/edge/models/edge_logs.py
@@ -30,7 +30,6 @@
 )
 from sqlalchemy.dialects.mysql import MEDIUMTEXT
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.configuration import conf
 from airflow.models.base import Base, StringID
 from airflow.models.taskinstance import TaskInstance
@@ -99,7 +98,6 @@ class EdgeLogs(BaseModel, LoggingMixin):
     model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def push_logs(
         task: TaskInstanceKey | tuple,
diff --git a/providers/src/airflow/providers/edge/models/edge_worker.py b/providers/src/airflow/providers/edge/models/edge_worker.py
index a1287fdb96c8f..c48a98b0e0bca 100644
--- a/providers/src/airflow/providers/edge/models/edge_worker.py
+++ b/providers/src/airflow/providers/edge/models/edge_worker.py
@@ -30,7 +30,6 @@
     select,
 )
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.exceptions import AirflowException
 from airflow.models.base import Base
 from airflow.serialization.serialized_objects import add_pydantic_class_type_mapping
@@ -225,7 +224,6 @@ def assert_version(sysinfo: dict[str, str]) -> None:
             )
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def register_worker(
         worker_name: str,
@@ -258,7 +256,6 @@ def register_worker(
         )
 
     @staticmethod
-    @internal_api_call
     @provide_session
     def set_state(
         worker_name: str,
diff --git a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
index 812c30261bb97..620c89473ffb3 100644
--- a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
+++ b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
@@ -34,7 +34,6 @@
 import attrs
 from sqlalchemy import select
 
-from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.serialization.dag_dependency import DagDependency
 from airflow.typing_compat import TypedDict
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -492,7 +491,6 @@ def as_expression(self) -> dict[str, Any]:
         return {"any": [o.as_expression() for o in self.objects]}
 
 
-@internal_api_call
 @provide_session
 def expand_alias_to_assets(alias: str | AssetAlias, *, session: Session = NEW_SESSION) -> list[BaseAsset]:
     """Expand asset alias to resolved assets."""