Skip to content

Commit

Permalink
Remove internal api call decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
shahar1 committed Nov 30, 2024
1 parent f58bd73 commit 9e4b038
Show file tree
Hide file tree
Showing 16 changed files with 0 additions and 61 deletions.
2 changes: 0 additions & 2 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from sqlalchemy import select

from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunNotFound, TaskDeferred, TaskInstanceNotFound
Expand Down Expand Up @@ -193,7 +192,6 @@ def _get_dag_run(
raise ValueError(f"unknown create_if_necessary value: {create_if_necessary!r}")


@internal_api_call
@provide_session
def _get_ti_db_access(
dag: DAG,
Expand Down
5 changes: 0 additions & 5 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
from tabulate import tabulate

import airflow.models
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
Expand Down Expand Up @@ -499,7 +498,6 @@ def _scan_stale_dags(self):
self.last_deactivate_stale_dags_time = timezone.utcnow()

@classmethod
@internal_api_call
@provide_session
def deactivate_stale_dags(
cls,
Expand Down Expand Up @@ -698,7 +696,6 @@ def _run_parsing_loop(self):
poll_time = 0.0

@classmethod
@internal_api_call
@provide_session
def _fetch_callbacks(
cls,
Expand Down Expand Up @@ -765,7 +762,6 @@ def _refresh_requested_filelocs(self) -> None:
self._file_path_queue.appendleft(fileloc)

@classmethod
@internal_api_call
@provide_session
def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
"""Get filelocs from DB table."""
Expand Down Expand Up @@ -831,7 +827,6 @@ def _print_stat(self):
self.last_stat_print_time = time.monotonic()

@staticmethod
@internal_api_call
@provide_session
def clear_nonexistent_import_errors(
file_paths: list[str] | None, processor_subdir: str | None, session=NEW_SESSION
Expand Down
7 changes: 0 additions & 7 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from sqlalchemy import delete, event, select

from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import (
DagCallbackRequest,
TaskCallbackRequest,
Expand Down Expand Up @@ -430,7 +429,6 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L
self._last_num_of_db_queries = 0

@staticmethod
@internal_api_call
@provide_session
def update_import_errors(
file_last_changed: dict[str, datetime],
Expand Down Expand Up @@ -507,7 +505,6 @@ def get_pools(dag) -> dict[str, set[str]]:
return DagFileProcessor._validate_task_pools_and_update_dag_warnings(pool_dict, dag_ids)

@classmethod
@internal_api_call
@provide_session
def _validate_task_pools_and_update_dag_warnings(
cls, pool_dict: dict[str, set[str]], dag_ids: set[str], session: Session = NEW_SESSION
Expand Down Expand Up @@ -545,7 +542,6 @@ def _validate_task_pools_and_update_dag_warnings(
session.commit()

@classmethod
@internal_api_call
@provide_session
def execute_callbacks(
cls,
Expand Down Expand Up @@ -582,7 +578,6 @@ def execute_callbacks(
session.commit()

@classmethod
@internal_api_call
@provide_session
def execute_callbacks_without_dag(
cls, callback_requests: list[CallbackRequest], unit_test_mode: bool, session: Session = NEW_SESSION
Expand Down Expand Up @@ -621,7 +616,6 @@ def _execute_dag_callbacks(cls, dagbag: DagBag, request: DagCallbackRequest, ses
DAG.execute_callback(callbacks, context, dag.dag_id)

@classmethod
@internal_api_call
@provide_session
def _execute_task_callbacks(
cls, dagbag: DagBag | None, request: TaskCallbackRequest, unit_test_mode: bool, session: Session
Expand Down Expand Up @@ -779,7 +773,6 @@ def _cache_last_num_of_db_queries(self, query_counter: _QueryCounter | None = No
return self._last_num_of_db_queries

@staticmethod
@internal_api_call
@provide_session
def save_dag_to_db(
dags: dict[str, DAG],
Expand Down
2 changes: 0 additions & 2 deletions airflow/models/dagwarning.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

from sqlalchemy import Column, ForeignKeyConstraint, Index, String, Text, delete, false, select

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.models.base import Base, StringID
from airflow.utils import timezone
from airflow.utils.retries import retry_db_transaction
Expand Down Expand Up @@ -71,7 +70,6 @@ def __hash__(self) -> int:
return hash((self.dag_id, self.warning_type))

@classmethod
@internal_api_call
@provide_session
def purge_inactive_dag_warnings(cls, session: Session = NEW_SESSION) -> None:
"""
Expand Down
2 changes: 0 additions & 2 deletions airflow/models/renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import relationship

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.models.base import StringID, TaskInstanceDependencies
from airflow.serialization.helpers import serialize_template_field
Expand Down Expand Up @@ -155,7 +154,6 @@ def _redact(self):
self.rendered_fields[field] = redact(rendered, field)

@classmethod
@internal_api_call
@provide_session
def _update_runtime_evaluated_template_fields(
cls, ti: TaskInstance, session: Session = NEW_SESSION
Expand Down
3 changes: 0 additions & 3 deletions airflow/models/skipmixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

from sqlalchemy import update

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
Expand Down Expand Up @@ -106,7 +105,6 @@ def skip(
SkipMixin._skip(dag_run=dag_run, task_id=task_id, tasks=tasks, map_index=map_index)

@staticmethod
@internal_api_call
@provide_session
def _skip(
dag_run: DagRun | DagRunPydantic,
Expand Down Expand Up @@ -163,7 +161,6 @@ def skip_all_except(
SkipMixin._skip_all_except(ti=ti, branch_task_ids=branch_task_ids)

@classmethod
@internal_api_call
@provide_session
def _skip_all_except(
cls,
Expand Down
18 changes: 0 additions & 18 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
from sqlalchemy_utils import UUIDType

from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.assets.manager import asset_manager
from airflow.configuration import conf
from airflow.exceptions import (
Expand Down Expand Up @@ -185,14 +184,12 @@ class TaskReturnCode(Enum):
"""When task exits with deferral to trigger."""


@internal_api_call
@provide_session
def _merge_ti(ti, session: Session = NEW_SESSION):
session.merge(ti)
session.commit()


@internal_api_call
@provide_session
def _add_log(
event,
Expand All @@ -215,7 +212,6 @@ def _add_log(
)


@internal_api_call
@provide_session
def _update_ti_heartbeat(id: str, when: datetime, session: Session = NEW_SESSION):
session.execute(update(TaskInstance).where(TaskInstance.id == id).values(last_heartbeat_at=when))
Expand Down Expand Up @@ -546,7 +542,6 @@ def clear_task_instances(
session.flush()


@internal_api_call
@provide_session
def _xcom_pull(
*,
Expand Down Expand Up @@ -920,7 +915,6 @@ def _clear_next_method_args(*, task_instance: TaskInstance | TaskInstancePydanti
task_instance.next_kwargs = None


@internal_api_call
def _get_template_context(
*,
task_instance: TaskInstance | TaskInstancePydantic,
Expand Down Expand Up @@ -1095,7 +1089,6 @@ def _is_eligible_to_retry(*, task_instance: TaskInstance | TaskInstancePydantic)


@provide_session
@internal_api_call
def _handle_failure(
*,
task_instance: TaskInstance | TaskInstancePydantic,
Expand Down Expand Up @@ -1208,7 +1201,6 @@ def _refresh_from_task(
task_instance_mutation_hook(task_instance)


@internal_api_call
@provide_session
def _record_task_map_for_downstreams(
*,
Expand Down Expand Up @@ -1546,7 +1538,6 @@ def _get_previous_ti(
return dagrun.get_task_instance(task_instance.task_id, session=session)


@internal_api_call
@provide_session
def _update_rtif(ti, rendered_fields, session: Session = NEW_SESSION):
from airflow.models.renderedtifields import RenderedTaskInstanceFields
Expand Down Expand Up @@ -1577,7 +1568,6 @@ def _coalesce_to_orm_ti(*, ti: TaskInstancePydantic | TaskInstance, session: Ses
return ti


@internal_api_call
@provide_session
def _defer_task(
ti: TaskInstance | TaskInstancePydantic,
Expand Down Expand Up @@ -1652,7 +1642,6 @@ def _defer_task(
return ti


@internal_api_call
@provide_session
def _handle_reschedule(
ti,
Expand Down Expand Up @@ -2142,7 +2131,6 @@ def error(self, session: Session = NEW_SESSION) -> None:
session.commit()

@classmethod
@internal_api_call
@provide_session
def get_task_instance(
cls,
Expand Down Expand Up @@ -2195,7 +2183,6 @@ def refresh_from_task(self, task: Operator, pool_override: str | None = None) ->
_refresh_from_task(task_instance=self, task=task, pool_override=pool_override)

@staticmethod
@internal_api_call
@provide_session
def _clear_xcom_data(ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION) -> None:
"""
Expand Down Expand Up @@ -2231,7 +2218,6 @@ def key(self) -> TaskInstanceKey:
return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index)

@staticmethod
@internal_api_call
def _set_state(ti: TaskInstance | TaskInstancePydantic, state, session: Session) -> bool:
if not isinstance(ti, TaskInstance):
ti = session.scalars(
Expand Down Expand Up @@ -2465,7 +2451,6 @@ def ready_for_retry(self) -> bool:
return self.state == TaskInstanceState.UP_FOR_RETRY and self.next_retry_datetime() < timezone.utcnow()

@staticmethod
@internal_api_call
def _get_dagrun(dag_id, run_id, session) -> DagRun:
from airflow.models.dagrun import DagRun # Avoid circular import

Expand Down Expand Up @@ -2515,7 +2500,6 @@ def ensure_dag(
return task_instance.task.dag

@classmethod
@internal_api_call
@provide_session
def _check_and_change_state_before_execution(
cls,
Expand Down Expand Up @@ -2791,7 +2775,6 @@ def _register_asset_changes(
TaskInstance._register_asset_changes_int(ti=self, events=events)

@staticmethod
@internal_api_call
@provide_session
def _register_asset_changes_int(
ti: TaskInstance, *, events: OutletEventAccessors, session: Session = NEW_SESSION
Expand Down Expand Up @@ -3174,7 +3157,6 @@ def fetch_handle_failure_context(
}

@staticmethod
@internal_api_call
@provide_session
def save_to_db(ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION):
ti = _coalesce_to_orm_ti(ti=ti, session=session)
Expand Down
4 changes: 0 additions & 4 deletions airflow/models/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlalchemy.orm import declared_attr, reconstructor, synonym

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import ensure_secrets_loaded
from airflow.models.base import ID_LEN, Base
from airflow.models.crypto import get_fernet
Expand Down Expand Up @@ -182,7 +181,6 @@ def set(

@staticmethod
@provide_session
@internal_api_call
def _set(
key: str,
value: Any,
Expand Down Expand Up @@ -238,7 +236,6 @@ def update(

@staticmethod
@provide_session
@internal_api_call
def _update(
key: str,
value: Any,
Expand Down Expand Up @@ -279,7 +276,6 @@ def delete(key: str, session: Session = None) -> int:

@staticmethod
@provide_session
@internal_api_call
def _delete(key: str, session: Session = None) -> int:
"""
Delete an Airflow Variable for a given key.
Expand Down
2 changes: 0 additions & 2 deletions airflow/models/xcom_arg.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

from sqlalchemy import func, or_, select

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.exceptions import AirflowException, XComNotFound
from airflow.models import MappedOperator, TaskInstance
from airflow.models.abstractoperator import AbstractOperator
Expand Down Expand Up @@ -232,7 +231,6 @@ def __exit__(self, exc_type, exc_val, exc_tb):
SetupTeardownContext.set_work_task_roots_and_leaves()


@internal_api_call
@provide_session
def _get_task_map_length(
*,
Expand Down
2 changes: 0 additions & 2 deletions airflow/sensors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from sqlalchemy import select

from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
Expand Down Expand Up @@ -83,7 +82,6 @@ def __bool__(self) -> bool:
return self.is_done


@internal_api_call
@provide_session
def _orig_start_date(
dag_id: str, task_id: str, run_id: str, map_index: int, try_number: int, session: Session = NEW_SESSION
Expand Down
2 changes: 0 additions & 2 deletions airflow/utils/cli_action_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import logging
from typing import TYPE_CHECKING, Callable

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
Expand Down Expand Up @@ -121,7 +120,6 @@ def default_action_log(sub_command, user, task_id, dag_id, logical_date, host_na
)


@internal_api_call
@provide_session
def _default_action_log_internal(
*,
Expand Down
Loading

0 comments on commit 9e4b038

Please sign in to comment.