Fix DagFileProcessor interfering with dags outside its ``processor_subdir`` (#33357)

* Fix standalone DagProcessor interfering with DAGs outside of its subdir

* Add tests

* Address code review feedback

(cherry picked from commit 35b1830)
pierrejeambrun authored and ephraimbuddy committed Aug 28, 2023
1 parent 14c91db commit f44a324
Showing 6 changed files with 72 additions and 10 deletions.
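
The gist of the fix: a standalone DAG processor only parses files under its own subdir, so the cleanup it runs after a directory refresh must also be limited to DAG rows it owns. Below is a minimal, self-contained sketch of that scoping rule; the class, field names, and paths are illustrative stand-ins (not Airflow's real ORM models), and the real implementation is in the diffs that follow.

# Illustrative sketch only: simplified stand-ins for DagModel rows, not Airflow's ORM.
from dataclasses import dataclass
from typing import Iterable, Optional

@dataclass
class FakeDagRow:
    dag_id: str
    fileloc: str
    processor_subdir: Optional[str]
    is_active: bool = True

def deactivate_deleted_dags(rows: Iterable[FakeDagRow], alive_filelocs: set, processor_subdir: str) -> None:
    """Deactivate rows whose file is gone, but only rows owned by this processor's subdir."""
    for row in rows:
        owned = row.processor_subdir is None or row.processor_subdir == processor_subdir
        if owned and row.fileloc not in alive_filelocs:
            row.is_active = False

rows = [
    FakeDagRow("team_a_dag", "/dags/team_a/a.py", "/dags/team_a"),
    FakeDagRow("team_b_dag", "/dags/team_b/b.py", "/dags/team_b"),
]
# A processor scoped to /dags/team_a never sees team_b's files, so they are absent
# from alive_filelocs. Without the subdir filter it would wrongly deactivate
# team_b_dag; with it, only rows it owns are touched.
deactivate_deleted_dags(rows, alive_filelocs=set(), processor_subdir="/dags/team_a")
assert rows[0].is_active is False
assert rows[1].is_active is True
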
12 changes: 8 additions & 4 deletions airflow/dag_processing/manager.py
@@ -218,7 +218,6 @@ def _run_processor_manager(
pickle_dags: bool,
async_mode: bool,
) -> None:

# Make this process start as a new process group - that makes it easy
# to kill all sub-process of this at the OS-level, rather than having
# to iterate the child processes
@@ -793,8 +792,14 @@ def _iter_dag_filelocs(fileloc: str) -> Iterator[str]:
alive_dag_filelocs=dag_filelocs,
processor_subdir=self.get_dag_directory(),
)
-DagModel.deactivate_deleted_dags(dag_filelocs)
-DagCode.remove_deleted_code(dag_filelocs)
+DagModel.deactivate_deleted_dags(
+    dag_filelocs,
+    processor_subdir=self.get_dag_directory(),
+)
+DagCode.remove_deleted_code(
+    dag_filelocs,
+    processor_subdir=self.get_dag_directory(),
+)

return True
return False
@@ -1133,7 +1138,6 @@ def prepare_file_path_queue(self):
file_paths_recently_processed = []
file_paths_to_stop_watching = set()
for file_path in self._file_paths:

if is_mtime_mode:
try:
files_with_mtime[file_path] = os.path.getmtime(file_path)
13 changes: 12 additions & 1 deletion airflow/models/dag.py
@@ -3565,16 +3565,27 @@ def set_is_paused(self, is_paused: bool, including_subdags: bool = True, session
def deactivate_deleted_dags(
cls,
alive_dag_filelocs: Container[str],
+processor_subdir: str,
session: Session = NEW_SESSION,
) -> None:
"""
Set ``is_active=False`` on the DAGs for which the DAG files have been removed.
:param alive_dag_filelocs: file paths of alive DAGs
+:param processor_subdir: dag processor subdir
:param session: ORM Session
"""
log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__)
-dag_models = session.scalars(select(cls).where(cls.fileloc.is_not(None)))
+dag_models = session.scalars(
+    select(cls).where(
+        cls.fileloc.is_not(None),
+        or_(
+            cls.processor_subdir.is_(None),
+            cls.processor_subdir == processor_subdir,
+        ),
+    )
+)

for dag_model in dag_models:
if dag_model.fileloc not in alive_dag_filelocs:
dag_model.is_active = False
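
Design note on the filter added above: a DAG row counts as owned by this processor when its processor_subdir matches the processor's DAG directory, and rows whose processor_subdir is NULL (for example, rows written before the column was populated) stay eligible for deactivation as before; only rows explicitly owned by a different subdir are now skipped.
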
14 changes: 12 additions & 2 deletions airflow/models/dagcode.py
@@ -125,10 +125,16 @@ def bulk_sync_to_db(cls, filelocs: Iterable[str], session: Session = NEW_SESSION

@classmethod
@provide_session
-def remove_deleted_code(cls, alive_dag_filelocs: Collection[str], session: Session = NEW_SESSION) -> None:
+def remove_deleted_code(
+    cls,
+    alive_dag_filelocs: Collection[str],
+    processor_subdir: str,
+    session: Session = NEW_SESSION,
+) -> None:
"""Delete code not included in alive_dag_filelocs.
:param alive_dag_filelocs: file paths of alive DAGs
+:param processor_subdir: dag processor subdir
:param session: ORM Session
"""
alive_fileloc_hashes = [cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
@@ -137,7 +143,11 @@ def remove_deleted_code(cls, alive_dag_filelocs: Collection[str], session: Sessi

session.execute(
delete(cls)
-.where(cls.fileloc_hash.notin_(alive_fileloc_hashes), cls.fileloc.notin_(alive_dag_filelocs))
+.where(
+    cls.fileloc_hash.notin_(alive_fileloc_hashes),
+    cls.fileloc.notin_(alive_dag_filelocs),
+    cls.fileloc.contains(processor_subdir),
+)
.execution_options(synchronize_session="fetch")
)
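
Note that, unlike DagModel, the DagCode table does not appear to carry a processor_subdir column, so ownership is approximated here with cls.fileloc.contains(processor_subdir), a substring match on the stored file path: code rows whose path lies outside the processor's subdir are left alone even when their hash is missing from alive_fileloc_hashes.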

1 change: 1 addition & 0 deletions airflow/models/serialized_dag.py
@@ -242,6 +242,7 @@ def remove_deleted_dags(
"""Delete DAGs not included in alive_dag_filelocs.
:param alive_dag_filelocs: file paths of alive DAGs
:param processor_subdir: dag processor subdir
:param session: ORM Session
"""
alive_fileloc_hashes = [DagCode.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
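
The only change in serialized_dag.py is the docstring entry above: SerializedDagModel.remove_deleted_dags already accepted processor_subdir, and the call site in manager.py (visible as unchanged context in the first diff) was already passing processor_subdir=self.get_dag_directory().
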
35 changes: 34 additions & 1 deletion tests/dag_processing/test_job_runner.py
@@ -188,7 +188,6 @@ def test_remove_file_clears_import_error(self, tmpdir):

@conf_vars({("core", "load_examples"): "False"})
def test_max_runs_when_no_files(self):

child_pipe, parent_pipe = multiprocessing.Pipe()

with TemporaryDirectory(prefix="empty-airflow-dags-") as dags_folder:
@@ -1001,6 +1000,40 @@ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmpdir):
# assert dag deactivated
assert not dag.get_is_active()

def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmpdir):
"""Test DagProcessorJobRunner._refresh_dag_dir should not update dags outside its processor_subdir"""

dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
dag_path = os.path.join(TEST_DAGS_FOLDER, "test_miscellaneous.py")
dagbag.process_file(dag_path)
dag = dagbag.get_dag("miscellaneous_test_dag")
dag.sync_to_db(processor_subdir=str(TEST_DAG_FOLDER))
SerializedDagModel.write_dag(dag, processor_subdir=str(TEST_DAG_FOLDER))

assert SerializedDagModel.has_dag("miscellaneous_test_dag")
assert dag.get_is_active()
assert DagCode.has_dag(dag.fileloc)

manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=TEST_DAG_FOLDER / "subdir2" / "subdir3",
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
dag_ids=[],
pickle_dags=False,
async_mode=True,
),
)
manager.processor.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)

manager.processor._refresh_dag_dir()

assert SerializedDagModel.has_dag("miscellaneous_test_dag")
assert dag.get_is_active()
assert DagCode.has_dag(dag.fileloc)

@conf_vars(
{
("core", "load_examples"): "False",
7 changes: 5 additions & 2 deletions tests/models/test_dag.py
@@ -1303,14 +1303,17 @@ def test_dag_is_deactivated_upon_dagfile_deletion(self):
dag.fileloc = dag_fileloc
session = settings.Session()
with mock.patch("airflow.models.dag.DagCode.bulk_sync_to_db"):
-dag.sync_to_db(session=session)
+dag.sync_to_db(session=session, processor_subdir="/usr/local/airflow/dags/")

orm_dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one()

assert orm_dag.is_active
assert orm_dag.fileloc == dag_fileloc

-DagModel.deactivate_deleted_dags(list_py_file_paths(settings.DAGS_FOLDER))
+DagModel.deactivate_deleted_dags(
+    list_py_file_paths(settings.DAGS_FOLDER),
+    processor_subdir="/usr/local/airflow/dags/",
+)

orm_dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one()
assert not orm_dag.is_active
