From 1637e49c87763aba558f8c49ff1a0733490a02da Mon Sep 17 00:00:00 2001 From: John Horan Date: Fri, 23 Aug 2024 16:29:11 +0100 Subject: [PATCH 1/4] move param consumer --- cosmos/operators/base.py | 2 ++ cosmos/operators/local.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 9a723383f..165e53277 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -139,6 +139,8 @@ def __init__( self.dbt_cmd_global_flags = dbt_cmd_global_flags or [] self.cache_dir = cache_dir self.extra_context = extra_context or {} + kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes + super().__init__(**kwargs) def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 49bf45293..2ba2b18ff 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -145,7 +145,6 @@ def __init__( self._dbt_runner: dbtRunner | None = None if self.invocation_mode: self._set_invocation_methods() - kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes super().__init__(**kwargs) # For local execution mode, we're consistent with the LoadMode.DBT_LS command in forwarding the environment From 9439b5710ce755a38d762e704dea9a4b80be84d4 Mon Sep 17 00:00:00 2001 From: John Horan Date: Fri, 23 Aug 2024 17:06:11 +0100 Subject: [PATCH 2/4] remove whitespace --- cosmos/operators/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index 165e53277..d82083a23 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -140,7 +140,6 @@ def __init__( self.cache_dir = cache_dir self.extra_context = extra_context or {} kwargs.pop("full_refresh", None) # usage of this param should be implemented in child classes - super().__init__(**kwargs) def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]: From b3bd76bcf0483db052066f490491baa95c3c759b Mon Sep 17 00:00:00 2001 From: John Horan Date: Fri, 23 Aug 2024 19:05:45 +0100 Subject: [PATCH 3/4] add tests --- tests/operators/test_kubernetes.py | 67 ++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index d0be2acad..311bb4bc0 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -285,3 +285,70 @@ def test_created_pod(): ] assert container.args == expected_container_args assert container.command == [] + + +@pytest.mark.parametrize( + "operator_class,kwargs,expected_cmd", + [ + ( + DbtSeedKubernetesOperator, + {"full_refresh": True}, + ['dbt', 'seed', '--full-refresh', '--project-dir', 'my/dir'], + ), + ( + DbtBuildKubernetesOperator, + {"full_refresh": True}, + ['dbt', 'build', '--full-refresh', '--project-dir', 'my/dir'], + ), + ( + DbtRunKubernetesOperator, + {"full_refresh": True}, + ['dbt', 'run', '--full-refresh', '--project-dir', 'my/dir'], + ), + ( + DbtTestKubernetesOperator, + {}, + ['dbt', 'test', '--project-dir', 'my/dir'], + ), + ( + DbtTestKubernetesOperator, + {"select": []}, + ['dbt', 'test', '--project-dir', 'my/dir'], + ), + ( + DbtTestKubernetesOperator, + {"full_refresh": True, "select": ["tag:daily"], "exclude": ["tag:disabled"]}, + ['dbt', 'test', '--select', 'tag:daily', '--exclude', 'tag:disabled', '--project-dir', 'my/dir'], + ), + ( + DbtTestKubernetesOperator, + {"full_refresh": True, "selector": "nightly_snowplow"}, + ['dbt', 'test', '--selector', 'nightly_snowplow', '--project-dir', 'my/dir'], + ), + ], +) +def test_operator_execute_with_flags( operator_class, kwargs, expected_cmd): + task = operator_class( + task_id="my-task", + project_dir="my/dir", + **kwargs, + ) + + with patch( + "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.hook", + is_in_cluster=False, + ), patch( + "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup" + ), patch( + "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.get_or_create_pod", + side_effect=ValueError("Mock"), + ) as get_or_create_pod: + try: + task.execute(context={}) + except ValueError as e: + if e != get_or_create_pod.side_effect: + raise + + pod_args = get_or_create_pod.call_args.kwargs["pod_request_obj"].to_dict()['spec']['containers'][0]['args'] + + assert expected_cmd == pod_args From 06f3db3eacd2c1f41028563d4326099e63752b29 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 23 Aug 2024 18:06:03 +0000 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/operators/test_kubernetes.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 311bb4bc0..8e0dda9c0 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -293,41 +293,41 @@ def test_created_pod(): ( DbtSeedKubernetesOperator, {"full_refresh": True}, - ['dbt', 'seed', '--full-refresh', '--project-dir', 'my/dir'], + ["dbt", "seed", "--full-refresh", "--project-dir", "my/dir"], ), ( DbtBuildKubernetesOperator, {"full_refresh": True}, - ['dbt', 'build', '--full-refresh', '--project-dir', 'my/dir'], + ["dbt", "build", "--full-refresh", "--project-dir", "my/dir"], ), ( DbtRunKubernetesOperator, {"full_refresh": True}, - ['dbt', 'run', '--full-refresh', '--project-dir', 'my/dir'], + ["dbt", "run", "--full-refresh", "--project-dir", "my/dir"], ), ( DbtTestKubernetesOperator, {}, - ['dbt', 'test', '--project-dir', 'my/dir'], + ["dbt", "test", "--project-dir", "my/dir"], ), ( DbtTestKubernetesOperator, {"select": []}, - ['dbt', 'test', '--project-dir', 'my/dir'], + ["dbt", "test", "--project-dir", "my/dir"], ), ( DbtTestKubernetesOperator, {"full_refresh": True, "select": ["tag:daily"], "exclude": ["tag:disabled"]}, - ['dbt', 'test', '--select', 'tag:daily', '--exclude', 'tag:disabled', '--project-dir', 'my/dir'], + ["dbt", "test", "--select", "tag:daily", "--exclude", "tag:disabled", "--project-dir", "my/dir"], ), ( DbtTestKubernetesOperator, {"full_refresh": True, "selector": "nightly_snowplow"}, - ['dbt', 'test', '--selector', 'nightly_snowplow', '--project-dir', 'my/dir'], + ["dbt", "test", "--selector", "nightly_snowplow", "--project-dir", "my/dir"], ), ], ) -def test_operator_execute_with_flags( operator_class, kwargs, expected_cmd): +def test_operator_execute_with_flags(operator_class, kwargs, expected_cmd): task = operator_class( task_id="my-task", project_dir="my/dir", @@ -337,9 +337,7 @@ def test_operator_execute_with_flags( operator_class, kwargs, expected_cmd): with patch( "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.hook", is_in_cluster=False, - ), patch( - "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup" - ), patch( + ), patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup"), patch( "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.get_or_create_pod", side_effect=ValueError("Mock"), ) as get_or_create_pod: @@ -349,6 +347,6 @@ def test_operator_execute_with_flags( operator_class, kwargs, expected_cmd): if e != get_or_create_pod.side_effect: raise - pod_args = get_or_create_pod.call_args.kwargs["pod_request_obj"].to_dict()['spec']['containers'][0]['args'] + pod_args = get_or_create_pod.call_args.kwargs["pod_request_obj"].to_dict()["spec"]["containers"][0]["args"] assert expected_cmd == pod_args