Skip to content

Commit

Permalink
Optimize database calls when there are no objects
Browse files Browse the repository at this point in the history
This saves a few calls when no dataset/alias references were found in
any DAGs.
  • Loading branch information
uranusjr committed Sep 18, 2024
1 parent 96c89c5 commit d1862e6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
31 changes: 23 additions & 8 deletions airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ def collect(cls, dags: dict[str, DAG]) -> Self:
)
return coll

def write_datasets(self, *, session: Session) -> dict[str, DatasetModel]:
def add_datasets(self, *, session: Session) -> dict[str, DatasetModel]:
# Optimization: skip all database calls if no datasets were collected.
if not self.datasets:
return {}
orm_datasets: dict[str, DatasetModel] = {
dm.uri: dm
for dm in session.scalars(select(DatasetModel).where(DatasetModel.uri.in_(self.datasets)))
Expand All @@ -305,7 +308,10 @@ def _resolve_dataset_addition() -> Iterator[DatasetModel]:
dataset_manager.create_datasets(list(_resolve_dataset_addition()), session=session)
return orm_datasets

def write_dataset_aliases(self, *, session: Session) -> dict[str, DatasetAliasModel]:
def add_dataset_aliases(self, *, session: Session) -> dict[str, DatasetAliasModel]:
# Optimization: skip all database calls if no dataset aliases were collected.
if not self.dataset_aliases:
return {}
orm_aliases: dict[str, DatasetAliasModel] = {
da.name: da
for da in session.scalars(
Expand All @@ -320,15 +326,18 @@ def write_dataset_aliases(self, *, session: Session) -> dict[str, DatasetAliasMo
session.add(da)
return orm_aliases

def write_dag_dataset_references(
def add_dag_dataset_references(
self,
dags: dict[str, DagModel],
datasets: dict[str, DatasetModel],
*,
session: Session,
) -> None:
# Optimization: No datasets means there are no references to update.
if not datasets:
return
for dag_id, references in self.schedule_dataset_references.items():
# Optimization: no references at all, just clear everything.
# Optimization: no references at all; this is faster than repeated delete().
if not references:
dags[dag_id].schedule_dataset_references = []
continue
Expand All @@ -343,15 +352,18 @@ def write_dag_dataset_references(
if dataset_id not in orm_refs
)

def write_dag_dataset_alias_references(
def add_dag_dataset_alias_references(
self,
dags: dict[str, DagModel],
aliases: dict[str, DatasetAliasModel],
*,
session: Session,
) -> None:
# Optimization: No aliases means there are no references to update.
if not aliases:
return
for dag_id, references in self.schedule_dataset_alias_references.items():
# Optimization: no references at all, just clear everything.
# Optimization: no references at all; this is faster than repeated delete().
if not references:
dags[dag_id].schedule_dataset_alias_references = []
continue
Expand All @@ -366,15 +378,18 @@ def write_dag_dataset_alias_references(
if alias_id not in orm_refs
)

def write_task_dataset_references(
def add_task_dataset_references(
self,
dags: dict[str, DagModel],
datasets: dict[str, DatasetModel],
*,
session: Session,
) -> None:
# Optimization: No datasets means there are no references to update.
if not datasets:
return
for dag_id, references in self.outlet_references.items():
# Optimization: no references at all, just clear everything.
# Optimization: no references at all; this is faster than repeated delete().
if not references:
dags[dag_id].task_outlet_dataset_references = []
continue
Expand Down
13 changes: 7 additions & 6 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2665,13 +2665,14 @@ def bulk_write_to_db(
DagCode.bulk_sync_to_db((dag.fileloc for dag in dags_by_ids.values()), session=session)

dataset_collection = DatasetCollection.collect(dags_by_ids)
orm_datasets = dataset_collection.write_datasets(session=session)
orm_dataset_aliases = dataset_collection.write_dataset_aliases(session=session)
session.flush()

dataset_collection.write_dag_dataset_references(orm_dags, orm_datasets, session=session)
dataset_collection.write_dag_dataset_alias_references(orm_dags, orm_dataset_aliases, session=session)
dataset_collection.write_task_dataset_references(orm_dags, orm_datasets, session=session)
orm_datasets = dataset_collection.add_datasets(session=session)
orm_dataset_aliases = dataset_collection.add_dataset_aliases(session=session)
session.flush() # This populates id so we can create fks in later calls.

dataset_collection.add_dag_dataset_references(orm_dags, orm_datasets, session=session)
dataset_collection.add_dag_dataset_alias_references(orm_dags, orm_dataset_aliases, session=session)
dataset_collection.add_task_dataset_references(orm_dags, orm_datasets, session=session)
session.flush()

@provide_session
Expand Down

0 comments on commit d1862e6

Please sign in to comment.