fix: OL dag start event not being emitted #42448

Merged (1 commit) on Sep 26, 2024

@kacpermuda kacpermuda commented Sep 24, 2024

In apache-airflow-providers-openlineage==1.12.0 after changes from #41690 the DAG start events are not being emitted properly.

I think the main cause is that run_id is passed to the adapter's dag_started method:

self.submit_callable(
    self.adapter.dag_started,
    dag_id=dag_run.dag_id,
    run_id=dag_run.run_id,
    logical_date=dag_run.logical_date,
    start_date=dag_run.start_date,
    nominal_start_time=data_interval_start,
    nominal_end_time=data_interval_end,
    run_facets=run_facets,
    owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
    description=dag_run.dag.description if dag_run.dag else None,
    # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
    # as it causes lack of some TaskGroup attributes and crashes event emission.
    job_facets=get_airflow_job_facet(dag_run=dag_run),
)

while the adapter's method does not accept this argument:

def dag_started(
    self,
    dag_id: str,
    logical_date: datetime,
    start_date: datetime,
    nominal_start_time: str,
    nominal_end_time: str,
    owners: list[str],
    run_facets: dict[str, RunFacet],
    description: str | None = None,
    job_facets: dict[str, JobFacet] | None = None,  # Custom job facets
):
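
The mismatch between the call site and the signature can be reproduced in isolation. The sketch below is only an illustration: `dag_started` here is a trimmed-down, hypothetical stand-in for the adapter method (not the provider's code), and `unexpected_kwargs` is a helper invented for this example that compares keyword names against a function's signature.

```python
import inspect
from datetime import datetime


def dag_started(dag_id: str, logical_date: datetime, start_date: datetime):
    """Hypothetical stand-in for the adapter method; note there is no run_id."""
    return f"START {dag_id}"


def unexpected_kwargs(func, **kwargs):
    """Return the keyword names that func's signature does not accept."""
    params = inspect.signature(func).parameters
    # A **kwargs parameter would accept any name, so nothing is unexpected.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return []
    return [name for name in kwargs if name not in params]


# Illustrative values only; they mirror the shape of the listener's call.
call_kwargs = {
    "dag_id": "example_dag",
    "run_id": "manual__2024-09-24",
    "logical_date": datetime(2024, 9, 24),
    "start_date": datetime(2024, 9, 24),
}

# Static check: which keywords would the function reject?
extra = unexpected_kwargs(dag_started, **call_kwargs)

# Dynamic check: the call itself fails with the same kind of TypeError.
try:
    dag_started(**call_kwargs)
    error = None
except TypeError as exc:
    error = str(exc)
```

Here `extra` holds the offending keyword names and `error` holds the TypeError message raised by the direct call.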

Error in scheduler logs:

[2024-09-24T15:14:49.910+0000] {listener.py:529} WARNING - Failed to submit method to executor
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
TypeError: dag_started() got an unexpected keyword argument 'run_id'
"""

The above exception was the direct cause of the following exception:

TypeError: dag_started() got an unexpected keyword argument 'run_id'
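
One reason this kind of bug is easy to miss is that submitting a callable to an executor does not raise at the call site; the exception only surfaces on the returned future. The sketch below shows that behaviour under simplifying assumptions: it uses a `ThreadPoolExecutor` rather than the process pool mentioned in the code comment above, and `dag_started` is again a hypothetical stand-in, not the provider's method.

```python
from concurrent.futures import ThreadPoolExecutor


def dag_started(dag_id, logical_date):
    """Hypothetical stand-in that, like the adapter method, takes no run_id."""
    return dag_id


with ThreadPoolExecutor(max_workers=1) as pool:
    # submit() itself succeeds; the bad keyword is only hit inside the worker.
    future = pool.submit(
        dag_started,
        dag_id="example_dag",
        run_id="manual__2024-09-24",
        logical_date=None,
    )
    # exception() waits for completion and returns the raised error, if any.
    exc = future.exception()

message = str(exc)
```

Unless something inspects the future (or a done-callback logs it, as the scheduler warning above suggests happens here), the failure stays silent and the event is simply never emitted.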


Signed-off-by: Kacper Muda <mudakacper@gmail.com>
@JDarDagran

Confirming that #41690 broke one of the base functionalities in the 1.12.0 release. Thanks @kacpermuda for spotting and fixing this.

@eladkal do you think we could mark 1.12.0 as a yanked version and cut a new release of the provider with the next wave?

@eladkal eladkal merged commit 84e8cdf into apache:main Sep 26, 2024
57 checks passed
@kacpermuda kacpermuda deleted the fix-ol-dag-start branch September 26, 2024 17:43
joaopamaral pushed a commit to joaopamaral/airflow that referenced this pull request Oct 21, 2024
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
ellisms pushed a commit to ellisms/airflow that referenced this pull request Nov 13, 2024
Signed-off-by: Kacper Muda <mudakacper@gmail.com>