From 1340f7de3ed0a8c090fca4d9477d34a196a6e39a Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Tue, 12 Nov 2024 20:57:10 +0100 Subject: [PATCH 1/3] Print adapter response --- opendbt/dbt/__init__.py | 2 ++ opendbt/dbt/shared/task/run.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 opendbt/dbt/shared/task/run.py diff --git a/opendbt/dbt/__init__.py b/opendbt/dbt/__init__.py index bd566a9..58e2d41 100644 --- a/opendbt/dbt/__init__.py +++ b/opendbt/dbt/__init__.py @@ -23,6 +23,8 @@ def patch_dbt(): # shared code patches import opendbt.dbt.shared.cli.main + import opendbt.dbt.shared.task.run dbt.cli.main.sqlfluff = opendbt.dbt.shared.cli.main.sqlfluff dbt.cli.main.sqlfluff_lint = opendbt.dbt.shared.cli.main.sqlfluff_lint dbt.cli.main.sqlfluff_fix = opendbt.dbt.shared.cli.main.sqlfluff_fix + dbt.task.run.ModelRunner = opendbt.dbt.shared.task.run.ModelRunner diff --git a/opendbt/dbt/shared/task/run.py b/opendbt/dbt/shared/task/run.py new file mode 100644 index 0000000..37252a2 --- /dev/null +++ b/opendbt/dbt/shared/task/run.py @@ -0,0 +1,34 @@ +from dbt.artifacts.schemas.results import NodeStatus +from dbt.events.types import ( + LogModelResult, +) +from dbt.task import run +from dbt_common.events.base_types import EventLevel +from dbt_common.events.functions import fire_event + + +class ModelRunner(run.ModelRunner): + + def print_result_adapter_response(self, result): + if hasattr(result, 'adapter_response'): + if result.status == NodeStatus.Error: + status = result.status + level = EventLevel.ERROR + else: + status = result.message + level = EventLevel.INFO + fire_event( + LogModelResult( + description=str(result.adapter_response), + status=status, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time, + node_info=self.node.node_info, + ), + level=level, + ) + + def print_result_line(self, result): + super().print_result_line(result) + self.print_result_adapter_response(result=result) From 988ee12b6865a38da3aff0ffc4f53bcf08febc98 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:13:00 +0100 Subject: [PATCH 2/3] Print adapter response --- opendbt/dbt/__init__.py | 8 ++++---- opendbt/dbt/{shared => v18}/task/run.py | 0 2 files changed, 4 insertions(+), 4 deletions(-) rename opendbt/dbt/{shared => v18}/task/run.py (100%) diff --git a/opendbt/dbt/__init__.py b/opendbt/dbt/__init__.py index 58e2d41..a87a098 100644 --- a/opendbt/dbt/__init__.py +++ b/opendbt/dbt/__init__.py @@ -9,22 +9,22 @@ def patch_dbt(): dbt_version = Version(dbt.version.get_installed_version().to_version_string(skip_matcher=True)) if dbt_version >= Version("1.7.0") and dbt_version < Version("1.8.0"): from opendbt.dbt.v17.task.docs.generate import OpenDbtGenerateTask - from opendbt.dbt.v17.adapters.factory import OpenDbtAdapterContainer dbt.task.generate.GenerateTask = OpenDbtGenerateTask + from opendbt.dbt.v17.adapters.factory import OpenDbtAdapterContainer dbt.adapters.factory.FACTORY = OpenDbtAdapterContainer() elif dbt_version >= Version("1.8.0") and dbt_version < Version("1.9.0"): from opendbt.dbt.v18.task.docs.generate import OpenDbtGenerateTask - from opendbt.dbt.v18.adapters.factory import OpenDbtAdapterContainer dbt.task.docs.generate.GenerateTask = OpenDbtGenerateTask + from opendbt.dbt.v18.adapters.factory import OpenDbtAdapterContainer dbt.adapters.factory.FACTORY = OpenDbtAdapterContainer() + from opendbt.dbt.v18.task.run import ModelRunner + dbt.task.run.ModelRunner = ModelRunner else: raise Exception( f"Unsupported dbt version {dbt_version}, please make sure dbt version is supported/integrated by opendbt") # shared code patches import opendbt.dbt.shared.cli.main - import opendbt.dbt.shared.task.run dbt.cli.main.sqlfluff = opendbt.dbt.shared.cli.main.sqlfluff dbt.cli.main.sqlfluff_lint = opendbt.dbt.shared.cli.main.sqlfluff_lint dbt.cli.main.sqlfluff_fix = opendbt.dbt.shared.cli.main.sqlfluff_fix - dbt.task.run.ModelRunner = opendbt.dbt.shared.task.run.ModelRunner diff --git a/opendbt/dbt/shared/task/run.py b/opendbt/dbt/v18/task/run.py similarity index 100% rename from opendbt/dbt/shared/task/run.py rename to opendbt/dbt/v18/task/run.py From 4ae605298aa0b1e306da8442f4ba2354b4515e18 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 14 Nov 2024 17:17:18 +0100 Subject: [PATCH 3/3] Print adapter response --- opendbt/dbt/__init__.py | 2 ++ opendbt/dbt/v17/task/run.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 opendbt/dbt/v17/task/run.py diff --git a/opendbt/dbt/__init__.py b/opendbt/dbt/__init__.py index a87a098..9ec3333 100644 --- a/opendbt/dbt/__init__.py +++ b/opendbt/dbt/__init__.py @@ -12,6 +12,8 @@ def patch_dbt(): dbt.task.generate.GenerateTask = OpenDbtGenerateTask from opendbt.dbt.v17.adapters.factory import OpenDbtAdapterContainer dbt.adapters.factory.FACTORY = OpenDbtAdapterContainer() + from opendbt.dbt.v17.task.run import ModelRunner + dbt.task.run.ModelRunner = ModelRunner elif dbt_version >= Version("1.8.0") and dbt_version < Version("1.9.0"): from opendbt.dbt.v18.task.docs.generate import OpenDbtGenerateTask dbt.task.docs.generate.GenerateTask = OpenDbtGenerateTask diff --git a/opendbt/dbt/v17/task/run.py b/opendbt/dbt/v17/task/run.py new file mode 100644 index 0000000..5dd3f29 --- /dev/null +++ b/opendbt/dbt/v17/task/run.py @@ -0,0 +1,34 @@ +from dbt.contracts.results import NodeStatus +from dbt.events.base_types import EventLevel +from dbt.events.functions import fire_event +from dbt.events.types import ( + LogModelResult, +) +from dbt.task import run + + +class ModelRunner(run.ModelRunner): + + def print_result_adapter_response(self, result): + if hasattr(result, 'adapter_response'): + if result.status == NodeStatus.Error: + status = result.status + level = EventLevel.ERROR + else: + status = result.message + level = EventLevel.INFO + fire_event( + LogModelResult( + description=str(result.adapter_response), + status=status, + index=self.node_index, + total=self.num_nodes, + execution_time=result.execution_time, + node_info=self.node.node_info, + ), + level=level, + ) + + def print_result_line(self, result): + super().print_result_line(result) + self.print_result_adapter_response(result=result)