From 87749f9f93306d453dd7bd3afd62b5cdf2c138c7 Mon Sep 17 00:00:00 2001 From: David Steinar Asgrimsson Date: Tue, 28 May 2024 14:58:27 +0200 Subject: [PATCH 1/5] Improve error logging in DbtLocalBaseOperator Improve error logging when the `dbt` command returns a non-zero exit code. Instead of raising an `AirflowException` with the full output, log the output using the logger and then raise the exception with a concise error message. This keeps the dbt output readable, because it is logged with its line breaks intact instead of being squeezed into a single line. --- cosmos/operators/local.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 1104b43fb..c62f708e8 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -179,10 +179,8 @@ def handle_exception_subprocess(self, result: FullOutputSubprocessResult) -> Non if self.skip_exit_code is not None and result.exit_code == self.skip_exit_code: raise AirflowSkipException(f"dbt command returned exit code {self.skip_exit_code}. Skipping.") elif result.exit_code != 0: - raise AirflowException( - f"dbt command failed. The command returned a non-zero exit code {result.exit_code}. Details: ", - *result.full_output, - ) + logger.error("\n".join(result.full_output)) + raise AirflowException(f"dbt command failed. 
The command returned a non-zero exit code {result.exit_code}.") def handle_exception_dbt_runner(self, result: dbtRunnerResult) -> None: """dbtRunnerResult has an attribute `success` that is False if the command failed.""" From 39d0a9bff45baf9de0c5936e588fc5942c31eaff Mon Sep 17 00:00:00 2001 From: David Steinar Asgrimsson Date: Fri, 31 May 2024 11:25:40 +0200 Subject: [PATCH 2/5] test error logging in DbtLocalBaseOperator --- tests/operators/test_local.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 0f35705b6..c7ede093c 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -24,6 +24,7 @@ parse_number_of_warnings_subprocess, ) from cosmos.exceptions import AirflowCompatibilityError +from cosmos.hooks.subprocess import FullOutputSubprocessResult from cosmos.operators.local import ( DbtBuildLocalOperator, DbtDocsAzureStorageLocalOperator, @@ -957,3 +958,20 @@ def test_dbt_local_operator_on_kill_sigterm(mock_send_sigterm) -> None: dbt_base_operator.on_kill() mock_send_sigterm.assert_called_once() + + +def test_handle_exception_subprocess(caplog): + """ + Test the handle_exception_subprocess method of the DbtLocalBaseOperator class for non-zero dbt exit code. 
+ """ + operator = DbtLocalBaseOperator(profile_config=None) + result = FullOutputSubprocessResult( + exit_code=1, output="test", full_output=["n" * n for n in range(1, 1000)] + ) + + caplog.set_level(logging.ERROR) + # Test when exit_code is non-zero + with pytest.raises(AirflowException) as err_context: + operator.handle_exception_subprocess(result) + assert len(str(err_context.value)) < 100 # Ensure the error message is not too long + assert len(caplog.text) > 1000 # Ensure the log message is not truncated From 635b28f5cc807d1d9c4bb5558c6652746282076b Mon Sep 17 00:00:00 2001 From: David Steinar Asgrimsson Date: Fri, 31 May 2024 11:44:45 +0200 Subject: [PATCH 3/5] use ConcreteDbtLocalBaseOperator --- tests/operators/test_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 68895660b..fbb8ce31d 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -922,7 +922,7 @@ def test_handle_exception_subprocess(caplog): """ Test the handle_exception_subprocess method of the DbtLocalBaseOperator class for non-zero dbt exit code. 
""" - operator = DbtLocalBaseOperator(profile_config=None) + operator = ConcreteDbtLocalBaseOperator(profile_config=None) result = FullOutputSubprocessResult( exit_code=1, output="test", full_output=["n" * n for n in range(1, 1000)] ) From 92d55c695ec5f46294e024377418275801cd73d0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 31 May 2024 09:45:54 +0000 Subject: [PATCH 4/5] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_local.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index fbb8ce31d..4c8ab1851 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -23,7 +23,6 @@ parse_number_of_warnings_dbt_runner, parse_number_of_warnings_subprocess, ) -from cosmos.exceptions import AirflowCompatibilityError from cosmos.hooks.subprocess import FullOutputSubprocessResult from cosmos.operators.local import ( DbtBuildLocalOperator, @@ -923,9 +922,7 @@ def test_handle_exception_subprocess(caplog): Test the handle_exception_subprocess method of the DbtLocalBaseOperator class for non-zero dbt exit code. 
""" operator = ConcreteDbtLocalBaseOperator(profile_config=None) - result = FullOutputSubprocessResult( - exit_code=1, output="test", full_output=["n" * n for n in range(1, 1000)] - ) + result = FullOutputSubprocessResult(exit_code=1, output="test", full_output=["n" * n for n in range(1, 1000)]) caplog.set_level(logging.ERROR) # Test when exit_code is non-zero From 7e558f66df0403974be642cc97b8e440bb9a034d Mon Sep 17 00:00:00 2001 From: David Steinar Asgrimsson Date: Fri, 31 May 2024 12:03:13 +0200 Subject: [PATCH 5/5] fix class instantiation args --- tests/operators/test_local.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 4c8ab1851..5513b1c4b 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -921,7 +921,11 @@ def test_handle_exception_subprocess(caplog): """ Test the handle_exception_subprocess method of the DbtLocalBaseOperator class for non-zero dbt exit code. """ - operator = ConcreteDbtLocalBaseOperator(profile_config=None) + operator = ConcreteDbtLocalBaseOperator( + profile_config=None, + task_id="my-task", + project_dir="my/dir", + ) result = FullOutputSubprocessResult(exit_code=1, output="test", full_output=["n" * n for n in range(1, 1000)]) caplog.set_level(logging.ERROR)