Skip to content

Commit

Permalink
Revert "fix: scheduler crashing with OL provider on airflow standalone (
Browse files Browse the repository at this point in the history
#40353)" (#40402)

This reverts commit fbcee8d.
  • Loading branch information
kacpermuda authored Jun 24, 2024
1 parent e8a8208 commit a1e6e59
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,6 @@ def _get_try_number_success(val):
return val.try_number - 1


def _executor_initializer():
"""
Initialize worker processes for the executor used for DagRun listener.
This function must be picklable, so it cannot be defined as an inner method or local function.
Reconfigures the ORM engine to prevent issues that arise when multiple processes interact with
the Airflow database.
"""
settings.configure_orm()


class OpenLineageListener:
"""OpenLineage listener sends events on task instance and dag run starts, completes and failures."""

Expand Down Expand Up @@ -378,10 +366,16 @@ def _fork_execute(self, callable, callable_name: str):

@property
def executor(self) -> ProcessPoolExecutor:
# Executor for dag_run listener
def initializer():
# Re-configure the ORM engine as there are issues with multiple processes
# if process calls Airflow DB.
settings.configure_orm()

if not self._executor:
self._executor = ProcessPoolExecutor(
max_workers=conf.dag_state_change_process_pool_size(),
initializer=_executor_initializer(),
initializer=initializer,
)
return self._executor

Expand Down

0 comments on commit a1e6e59

Please sign in to comment.