Skip to content

Commit

Permalink
Revert "fix: scheduler crashing with OL provider on airflow standalone (
Browse files Browse the repository at this point in the history
apache#40353)" (apache#40402)

This reverts commit fbcee8d.
  • Loading branch information
kacpermuda authored and romsharon98 committed Jul 26, 2024
1 parent d10be0d commit ac2430a
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,6 @@ def _get_try_number_success(val):
return val.try_number - 1


def _executor_initializer():
"""
Initialize worker processes for the executor used for DagRun listener.
This function must be picklable, so it cannot be defined as an inner method or local function.
Reconfigures the ORM engine to prevent issues that arise when multiple processes interact with
the Airflow database.
"""
settings.configure_orm()


class OpenLineageListener:
"""OpenLineage listener sends events on task instance and dag run starts, completes and failures."""

Expand Down Expand Up @@ -378,10 +366,16 @@ def _fork_execute(self, callable, callable_name: str):

@property
def executor(self) -> ProcessPoolExecutor:
# Executor for dag_run listener
def initializer():
# Re-configure the ORM engine as there are issues with multiple processes
# if process calls Airflow DB.
settings.configure_orm()

if not self._executor:
self._executor = ProcessPoolExecutor(
max_workers=conf.dag_state_change_process_pool_size(),
initializer=_executor_initializer(),
initializer=initializer,
)
return self._executor

Expand Down

0 comments on commit ac2430a

Please sign in to comment.