Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-66 Refactor DagRun to DagVersion association #46626

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

ephraimbuddy
Copy link
Contributor

This removes DagRun.dag_version association and replaces it with dag_versions property on DagRun that collects all the dag_version_ids associated with the task instances of the DagRun.

closes: #46565

@boring-cyborg boring-cyborg bot added area:db-migrations PRs with DB migration area:dev-tools area:Scheduler including HA (high availability) scheduler kind:documentation labels Feb 10, 2025
airflow/models/dag.py Outdated Show resolved Hide resolved
tih = session.scalars(
select(TIH.dag_version_id).where(TIH.run_id == self.run_id, TIH.dag_id == self.dag_id).distinct()
).all()
return list(tis + tih)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably union this instead so it's a single query vs 2.

Copy link
Member

@pierrejeambrun pierrejeambrun Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only issue I see with that is:

A query will be emitted each time we do:

dag_run.dag_versions()

When we will serialize multiple dag_runs (100 of them), this will happen in a loop, resulting in an extra 100 query being executed to fetch dag_versions for each dag_run.

What would be great is the ability to query dag_run and eagerly load that: (pseudo code)

query(DagRun).options(joinedload(DagRun.dag_versions))

This way in the webserver we can eagerly load all the dag_versions of the dag_run query all at the same time in one query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

joinedload won't be possible since we want only the IDs from DR.tis

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be able to do something like this:

query(DagRun).options(
joinedload(DagRun.tis)
joinedload(TaskInstance.dag_versions)
joinedload(DagRun.tihs)
joinedload(TaskInstanceHistory.dag_versions)
)

Basically, joinedload both ti and this, and their respective dag_versions. Not sure if that'd work, it might.

Copy link
Member

@pierrejeambrun pierrejeambrun Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure if this will have any effect on the “dag_versions” method. Counting on caching to not emit the following requests ?

Maybe we can transform that into a relationship or a hybrid property so we can use loading options / explicitly select them ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking more closely, if we just add a tih relationship to dagrun, we should be able to joinedload them - and we don't need the whole version record, just the ids are enough, and those are already on ti/tih.

But, instead of having a method on dagrun to do this, it might be better to have a helper/classmethod that knows how to do this in bulk, e.g. def get_dag_version_ids(list[DagRun], dict[DagRun, list[uuids]]).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using cached_property instead of the property decorator and using union without joinload support? Would this make it much simpler?

    @cache_property
    @provide_session
    def dag_versions(self, session: Session = NEW_SESSION) -> list[DagVersion]:
        """
        Return the DAG versions associated with the TIs of this DagRun.
        :param session: database session
        """
        tis = select(TI.dag_version_id).where(
            TI.dag_id == self.run_id, TI.run_id == self.run_id
        )
        tih = select(TIH.dag_version_id).where(
            TIH.dag_id == self.run_id,
            TIH.run_id == self.run_id,
        )
        return (
            session.execute(
                tis.distinct().union(tih.distinct())
            )
            .scalars()
            .all()
        )

Copy link
Contributor Author

@ephraimbuddy ephraimbuddy Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jedcunningham That will still make a separate query when accessed. I will suggest we keep what we have currently and add the above in a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pierrejeambrun , won't it be easier to retain the DagVersion.dag_version_id and filter by the TI.dag_version_id in API response?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jason810496 , your query is good, though it will still make a separate query when accessed, and I'm not sure how cached_property would help since we might not access it repeatedly

tests/models/test_dagrun.py Show resolved Hide resolved
Copy link
Member

@pierrejeambrun pierrejeambrun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

tih = session.scalars(
select(TIH.dag_version_id).where(TIH.run_id == self.run_id, TIH.dag_id == self.dag_id).distinct()
).all()
return list(tis + tih)
Copy link
Member

@pierrejeambrun pierrejeambrun Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only issue I see with that is:

A query will be emitted each time we do:

dag_run.dag_versions()

When we will serialize multiple dag_runs (100 of them), this will happen in a loop, resulting in an extra 100 query being executed to fetch dag_versions for each dag_run.

What would be great is the ability to query dag_run and eagerly load that: (pseudo code)

query(DagRun).options(joinedload(DagRun.dag_versions))

This way in the webserver we can eagerly load all the dag_versions of the dag_run query all at the same time in one query.

@@ -17,7 +17,7 @@
---
default_stages: [pre-commit, pre-push]
default_language_version:
python: python3
python: python3.12
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that related to this PR ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Had issues with pre-commit. Will remove it when ready

@ephraimbuddy ephraimbuddy force-pushed the refactor-dagrun-dagversion branch from f03b619 to e5360f9 Compare February 11, 2025 17:17
@ephraimbuddy ephraimbuddy marked this pull request as ready for review February 11, 2025 20:13
@ephraimbuddy ephraimbuddy force-pushed the refactor-dagrun-dagversion branch from e5360f9 to 954ef68 Compare February 11, 2025 20:13
This removes DagRun.dag_version association and replaces it with dag_versions
property on DagRun that collects all the dag_version_ids associated with the
task instances of the DagRun.

closes: apache#46565
@ephraimbuddy ephraimbuddy force-pushed the refactor-dagrun-dagversion branch from 954ef68 to 48376ac Compare February 12, 2025 14:15
@ephraimbuddy ephraimbuddy force-pushed the refactor-dagrun-dagversion branch from 48376ac to e96cf8a Compare February 12, 2025 22:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:db-migrations PRs with DB migration area:dev-tools area:Scheduler including HA (high availability) scheduler kind:documentation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AIP-66 Refactor DagRun to DagVersion association
4 participants