diff --git a/CHANGELOG.md b/CHANGELOG.md index 1dda32435cf..1f64c326ebf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ - Support BigQuery-specific aliases `target_dataset` and `target_project` in snapshot configs ([#3694](https://github.com/dbt-labs/dbt/issues/3694), [#3834](https://github.com/dbt-labs/dbt/pull/3834)) - `dbt debug` shows a summary of whether all checks passed or not ([#3831](https://github.com/dbt-labs/dbt/issues/3831), [#3832](https://github.com/dbt-labs/dbt/issues/3831)) -- Fix issue when running the `deps` task after the `list` task in the RPC server ([#3846](https://github.com/dbt-labs/dbt/issues/3846), [#3848](https://github.com/dbt-labs/dbt/pull/3848)) +- Fix issue when running the `deps` task after the `list` task in the RPC server ([#3846](https://github.com/dbt-labs/dbt/issues/3846), [#3848](https://github.com/dbt-labs/dbt/pull/3848), [#3850](https://github.com/dbt-labs/dbt/pull/3850)) - Fix bug with initializing a dataclass that inherits from `typing.Protocol`, specifically for `dbt.config.profile.Profile` ([#3843](https://github.com/dbt-labs/dbt/issues/3843), [#3855](https://github.com/dbt-labs/dbt/pull/3855)) - Introduce a macro, `get_where_subquery`, for tests that use `where` config. Alias filtering subquery as `dbt_subquery` instead of resource identifier ([#3857](https://github.com/dbt-labs/dbt/issues/3857), [#3859](https://github.com/dbt-labs/dbt/issues/3859)) diff --git a/core/dbt/rpc/method.py b/core/dbt/rpc/method.py index 5e9ffdc1707..b1066394260 100644 --- a/core/dbt/rpc/method.py +++ b/core/dbt/rpc/method.py @@ -1,6 +1,5 @@ import inspect from abc import abstractmethod -from copy import deepcopy from typing import List, Optional, Type, TypeVar, Generic, Dict, Any from dbt.dataclass_schema import dbtClassMixin, ValidationError @@ -21,7 +20,7 @@ class RemoteMethod(Generic[Parameters, Result]): METHOD_NAME: Optional[str] = None def __init__(self, args, config): - self.args = deepcopy(args) + self.args = args self.config = config @classmethod diff --git a/core/dbt/rpc/task_manager.py b/core/dbt/rpc/task_manager.py index 6e3eada382b..c5ab227f695 100644 --- a/core/dbt/rpc/task_manager.py +++ b/core/dbt/rpc/task_manager.py @@ -1,3 +1,4 @@ +from copy import deepcopy import threading import uuid from datetime import datetime @@ -155,7 +156,7 @@ def _get_manifest_callable( f'Manifest should not be None if the last parse state is ' f'{state}' ) - return task(self.args, self.config, self.manifest) + return task(deepcopy(self.args), self.config, self.manifest) def rpc_task( self, method_name: str @@ -167,7 +168,7 @@ def rpc_task( elif issubclass(task, RemoteManifestMethod): return self._get_manifest_callable(task) elif issubclass(task, RemoteMethod): - return task(self.args, self.config) + return task(deepcopy(self.args), self.config) else: raise dbt.exceptions.InternalException( f'Got a task with an invalid type! {task} with method ' diff --git a/test/rpc/test_deps.py b/test/rpc/test_deps.py index 764238ef80e..043ada8fd80 100644 --- a/test/rpc/test_deps.py +++ b/test/rpc/test_deps.py @@ -6,73 +6,6 @@ ) -def deps_with_packages(packages, bad_packages, project_dir, profiles_dir, schema): - project = ProjectDefinition( - models={ - 'my_model.sql': 'select 1 as id', - }, - packages={'packages': packages}, - ) - querier_ctx = get_querier( - project_def=project, - project_dir=project_dir, - profiles_dir=profiles_dir, - schema=schema, - test_kwargs={}, - criteria='error', - ) - - with querier_ctx as querier: - # we should be able to run sql queries at startup - querier.is_error(querier.run_sql('select 1 as id')) - - # the status should be an error as deps wil not be defined - querier.is_result(querier.status()) - - # deps should pass - querier.async_wait_for_result(querier.deps()) - - # queries should work after deps - tok1 = querier.is_async_result(querier.run()) - tok2 = querier.is_async_result(querier.run_sql('select 1 as id')) - - querier.is_result(querier.async_wait(tok2)) - querier.is_result(querier.async_wait(tok1)) - - # now break the project - project.packages['packages'] = bad_packages - project.write_packages(project_dir, remove=True) - - # queries should still work because we haven't reloaded - tok1 = querier.is_async_result(querier.run()) - tok2 = querier.is_async_result(querier.run_sql('select 1 as id')) - - querier.is_result(querier.async_wait(tok2)) - querier.is_result(querier.async_wait(tok1)) - - # now run deps again, it should be sad - querier.async_wait_for_error(querier.deps()) - # it should also not be running. - result = querier.is_result(querier.ps(active=True, completed=False)) - assert result['rows'] == [] - - # fix packages again - project.packages['packages'] = packages - project.write_packages(project_dir, remove=True) - # keep queries broken, we haven't run deps yet - querier.is_error(querier.run()) - - # deps should pass now - querier.async_wait_for_result(querier.deps()) - querier.is_result(querier.status()) - - tok1 = querier.is_async_result(querier.run()) - tok2 = querier.is_async_result(querier.run_sql('select 1 as id')) - - querier.is_result(querier.async_wait(tok2)) - querier.is_result(querier.async_wait(tok1)) - - @pytest.mark.parametrize( "packages, bad_packages", # from dbt hub @@ -147,4 +80,110 @@ def deps_with_packages(packages, bad_packages, project_dir, profiles_dir, schema ) @pytest.mark.supported('postgres') def test_rpc_deps_packages(project_root, profiles_root, dbt_profile, unique_schema, packages, bad_packages): - deps_with_packages(packages, bad_packages, project_root, profiles_root, unique_schema) + project = ProjectDefinition( + models={ + 'my_model.sql': 'select 1 as id', + }, + packages={'packages': packages}, + ) + querier_ctx = get_querier( + project_def=project, + project_dir=project_root, + profiles_dir=profiles_root, + schema=unique_schema, + test_kwargs={}, + criteria='error', + ) + with querier_ctx as querier: + # we should be able to run sql queries at startup + querier.is_error(querier.run_sql('select 1 as id')) + + # the status should be an error as deps wil not be defined + querier.is_result(querier.status()) + + # deps should pass + querier.async_wait_for_result(querier.deps()) + + # queries should work after deps + tok1 = querier.is_async_result(querier.run()) + tok2 = querier.is_async_result(querier.run_sql('select 1 as id')) + + querier.is_result(querier.async_wait(tok2)) + querier.is_result(querier.async_wait(tok1)) + + # now break the project + project.packages['packages'] = bad_packages + project.write_packages(project_root, remove=True) + + # queries should still work because we haven't reloaded + tok1 = querier.is_async_result(querier.run()) + tok2 = querier.is_async_result(querier.run_sql('select 1 as id')) + + querier.is_result(querier.async_wait(tok2)) + querier.is_result(querier.async_wait(tok1)) + + # now run deps again, it should be sad + querier.async_wait_for_error(querier.deps()) + # it should also not be running. + result = querier.is_result(querier.ps(active=True, completed=False)) + assert result['rows'] == [] + + # fix packages again + project.packages['packages'] = packages + project.write_packages(project_root, remove=True) + # keep queries broken, we haven't run deps yet + querier.is_error(querier.run()) + + # deps should pass now + querier.async_wait_for_result(querier.deps()) + querier.is_result(querier.status()) + + tok1 = querier.is_async_result(querier.run()) + tok2 = querier.is_async_result(querier.run_sql('select 1 as id')) + + querier.is_result(querier.async_wait(tok2)) + querier.is_result(querier.async_wait(tok1)) + + +@pytest.mark.supported('postgres') +def test_rpc_deps_after_list(project_root, profiles_root, dbt_profile, unique_schema): + packages = [{ + 'package': 'dbt-labs/dbt_utils', + 'version': '0.5.0', + }] + project = ProjectDefinition( + models={ + 'my_model.sql': 'select 1 as id', + }, + packages={'packages': packages}, + ) + querier_ctx = get_querier( + project_def=project, + project_dir=project_root, + profiles_dir=profiles_root, + schema=unique_schema, + test_kwargs={}, + criteria='error', + ) + with querier_ctx as querier: + # we should be able to run sql queries at startup + querier.is_error(querier.run_sql('select 1 as id')) + + # the status should be an error as deps wil not be defined + querier.is_result(querier.status()) + + # deps should pass + querier.async_wait_for_result(querier.deps()) + + # queries should work after deps + tok1 = querier.is_async_result(querier.run()) + tok2 = querier.is_async_result(querier.run_sql('select 1 as id')) + + querier.is_result(querier.async_wait(tok2)) + querier.is_result(querier.async_wait(tok1)) + + # list should pass + querier.list() + + # deps should pass + querier.async_wait_for_result(querier.deps()) diff --git a/test/rpc/util.py b/test/rpc/util.py index f6beb832af5..d97eefab156 100644 --- a/test/rpc/util.py +++ b/test/rpc/util.py @@ -209,6 +209,9 @@ def cli_args(self, cli: str, request_id: int = 1): def deps(self, request_id: int = 1): return self.request(method='deps', request_id=request_id) + def list(self, request_id: int = 1): + return self.request(method='list', request_id=request_id) + def compile( self, models: Optional[Union[str, List[str]]] = None,