Skip to content

Commit

Permalink
deepcopy args when passed down to rpc task (#3850)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle Wigley authored Sep 10, 2021
1 parent 44e7390 commit 3effade
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 73 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

- Support BigQuery-specific aliases `target_dataset` and `target_project` in snapshot configs ([#3694](https://github.com/dbt-labs/dbt/issues/3694), [#3834](https://github.com/dbt-labs/dbt/pull/3834))
- `dbt debug` shows a summary of whether all checks passed or not ([#3831](https://github.com/dbt-labs/dbt/issues/3831), [#3832](https://github.com/dbt-labs/dbt/pull/3832))
- Fix issue when running the `deps` task after the `list` task in the RPC server ([#3846](https://github.com/dbt-labs/dbt/issues/3846), [#3848](https://github.com/dbt-labs/dbt/pull/3848))
- Fix issue when running the `deps` task after the `list` task in the RPC server ([#3846](https://github.com/dbt-labs/dbt/issues/3846), [#3848](https://github.com/dbt-labs/dbt/pull/3848), [#3850](https://github.com/dbt-labs/dbt/pull/3850))
- Fix bug with initializing a dataclass that inherits from `typing.Protocol`, specifically for `dbt.config.profile.Profile` ([#3843](https://github.com/dbt-labs/dbt/issues/3843), [#3855](https://github.com/dbt-labs/dbt/pull/3855))
- Introduce a macro, `get_where_subquery`, for tests that use `where` config. Alias filtering subquery as `dbt_subquery` instead of resource identifier ([#3857](https://github.com/dbt-labs/dbt/issues/3857), [#3859](https://github.com/dbt-labs/dbt/issues/3859))

Expand Down
3 changes: 1 addition & 2 deletions core/dbt/rpc/method.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import inspect
from abc import abstractmethod
from copy import deepcopy
from typing import List, Optional, Type, TypeVar, Generic, Dict, Any

from dbt.dataclass_schema import dbtClassMixin, ValidationError
Expand All @@ -21,7 +20,7 @@ class RemoteMethod(Generic[Parameters, Result]):
METHOD_NAME: Optional[str] = None

def __init__(self, args, config):
# Store the caller-supplied args and config on the instance.
# NOTE(review): this is a flattened diff view. The deepcopy assignment
# appears to be the removed (pre-change) line and the plain assignment
# the added one; read literally, the second assignment overwrites the
# first. Confirm against the repository before relying on either.
self.args = deepcopy(args)
self.args = args
self.config = config

@classmethod
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/rpc/task_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import deepcopy
import threading
import uuid
from datetime import datetime
Expand Down Expand Up @@ -155,7 +156,7 @@ def _get_manifest_callable(
f'Manifest should not be None if the last parse state is '
f'{state}'
)
return task(self.args, self.config, self.manifest)
return task(deepcopy(self.args), self.config, self.manifest)

def rpc_task(
self, method_name: str
Expand All @@ -167,7 +168,7 @@ def rpc_task(
elif issubclass(task, RemoteManifestMethod):
return self._get_manifest_callable(task)
elif issubclass(task, RemoteMethod):
return task(self.args, self.config)
return task(deepcopy(self.args), self.config)
else:
raise dbt.exceptions.InternalException(
f'Got a task with an invalid type! {task} with method '
Expand Down
175 changes: 107 additions & 68 deletions test/rpc/test_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,73 +6,6 @@
)


def deps_with_packages(packages, bad_packages, project_dir, profiles_dir, schema):
# Drive one querier session through a deps lifecycle: start with
# `packages`, run deps, swap in `bad_packages` and expect deps to fail,
# then restore `packages` and expect deps to succeed again.
# NOTE(review): indentation is lost in this view; everything from the
# `with` line onward presumably belongs to the `with` body — confirm.
project = ProjectDefinition(
models={
'my_model.sql': 'select 1 as id',
},
packages={'packages': packages},
)
querier_ctx = get_querier(
project_def=project,
project_dir=project_dir,
profiles_dir=profiles_dir,
schema=schema,
test_kwargs={},
criteria='error',
)

with querier_ctx as querier:
# run_sql is checked with is_error here, so the query is presumably
# expected to FAIL at startup (deps have not been run yet). The original
# comment said queries should work at startup — confirm the intent.
querier.is_error(querier.run_sql('select 1 as id'))

# the status call itself must return a result; presumably the reported
# state is an error because deps will not be defined yet
querier.is_result(querier.status())

# deps should pass
querier.async_wait_for_result(querier.deps())

# queries should work after deps
tok1 = querier.is_async_result(querier.run())
tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))

# wait on the two tokens in the reverse of submission order
querier.is_result(querier.async_wait(tok2))
querier.is_result(querier.async_wait(tok1))

# now break the project
project.packages['packages'] = bad_packages
project.write_packages(project_dir, remove=True)

# queries should still work because we haven't reloaded
tok1 = querier.is_async_result(querier.run())
tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))

querier.is_result(querier.async_wait(tok2))
querier.is_result(querier.async_wait(tok1))

# now run deps again, it should be sad
querier.async_wait_for_error(querier.deps())
# it should also not be running.
result = querier.is_result(querier.ps(active=True, completed=False))
assert result['rows'] == []

# fix packages again
project.packages['packages'] = packages
project.write_packages(project_dir, remove=True)
# keep queries broken, we haven't run deps yet
querier.is_error(querier.run())

# deps should pass now
querier.async_wait_for_result(querier.deps())
querier.is_result(querier.status())

tok1 = querier.is_async_result(querier.run())
tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))

querier.is_result(querier.async_wait(tok2))
querier.is_result(querier.async_wait(tok1))


@pytest.mark.parametrize(
"packages, bad_packages",
# from dbt hub
Expand Down Expand Up @@ -147,4 +80,110 @@ def deps_with_packages(packages, bad_packages, project_dir, profiles_dir, schema
)
@pytest.mark.supported('postgres')
def test_rpc_deps_packages(project_root, profiles_root, dbt_profile, unique_schema, packages, bad_packages):
# Drive one querier session through a deps lifecycle: start with
# `packages`, run deps, swap in `bad_packages` and expect deps to fail,
# then restore `packages` and expect deps to succeed again.
# NOTE(review): this is a flattened diff view. The call to
# deps_with_packages directly below appears to be the removed
# (pre-change) body, and the inlined statements after it the added
# replacement — confirm against the repository.
deps_with_packages(packages, bad_packages, project_root, profiles_root, unique_schema)
project = ProjectDefinition(
models={
'my_model.sql': 'select 1 as id',
},
packages={'packages': packages},
)
querier_ctx = get_querier(
project_def=project,
project_dir=project_root,
profiles_dir=profiles_root,
schema=unique_schema,
test_kwargs={},
criteria='error',
)
with querier_ctx as querier:
# run_sql is checked with is_error here, so the query is presumably
# expected to FAIL at startup (deps have not been run yet). The original
# comment said queries should work at startup — confirm the intent.
querier.is_error(querier.run_sql('select 1 as id'))

# the status call itself must return a result; presumably the reported
# state is an error because deps will not be defined yet
querier.is_result(querier.status())

# deps should pass
querier.async_wait_for_result(querier.deps())

# queries should work after deps
tok1 = querier.is_async_result(querier.run())
tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))

# wait on the two tokens in the reverse of submission order
querier.is_result(querier.async_wait(tok2))
querier.is_result(querier.async_wait(tok1))

# now break the project
project.packages['packages'] = bad_packages
project.write_packages(project_root, remove=True)

# queries should still work because we haven't reloaded
tok1 = querier.is_async_result(querier.run())
tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))

querier.is_result(querier.async_wait(tok2))
querier.is_result(querier.async_wait(tok1))

# now run deps again, it should be sad
querier.async_wait_for_error(querier.deps())
# it should also not be running.
result = querier.is_result(querier.ps(active=True, completed=False))
assert result['rows'] == []

# fix packages again
project.packages['packages'] = packages
project.write_packages(project_root, remove=True)
# keep queries broken, we haven't run deps yet
querier.is_error(querier.run())

# deps should pass now
querier.async_wait_for_result(querier.deps())
querier.is_result(querier.status())

tok1 = querier.is_async_result(querier.run())
tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))

querier.is_result(querier.async_wait(tok2))
querier.is_result(querier.async_wait(tok1))


@pytest.mark.supported('postgres')
def test_rpc_deps_after_list(project_root, profiles_root, dbt_profile, unique_schema):
# Check that a `deps` request still succeeds when it is issued after a
# `list` request in the same querier session.
# NOTE(review): indentation is lost in this view; everything from the
# `with` line onward presumably belongs to the `with` body — confirm.
packages = [{
'package': 'dbt-labs/dbt_utils',
'version': '0.5.0',
}]
project = ProjectDefinition(
models={
'my_model.sql': 'select 1 as id',
},
packages={'packages': packages},
)
querier_ctx = get_querier(
project_def=project,
project_dir=project_root,
profiles_dir=profiles_root,
schema=unique_schema,
test_kwargs={},
criteria='error',
)
with querier_ctx as querier:
# run_sql is checked with is_error here, so the query is presumably
# expected to FAIL at startup (deps have not been run yet). The original
# comment said queries should work at startup — confirm the intent.
querier.is_error(querier.run_sql('select 1 as id'))

# the status call itself must return a result; presumably the reported
# state is an error because deps will not be defined yet
querier.is_result(querier.status())

# deps should pass
querier.async_wait_for_result(querier.deps())

# queries should work after deps
tok1 = querier.is_async_result(querier.run())
tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))

# wait on the two tokens in the reverse of submission order
querier.is_result(querier.async_wait(tok2))
querier.is_result(querier.async_wait(tok1))

# list should pass
# NOTE(review): the response of list() is not asserted or waited on
# here, so this only checks that issuing the request does not raise.
querier.list()

# deps should pass
querier.async_wait_for_result(querier.deps())
3 changes: 3 additions & 0 deletions test/rpc/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ def cli_args(self, cli: str, request_id: int = 1):
def deps(self, request_id: int = 1):
# Issue a 'deps' request through self.request, forwarding request_id,
# and return whatever self.request returns.
return self.request(method='deps', request_id=request_id)

def list(self, request_id: int = 1):
# Issue a 'list' request through self.request, forwarding request_id,
# and return whatever self.request returns.
# NOTE(review): the method name matches the builtin `list`; presumably
# harmless as an attribute, but confirm nothing later in the enclosing
# class body relies on the builtin by bare name.
return self.request(method='list', request_id=request_id)

def compile(
self,
models: Optional[Union[str, List[str]]] = None,
Expand Down

0 comments on commit 3effade

Please sign in to comment.