From a1e6e598ed834f0a3d63d0215b73df04e9c12dbc Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Mon, 24 Jun 2024 17:14:01 +0200 Subject: [PATCH] Revert "fix: scheduler crashing with OL provider on airflow standalone (#40353)" (#40402) This reverts commit fbcee8d01bddd100d9335404796a40247a6c6487. --- .../providers/openlineage/plugins/listener.py | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 43553e8ba9b4f..12ebe7c6e6f63 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -62,18 +62,6 @@ def _get_try_number_success(val): return val.try_number - 1 -def _executor_initializer(): - """ - Initialize worker processes for the executor used for DagRun listener. - - This function must be picklable, so it cannot be defined as an inner method or local function. - - Reconfigures the ORM engine to prevent issues that arise when multiple processes interact with - the Airflow database. - """ - settings.configure_orm() - - class OpenLineageListener: """OpenLineage listener sends events on task instance and dag run starts, completes and failures.""" @@ -378,10 +366,16 @@ def _fork_execute(self, callable, callable_name: str): @property def executor(self) -> ProcessPoolExecutor: + # Executor for dag_run listener + def initializer(): + # Re-configure the ORM engine as there are issues with multiple processes + # if process calls Airflow DB. + settings.configure_orm() + if not self._executor: self._executor = ProcessPoolExecutor( max_workers=conf.dag_state_change_process_pool_size(), - initializer=_executor_initializer(), + initializer=initializer, ) return self._executor