From b9bb056c98aef7f9f1c2bbd25bb44ccc361403a8 Mon Sep 17 00:00:00 2001 From: Jakub Dardzinski Date: Wed, 24 Apr 2024 16:11:58 +0200 Subject: [PATCH] Use `ProcessPoolExecutor` over `ThreadPoolExecutor`. Signed-off-by: Jakub Dardzinski Make `max_workers` configurable. Signed-off-by: Jakub Dardzinski --- airflow/providers/openlineage/conf.py | 7 +++++++ .../providers/openlineage/plugins/listener.py | 5 +++-- .../openlineage/plugins/test_listener.py | 21 +++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/airflow/providers/openlineage/conf.py b/airflow/providers/openlineage/conf.py index d43806abcaae8..23e663f67e9bd 100644 --- a/airflow/providers/openlineage/conf.py +++ b/airflow/providers/openlineage/conf.py @@ -104,3 +104,10 @@ def is_disabled() -> bool: # Check if both 'transport' and 'config_path' are not present and also # if legacy 'OPENLINEAGE_URL' environment variables is not set return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == "" + + +@cache +def dag_state_change_process_pool_size() -> int: + """[openlineage] dag_state_change_process_pool_size.""" + option = conf.getint(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback=1) + return option diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 9067d53f69847..73f8c8c79ee92 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -17,7 +17,7 @@ from __future__ import annotations import logging -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ProcessPoolExecutor from datetime import datetime from typing import TYPE_CHECKING @@ -25,6 +25,7 @@ from airflow import __version__ as airflow_version from airflow.listeners import hookimpl +from airflow.providers.openlineage import conf from airflow.providers.openlineage.extractors import ExtractorManager from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState from airflow.providers.openlineage.utils.utils import ( @@ -281,7 +282,7 @@ def on_failure(): @property def executor(self): if not self._executor: - self._executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="openlineage_") + self._executor = ProcessPoolExecutor(max_workers=conf.dag_state_change_process_pool_size()) return self._executor @hookimpl diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py index fa651de1b22d9..d9fbb0dfd360d 100644 --- a/tests/providers/openlineage/plugins/test_listener.py +++ b/tests/providers/openlineage/plugins/test_listener.py @@ -526,6 +526,27 @@ def test_listener_on_task_instance_success_do_not_call_adapter_when_disabled_ope listener.adapter.complete_task.assert_not_called() +@pytest.mark.parametrize( + "max_workers,expected", + [ + (None, 1), + ("8", 8), + ], +) +@mock.patch("airflow.providers.openlineage.plugins.listener.ProcessPoolExecutor", autospec=True) +def test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_executor, max_workers, expected): + """mock ProcessPoolExecutor and check if conf.dag_state_change_process_pool_size is applied to max_workers""" + listener = OpenLineageListener() + # mock ProcessPoolExecutor class + try: + with conf_vars({("openlineage", "dag_state_change_process_pool_size"): max_workers}): + listener.on_dag_run_running(mock.MagicMock(), None) + mock_executor.assert_called_once_with(max_workers=expected) + mock_executor.return_value.submit.assert_called_once() + finally: + conf.dag_state_change_process_pool_size.cache_clear() + + class TestOpenLineageSelectiveEnable: def setup_method(self): self.dag = DAG(