From 179b96378251db258d564ba091deef2ab762d12d Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 27 Mar 2024 15:22:48 -0700 Subject: [PATCH] Don't dispose sqlalchemy engine when using internal api (#38562) --- .../providers/celery/executors/celery_executor_utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/providers/celery/executors/celery_executor_utils.py b/airflow/providers/celery/executors/celery_executor_utils.py index b9f915b102475..c5735394a3d7f 100644 --- a/airflow/providers/celery/executors/celery_executor_utils.py +++ b/airflow/providers/celery/executors/celery_executor_utils.py @@ -40,6 +40,7 @@ from sqlalchemy import select import airflow.settings as settings +from airflow.api_internal.internal_api_call import InternalApiConfig from airflow.configuration import conf from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowTaskTimeout from airflow.executors.base_executor import BaseExecutor @@ -155,8 +156,9 @@ def _execute_in_fork(command_to_exec: CommandType, celery_task_id: str | None = try: from airflow.cli.cli_parser import get_parser - settings.engine.pool.dispose() - settings.engine.dispose() + if not InternalApiConfig.get_use_internal_api(): + settings.engine.pool.dispose() + settings.engine.dispose() parser = get_parser() # [1:] - remove "airflow" from the start of the command @@ -166,6 +168,7 @@ def _execute_in_fork(command_to_exec: CommandType, celery_task_id: str | None = args.external_executor_id = celery_task_id setproctitle(f"airflow task supervisor: {command_to_exec}") + log.debug("calling func '%s' with args %s", args.func.__name__, args) args.func(args) ret = 0 except Exception: