Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate DagFileProcessorManager.clear_nonexistent_import_errors to internal API #28976

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from flask import Response

from airflow.api_connexion.types import APIResponse
from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.serialization.serialized_objects import BaseSerialization

log = logging.getLogger(__name__)
Expand All @@ -38,6 +39,7 @@ def _initialize_map() -> dict[str, Callable]:
functions: list[Callable] = [
DagFileProcessor.update_import_errors,
DagModel.get_paused_dag_ids,
DagFileProcessorManager.clear_nonexistent_import_errors,
]
return {f"{func.__module__}.{func.__name__}": func for func in functions}

Expand Down
12 changes: 8 additions & 4 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from tabulate import tabulate

import airflow.models
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
Expand Down Expand Up @@ -725,7 +726,7 @@ def _refresh_dag_dir(self):

try:
self.log.debug("Removing old import errors")
self.clear_nonexistent_import_errors()
DagFileProcessorManager.clear_nonexistent_import_errors(file_paths=self._file_paths)
except Exception:
self.log.exception("Error removing old import errors")

Expand Down Expand Up @@ -769,16 +770,19 @@ def _print_stat(self):
self._log_file_processing_stats(self._file_paths)
self.last_stat_print_time = time.monotonic()

@staticmethod
@internal_api_call
@provide_session
def clear_nonexistent_import_errors(self, session):
def clear_nonexistent_import_errors(file_paths: list[str] | None, session=NEW_SESSION):
"""
Clears import errors for files that no longer exist.

:param file_paths: list of paths to DAG definition files
:param session: session for ORM operations
"""
query = session.query(errors.ImportError)
if self._file_paths:
query = query.filter(~errors.ImportError.filename.in_(self._file_paths))
if file_paths:
query = query.filter(~errors.ImportError.filename.in_(file_paths))
query.delete(synchronize_session="fetch")
session.commit()

Expand Down