
TriggerDagRunOperator External Link doesn't link with execution date to match to triggered DAG #19264

Closed
1 of 2 tasks
ljades opened this issue Oct 27, 2021 · 21 comments
Labels
affected_version:2.2 Issues Reported for 2.2 area:core kind:bug This is a clearly a bug

Comments

@ljades

ljades commented Oct 27, 2021

Apache Airflow version

2.2.0 (latest released)

Operating System

Debian

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

Using base Docker image astronomerinc/ap-airflow:2.0.2-2-buster.

What happened

The TriggerDagRunOperator embeds an external link that points to the DAG being triggered. The problem is that the link currently includes a parameter that restricts the view to execution dates EARLIER than the execution date of the triggering DAG itself.

This means the external link is guaranteed never to point to the right DAG run (the run doesn't even appear as an option until you manually change the execution date filter on the new page). It links to the DAG page, but never to the right instance without some manual tampering after clicking the link.
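A minimal sketch of why the run disappears. It assumes, as described above, that the linked view only lists runs whose execution date is at or before the date carried in the link. `visible_runs` is a hypothetical helper, and the dates are made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def visible_runs(run_dates, upper_bound, num_runs=25):
    """Hypothetical filter: show at most num_runs runs dated at or before upper_bound."""
    eligible = sorted(d for d in run_dates if d <= upper_bound)
    return eligible[-num_runs:]


# Execution date of the triggering run (e.g. a manual run started just now).
triggering_date = datetime(2021, 10, 27, 12, 0, tzinfo=timezone.utc)
# Without an explicit execution_date, the triggered run is stamped a little later.
triggered_date = triggering_date + timedelta(seconds=30)
older_run = triggering_date - timedelta(days=1)

# The link carries the triggering run's date as the upper bound of the filter.
shown = visible_runs([older_run, triggered_date], upper_bound=triggering_date)
print(triggered_date in shown)  # False: the run we want is filtered out
```

Because the triggered run is always created after the triggering run's execution date, an upper-bound filter built from that date can never include it.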

What you expected to happen

The "Triggered DAG" link should point to the execution date of the triggered DAG, not the triggering DAG.

If this is not feasible, it should at least point to the triggered DAG's page with no execution date constraints. That would at least show the latest run and include the triggered run in the dropdown.

How to reproduce

Create a DAG with a TriggerDagRunOperator that triggers another DAG. Run the former DAG and wait for the task to run. Click on the task, then click the "Triggered DAG" external link. It will not show the triggered run that just happened; you need to manually adjust the date constraint in the upper left of the DAG views and then select your run.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@ljades ljades added area:core kind:bug This is a clearly a bug labels Oct 27, 2021
@boring-cyborg

boring-cyborg bot commented Oct 27, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@ljades
Author

ljades commented Oct 27, 2021

Oh, one more thing to note: a band-aid solution I'm currently using is to set the execution_date parameter of the TriggerDagRunOperator to "{{ execution_date }}", which sets it to the execution date of the root DAG itself. The problem with this, however, is that it effectively tells the trigger to lie about the history of that DAG. It also means I can't clear the operator's status and have it retry the triggered DAG, because the DAG run ID is derived from the execution date and cannot replace an existing run with the same ID.
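A rough illustration of the retry problem. This is a sketch under stated assumptions, not Airflow code. `FakeRunStore` is a hypothetical in-memory stand-in for the dag_run table, and `manual_run_id` assumes the `manual__<ISO timestamp>` shape that Airflow 2.x uses for manually triggered run IDs. Pinning the child's execution date to the parent's means every retry produces the same run ID.

```python
from datetime import datetime, timezone


class DuplicateRunError(Exception):
    """Stand-in for the unique-constraint failure on (dag_id, run_id)."""


def manual_run_id(execution_date):
    # Assumed "<run_type>__<iso timestamp>" format for manual runs.
    return f"manual__{execution_date.isoformat()}"


class FakeRunStore:
    """Hypothetical in-memory stand-in for the dag_run table."""

    def __init__(self):
        self.keys = set()

    def create(self, dag_id, execution_date):
        key = (dag_id, manual_run_id(execution_date))
        if key in self.keys:
            raise DuplicateRunError(f"run {key[1]} already exists for {dag_id}")
        self.keys.add(key)
        return key[1]


store = FakeRunStore()
parent_date = datetime(2021, 10, 27, 12, 0, tzinfo=timezone.utc)

# First attempt: execution_date="{{ execution_date }}" pins the child to the parent's date.
first = store.create("child_dag", parent_date)

# Clearing the trigger task renders the same template value, so the run ID repeats.
try:
    store.create("child_dag", parent_date)
    retried = True
except DuplicateRunError:
    retried = False
```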

@o-nikolas
Contributor

Hey @ljades,

I looked into this, but couldn't replicate it on current main branch. Do you have example dag(s) code to repro the issue?

@ljades
Author

ljades commented Oct 28, 2021

> Hey @ljades,
>
> I looked into this, but couldn't replicate it on current main branch. Do you have example dag(s) code to repro the issue?

Sure! Here's an excerpt that's based on the stuff I have in my own application (left out company-specific stuff, and stitched together methods from different modules).

```python
from typing import Any

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.state import State

# DockerBuildPodOperator, get_docker_build_resource_requirements and
# get_simple_sleep_task are helpers from the application that are not shown here.


def get_docker_build_task(dag: DAG, image_name: str) -> DockerBuildPodOperator:
    """..."""
    git_url = "...".format(...)
    build_args = {...}

    return DockerBuildPodOperator(
        task_id="...-docker-build",
        image_name=image_name,
        build_context=git_url,
        dockerfile_path="Dockerfile",
        build_args=build_args,
        dag=dag,
        # Improve completion time of docker build
        resources=get_docker_build_resource_requirements(),
    )

def get_build_short_circuit_skip_task(dag: DAG) -> ShortCircuitOperator:
    """Return a ShortCircuitOperator that sets all following tasks to skip if worker build fails"""
    # Check if the docker build worker task failed
    def is_worker_build_succeeded(**kwargs: Any) -> bool:
        return kwargs["dag_run"].get_task_instance("...-docker-build").state == State.SUCCESS

    return ShortCircuitOperator(
        task_id="short-on-build-fail",
        python_callable=is_worker_build_succeeded,
        trigger_rule="all_done",
        dag=dag,
    )


def get_trigger_dag_run_and_wait_task(dag: DAG, dag_id: str) -> TriggerDagRunOperator:
    """Return a TriggerDagRunOperator for triggering other dags"""
    # The default trigger rule is "all_success": only run this task if previous ones all succeeded
    # We change to "all_done" which means that when the upstream task is done, regardless
    # of status, it moves on.
    trigger_rule = "all_done"

    dag_config = {}

    return TriggerDagRunOperator(
        task_id=f"trigger_and_wait_{dag_id}",
        trigger_dag_id=dag_id,
        wait_for_completion=True,
        conf=dag_config,
        trigger_rule=trigger_rule,
        execution_date="{{execution_date}}",  # Added this param to patch the problem, without it, the external link doesn't link properly
        dag=dag,
    )


def get_fail_if_task(dag: DAG, fail_condition: str = "all_failed") -> BashOperator:
    """Return a BashOperator that only runs trivially when all upstream tasks succeed"""
    return get_simple_sleep_task(
        dag=dag,
        task_name=f"fail_for_{fail_condition}",
        trigger_rule="none_failed_or_skipped",
        sleep_seconds="2",
    )


def get_run_all_subdag() -> DAG:
    """Return a DAG that sequentially runs all other relevant DAGs"""
    dag = DAG(
        dag_id="trigger_OTHERS",
        default_args=...,
        schedule_interval=None,
        tags=[...],
    )

    image_name = "..."
    # Always add the build task for a full run
    build = get_docker_build_task(dag, image_name)
    short_circuit = get_build_short_circuit_skip_task(dag)
    # The short circuit skip will skip the fail status for all downstream
    # tasks, so we need extra status checkers to make sure that doesn't happen.
    # Two potential fail points: if the docker build fails, and if any dag
    # fails. We need two separate ones because if the build failure status checker
    # is downstream from the short circuit, it will short the status checker too.
    fail_if_build_failed = get_fail_if_task(dag, fail_condition="build")
    fail_for_any_subdag = get_fail_if_task(dag, fail_condition="any_subdag")

    build >> short_circuit
    build >> fail_if_build_failed

    # Track a segment containing the dag trigger for each document source
    dag_trigger_tasks = []
    for dag_id in ["my", "list", "of", "dag", "ids", "..."]:

        subdag_trigger_task = get_trigger_dag_run_and_wait_task(
            dag=dag, dag_id=dag_id,
        )

        # Chain each trigger after the previous one; the first waits on the short circuit
        if dag_trigger_tasks:
            subdag_trigger_task.set_upstream(dag_trigger_tasks[-1])
        else:
            subdag_trigger_task.set_upstream(short_circuit)

        subdag_trigger_task.set_downstream(fail_for_any_subdag)
        dag_trigger_tasks.append(subdag_trigger_task)

    return dag


run_all_subdag = get_run_all_subdag()
```

@o-nikolas
Contributor

o-nikolas commented Oct 28, 2021

I'm seeing even odder behaviour.

In the URL I see a timestamp that is definitely before the triggered DAG run (it is from the triggering DAG), but the UI still renders all recent DAG runs (as if the filter were not applied!). When I click the "Update" filter button to apply the filter from the URL, I indeed no longer see the latest triggered DAG run.

So I'm observing two issues:

  1. The filter timestamp in the url from the triggering dag does not work at all
  2. When you manually update the filter, it is as @ljades described, where you do not see the latest dag run.

UPDATE:
Actually the more I look at it, the triggered dag's "Data Interval" time matches the time in the triggerer URL. But the "UTC" time is the time the triggered dag actually started running, which is well after this time. So perhaps the filter actually works on the "Data Interval" (full disclosure, I have no idea what that is) and not the "UTC" time, which is why the URL correctly shows the current dag run. But this doesn't explain why @ljades is not seeing the correct behaviour.

@o-nikolas
Contributor

o-nikolas commented Oct 28, 2021

In fact, if I change the execution_date=... URL parameter to any value, it has no effect on how the page loads. The filter always defaults to the start of the latest data interval, regardless of the input in the URL.

All this testing is on main, btw.

@o-nikolas
Contributor

From reading the views.py code, it looks for a URL argument called base_date, not execution_date, which is why the URL parameter had no effect.
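A small sketch of the mismatch described above. `resolve_base_date` is a hypothetical stand-in for the view's argument handling, not the real views.py code; it only assumes that the view reads `base_date` and falls back to a default otherwise. A link that sends `execution_date` is silently ignored.

```python
from datetime import datetime
from urllib.parse import parse_qs, urlsplit


def resolve_base_date(url, default):
    """Hypothetical view logic: only the 'base_date' query argument is consulted."""
    params = parse_qs(urlsplit(url).query)
    values = params.get("base_date")
    return datetime.fromisoformat(values[0]) if values else default


default = datetime(2021, 10, 28, 9, 0)
old_link = "/graph?dag_id=child_dag&execution_date=2021-10-01T00:00:00"
new_link = "/graph?dag_id=child_dag&base_date=2021-10-01T00:00:00"

print(resolve_base_date(old_link, default))  # 2021-10-28 09:00:00 (argument ignored)
print(resolve_base_date(new_link, default))  # 2021-10-01 00:00:00
```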

@o-nikolas
Contributor

Okay, now that I can reproduce it I think I can fix this issue, or at least give it a shot :) @potiuk can you assign this one to me?

Note: This issue aside, I still stand by the statement that the webserver's filtering user experience is quite bad. The URL parameters don't properly update the UI, for example (the date has no effect, and the number of runs is updated but not to the value you provided). This needs some investment, I think.

@uranusjr
Member

uranusjr commented Oct 29, 2021

I dug through history and it seems like TriggerDagRunOperator's extra link is simply out of date. The link was added in ff1ab97, at which point /graph indeed took an execution_date:

```python
date_time = www_request.args.get('execution_date')
if date_time:
    date_time = timezone.parse(date_time)
else:
    date_time = dag.get_latest_execution_date(session=session) or timezone.utcnow()
```

But the graph endpoint later went through some refactoring and now takes a base_date instead. So we should fix the extra link to pass the right argument. Feel free to submit a fix for this! (And if possible, a way to write tests so this doesn't break again in the future.)
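A minimal sketch of the fix and a regression test, under assumptions. `triggered_dag_link` is hypothetical; the real extra link is built inside the webserver, and a plain URL string stands in for that here. The test pins the argument name so a future rename of `/graph`'s parameters (or a reintroduced `execution_date`) fails loudly.

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlencode, urlsplit


def triggered_dag_link(trigger_dag_id, base_date=None):
    """Hypothetical link builder that emits 'base_date', the argument /graph reads."""
    query = {"dag_id": trigger_dag_id}
    if base_date is not None:
        query["base_date"] = base_date.isoformat()
    return "/graph?" + urlencode(query)


def test_link_uses_base_date():
    # Regression guard: fail if the stale 'execution_date' argument comes back.
    link = triggered_dag_link("child_dag", datetime(2021, 10, 29, tzinfo=timezone.utc))
    params = parse_qs(urlsplit(link).query)
    assert "execution_date" not in params
    assert params["base_date"] == ["2021-10-29T00:00:00+00:00"]


test_link_uses_base_date()
```

`urlencode` percent-encodes the `+` in the UTC offset, and `parse_qs` decodes it back, so the round trip preserves the timestamp.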

@o-nikolas
Contributor

o-nikolas commented Oct 29, 2021

> I dug through history and it seems like TriggerDagRunOperator's extra link is simply out of date.

Yupp that's what I saw as well. I'll patch it up so the right arg is passed in.

As for the wrong date being used: after a bit more investigation, it's going to be quite tricky to get the execution date of the triggered DAG at the point where the extra link code is evaluated. Both the triggered DAG's execution date and its DAG run ID are calculated at runtime in the operator's execute method, so the webserver does not have access to the results of those computations when it builds the link. Perhaps a DB query could be used to get the correct time? I'm not sure it's worth running a query every time the link is built for such a feature, though (or whether it's even possible, since you'd need the DAG run ID for the query). Perhaps just linking to the DAG at the current time will suffice. Thoughts?

@uranusjr
Member

I think you can store the triggered DAG's execution date on the operator instance, and do operator.execution_date in get_link()?

@ljades
Author

ljades commented Oct 29, 2021

Worst case, if querying for the triggered DAG's run datetime is too much, I mentioned earlier that we could settle for removing the param/filter from the link entirely. It wouldn't zero in on the particular DAG run that was triggered, but it would still be effective.

@o-nikolas
Contributor

> I think you can store the triggered DAG's execution date on the operator instance, and do operator.execution_date in get_link()?

Yeah, this was my first thought as well. But operator.execution_date returns None in the get_link() method, even if you set self.execution_date = ... in the execute() method. To me that makes sense: the get_link() code is evaluated on the webserver, whereas the instance running execute() lives on the worker/scheduler, so you're not dealing with the same object instance in each case.

But do correct me if I'm wrong here, perhaps there was an error in my testing.
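A toy model of the behaviour described above, assuming the worker and the webserver each build their own operator object from the same DAG definition. `SketchTriggerOperator` is hypothetical, and `copy.deepcopy` merely stands in for each process loading or deserializing the DAG independently.

```python
import copy


class SketchTriggerOperator:
    """Hypothetical operator: execute() records the date it picked on self."""

    def __init__(self, trigger_dag_id):
        self.trigger_dag_id = trigger_dag_id
        self.execution_date = None

    def execute(self, now):
        # Only mutates this process's copy of the operator.
        self.execution_date = now


definition = SketchTriggerOperator("child_dag")
# Each process gets its own object built from the same definition.
worker_copy = copy.deepcopy(definition)
webserver_copy = copy.deepcopy(definition)

worker_copy.execute("2021-10-29T10:00:00+00:00")
print(webserver_copy.execution_date)  # None: the webserver never sees the worker's change
```

Anything computed in `execute()` has to be persisted somewhere shared (the metadata DB, for example) before `get_link()` can read it.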

@uranusjr
Member

I think you're right. Honestly I don't feel the extra SQL query is that problematic, it's not like the link is rendered in a lot of places, and we can at least cache the result so there's only one additional query for each render.
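A sketch of the "one additional query per render" idea, under assumptions. `LinkRenderContext` and `fake_query` are hypothetical; the cache is keyed on the triggering task instance (DAG ID, task ID, execution date), which the link builder does know. What the query would actually look up is left open, and later comments question whether it is possible at all.

```python
class LinkRenderContext:
    """Hypothetical per-render memo: each distinct lookup queries the database once."""

    def __init__(self, query):
        self._query = query  # callable performing the real (expensive) lookup
        self._cache = {}

    def triggered_run_date(self, dag_id, task_id, execution_date):
        # Key on the triggering task instance, which the link builder knows.
        key = (dag_id, task_id, execution_date)
        if key not in self._cache:
            self._cache[key] = self._query(*key)
        return self._cache[key]


calls = []


def fake_query(dag_id, task_id, execution_date):
    calls.append((dag_id, task_id, execution_date))
    return "2021-10-29T10:00:05+00:00"


ctx = LinkRenderContext(fake_query)
for _ in range(3):  # the same link requested several times during one page render
    ctx.triggered_run_date("parent_dag", "trigger_child", "2021-10-29T10:00:00+00:00")
print(len(calls))  # 1
```

Creating a fresh context per request keeps the cache from serving stale results across renders.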

@o-nikolas
Contributor

> I think you're right. Honestly I don't feel the extra SQL query is that problematic, it's not like the link is rendered in a lot of places, and we can at least cache the result so there's only one additional query for each render.

Okay, I'll play around with this. I'm still not sure there is the required data for a query either, but I'll try some things out.

@o-nikolas
Contributor

So after poking at this a bit more, I don't think it's possible with the current schema:
At the point where we create the extra link, we have access to the DAG ID, execution time, and task_id of the triggering DAG/task, from which we can also query for the DAG run and task instance objects. However, none of these are sufficient to query for the triggered DAG run. We know the ID of the triggered DAG, but we don't know the run_id or the execution time of the triggered DAG run. These are computed in the triggering task's execute method (unless the user provides them as arguments to the operator, in which case we have them) and are not persisted or left as breadcrumbs.

When we create the triggered DAG run, the only noteworthy thing we set is externally_triggered=True, but we don't record what externally triggered it. As far as I can tell, the DAG run model simply has no field representing which entity externally triggered the DAG (whether another DAG, as in this case, or the CLI, the API, etc.). So until we add something like that, I think it is impossible to link to the proper location in all cases. Please do correct me if any of the above is wrong, though.
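A small sketch of the ambiguity, using an in-memory SQLite table. The `dag_run` table here is a simplified, hypothetical schema with only the columns needed for the illustration; it is not Airflow's actual table definition. With only the child DAG ID and the external-trigger flag, every manual run matches, and nothing ties a row to the parent run that created it.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified, hypothetical dag_run table.
conn.execute(
    "CREATE TABLE dag_run "
    "(dag_id TEXT, run_id TEXT, execution_date TEXT, external_trigger INTEGER)"
)
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?, ?)",
    [
        ("child_dag", "manual__2021-11-01T10:00:05", "2021-11-01T10:00:05", 1),  # from parent run A
        ("child_dag", "manual__2021-11-01T10:02:00", "2021-11-01T10:02:00", 1),  # from the UI
        ("child_dag", "manual__2021-11-01T10:03:10", "2021-11-01T10:03:10", 1),  # from parent run B
    ],
)

# All the link builder knows: the child DAG ID and that the run was externally triggered.
candidates = conn.execute(
    "SELECT run_id FROM dag_run WHERE dag_id = ? AND external_trigger = 1",
    ("child_dag",),
).fetchall()
print(len(candidates))  # 3: no column identifies which parent run created which row
```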

Paths forward:

  1. Remove the execution time from the extra link and just link to the triggered dag id alone. This will show the full history of dag runs (up to the default 25 runs of course) and the user can do further filtering if required by hand
  2. In addition to 1., include the execution time in the URL (or fetch it from the dag_run) when the user provides those optional inputs to the operator. However, this leads to differing behaviour depending on whether those inputs are provided, and it may not be clear to the user why they see that difference. This is an optimization that may or may not be worth it.
  3. Update the DAG run model to include a new field that records what externally triggered the DAG run when externally_triggered is true. That way, when building the link, we could query for all DAG runs triggered by a specific earlier DAG run (hopefully that makes sense). This is easy to kludge, but any change to the DB schema obviously needs to be made with great care and thoughtfulness.
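A sketch of how option 3 could be queried, assuming a new pair of columns existed. `triggered_by_dag_id` and `triggered_by_run_id` are hypothetical names, not part of Airflow's schema, and the table is simplified for illustration. With the parent run recorded on each child run, the link builder can look up exactly the runs created by one triggering run.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical columns recording which run triggered each DAG run (option 3).
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, external_trigger INTEGER, "
    "triggered_by_dag_id TEXT, triggered_by_run_id TEXT)"
)
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?, ?, ?)",
    [
        ("child_dag", "manual__A", 1, "parent_dag", "manual__parent_A"),
        ("child_dag", "manual__ui", 1, None, None),  # triggered by hand, no parent
        ("child_dag", "manual__B", 1, "parent_dag", "manual__parent_B"),
    ],
)


def triggered_runs(conn, child_dag_id, parent_dag_id, parent_run_id):
    """Return the run IDs of child runs created by one specific parent run."""
    rows = conn.execute(
        "SELECT run_id FROM dag_run WHERE dag_id = ? "
        "AND triggered_by_dag_id = ? AND triggered_by_run_id = ?",
        (child_dag_id, parent_dag_id, parent_run_id),
    ).fetchall()
    return [row[0] for row in rows]


print(triggered_runs(conn, "child_dag", "parent_dag", "manual__parent_B"))  # ['manual__B']
```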

@augusto-herrmann
Contributor

By the way, since execution_date has been renamed to logical_date but TriggerDagRunOperator still uses the old nomenclature, shouldn't this be changed? Like so:

  • rename the execution_date parameter to logical_date
  • passing the execution_date parameter to this class would map it to logical_date for backward compatibility, after raising a deprecation warning

If this makes sense, I can create a new issue about it.
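A minimal sketch of the proposed rename with a deprecation path. `TriggerSketch` is a hypothetical class illustrating the pattern, not TriggerDagRunOperator's actual signature. The old keyword is still accepted, emits a `DeprecationWarning`, and is mapped onto the new name; the old attribute survives as a read-only alias.

```python
import warnings


class TriggerSketch:
    """Hypothetical operator signature illustrating the rename."""

    def __init__(self, trigger_dag_id, logical_date=None, execution_date=None):
        if execution_date is not None:
            if logical_date is not None:
                raise TypeError("pass either logical_date or execution_date, not both")
            warnings.warn(
                "execution_date is deprecated; use logical_date instead",
                DeprecationWarning,
                stacklevel=2,
            )
            logical_date = execution_date
        self.trigger_dag_id = trigger_dag_id
        self.logical_date = logical_date

    @property
    def execution_date(self):
        # Old attribute name kept as a read-only alias for backward compatibility.
        return self.logical_date
```

Raising on conflicting arguments avoids silently preferring one value over the other during the transition.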

@uranusjr
Copy link
Member

uranusjr commented Sep 8, 2022

Yes, but that’s an entirely different issue and should be worked on separately. Feel free to open a pull request for that, but let’s leave that discussion out of this particular issue.

@eladkal eladkal added the affected_version:2.2 Issues Reported for 2.2 label Sep 16, 2022
@o-nikolas
Contributor

I agree with @uranusjr that is a separate issue. Let's resolve this issue and cut a new one?

@eladkal
Copy link
Contributor

eladkal commented Oct 6, 2022

Closed in #19410

@eladkal
Copy link
Contributor

eladkal commented Oct 6, 2022

New issue reference: #26916

5 participants