Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source freshness task node selection and cli command parity #3554

Merged
merged 6 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
### Features
- Add `dbt build` command to run models, tests, seeds, and snapshots in DAG order. ([#2743] (https://github.com/dbt-labs/dbt/issues/2743), [#3490] (https://github.com/dbt-labs/dbt/issues/3490))

### Breaking changes
- Add full node selection to source freshness command and align selection syntax with other tasks (`dbt source freshness --select source_name` --> `dbt source freshness --select source:souce_name`) and rename `dbt source snapshot-freshness` -> `dbt source freshness`. ([#2987](https://github.com/dbt-labs/dbt/issues/2987), [#3554](https://github.com/dbt-labs/dbt/pull/3554))

### Fixes
- Fix docs generation for cross-db sources in REDSHIFT RA3 node ([#3236](https://github.com/fishtown-analytics/dbt/issues/3236), [#3408](https://github.com/fishtown-analytics/dbt/pull/3408))
- Fix type coercion issues when fetching query result sets ([#2984](https://github.com/fishtown-analytics/dbt/issues/2984), [#3499](https://github.com/fishtown-analytics/dbt/pull/3499))
Expand Down
2 changes: 2 additions & 0 deletions core/dbt/contracts/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ class RPCRunOperationParameters(RPCParameters):
class RPCSourceFreshnessParameters(RPCParameters):
threads: Optional[int] = None
select: Union[None, str, List[str]] = None
exclude: Union[None, str, List[str]] = None
selector: Optional[str] = None


@dataclass
Expand Down
29 changes: 14 additions & 15 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class DBTVersion(argparse.Action):
"""This is very very similar to the builtin argparse._Version action,
except it just calls dbt.version.get_version_information().
"""

def __init__(self,
option_strings,
version=None,
Expand Down Expand Up @@ -755,23 +756,14 @@ def _build_test_subparser(subparsers, base_subparser):
return sub


def _build_source_snapshot_freshness_subparser(subparsers, base_subparser):
def _build_source_freshness_subparser(subparsers, base_subparser):
sub = subparsers.add_parser(
'snapshot-freshness',
'freshness',
parents=[base_subparser],
help='''
Snapshots the current freshness of the project's sources
''',
)
sub.add_argument(
'-s',
'--select',
required=False,
nargs='+',
help='''
Specify the sources to snapshot freshness
''',
dest='selected'
aliases=['snapshot-freshness'],
)
sub.add_argument(
'-o',
Expand All @@ -792,9 +784,16 @@ def _build_source_snapshot_freshness_subparser(subparsers, base_subparser):
)
sub.set_defaults(
cls=freshness_task.FreshnessTask,
which='snapshot-freshness',
rpc_method='snapshot-freshness',
which='source-freshness',
rpc_method='source-freshness',
)
_add_select_argument(
sub,
dest='select',
metavar='SELECTOR',
required=False,
)
_add_common_selector_arguments(sub)
return sub


Expand Down Expand Up @@ -1084,7 +1083,7 @@ def parse_args(args, cls=DBTArgumentParser):
_add_table_mutability_arguments(run_sub, compile_sub)

_build_docs_serve_subparser(docs_subs, base_subparser)
_build_source_snapshot_freshness_subparser(source_subs, base_subparser)
_build_source_freshness_subparser(source_subs, base_subparser)
_build_run_operation_subparser(subs, base_subparser)

if len(args) == 0:
Expand Down
20 changes: 13 additions & 7 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from dbt.logger import print_timestamped_line
from dbt.node_types import NodeType

from dbt.graph import NodeSelector, SelectionSpec, parse_difference
from dbt.graph import ResourceTypeSelector, SelectionSpec, parse_difference
from dbt.contracts.graph.parsed import ParsedSourceDefinition


Expand Down Expand Up @@ -117,7 +117,7 @@ def compile(self, manifest):
return self.node


class FreshnessSelector(NodeSelector):
class FreshnessSelector(ResourceTypeSelector):
def node_is_match(self, node):
if not super().node_is_match(node):
return False
Expand All @@ -137,11 +137,16 @@ def raise_on_first_error(self):
return False

def get_selection_spec(self) -> SelectionSpec:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of non-obvious what's happening here. A comment could be useful.

include = [
'source:{}'.format(s)
for s in (self.args.selected or ['*'])
]
spec = parse_difference(include, None)
"""Generates a selection spec from task arguments to use when
processing graph. A SelectionSpec describes what nodes to select
when creating queue from graph of nodes.
"""
if self.args.selector_name:
# use pre-defined selector (--selector) to create selection spec
spec = self.config.get_selector(self.args.selector_name)
else:
# use --select and --exclude args to create selection spec
spec = parse_difference(self.args.select, self.args.exclude)
return spec

def get_node_selector(self):
Expand All @@ -153,6 +158,7 @@ def get_node_selector(self):
graph=self.graph,
manifest=self.manifest,
previous_state=self.previous_state,
resource_types=[NodeType.Source]
)

def get_runner_type(self, _):
Expand Down
13 changes: 11 additions & 2 deletions core/dbt/task/rpc/project_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,24 @@ class RemoteSourceFreshnessTask(
RPCCommandTask[RPCSourceFreshnessParameters],
FreshnessTask
):
METHOD_NAME = 'snapshot-freshness'
METHOD_NAME = 'source-freshness'

def set_args(self, params: RPCSourceFreshnessParameters) -> None:
self.args.selected = self._listify(params.select)
self.args.select = self._listify(params.select)
self.args.exclude = self._listify(params.exclude)
self.args.selector_name = params.selector
if params.threads is not None:
self.args.threads = params.threads
self.args.output = None


class RemoteSourceSnapshotFreshnessTask(
RemoteSourceFreshnessTask
):
""" Deprecated task method name, aliases to `source-freshness` """
METHOD_NAME = 'snapshot-freshness'


# this is a weird and special method.
class GetManifest(
RemoteManifestMethod[GetManifestParameters, GetManifestResult]
Expand Down
90 changes: 83 additions & 7 deletions test/integration/042_sources_test/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def _run_source_freshness(self):
# by default, our data set is way out of date!
self.freshness_start_time = datetime.utcnow()
results = self.run_dbt_with_vars(
['source', 'snapshot-freshness', '-o', 'target/error_source.json'],
['source', 'freshness', '-o', 'target/error_source.json'],
expect_pass=False
)
self.assertEqual(len(results), 1)
Expand All @@ -292,7 +292,7 @@ def _run_source_freshness(self):
self._set_updated_at_to(timedelta(hours=-12))
self.freshness_start_time = datetime.utcnow()
results = self.run_dbt_with_vars(
['source', 'snapshot-freshness', '-o', 'target/warn_source.json'],
['source', 'freshness', '-o', 'target/warn_source.json'],
)
self.assertEqual(len(results), 1)
self.assertEqual(results[0].status, 'warn')
Expand All @@ -301,7 +301,7 @@ def _run_source_freshness(self):
self._set_updated_at_to(timedelta(hours=-2))
self.freshness_start_time = datetime.utcnow()
results = self.run_dbt_with_vars(
['source', 'snapshot-freshness', '-o', 'target/pass_source.json'],
['source', 'freshness', '-o', 'target/pass_source.json'],
)
self.assertEqual(len(results), 1)
self.assertEqual(results[0].status, 'pass')
Expand All @@ -311,6 +311,38 @@ def _run_source_freshness(self):
def test_postgres_source_freshness(self):
self._run_source_freshness()

@use_profile('postgres')
def test_postgres_source_snapshot_freshness(self):
"""Ensures that the deprecated command `source snapshot-freshness`
aliases to `source freshness` command.
"""
self.freshness_start_time = datetime.utcnow()
results = self.run_dbt_with_vars(
['source', 'snapshot-freshness', '-o', 'target/error_source.json'],
expect_pass=False
)
self.assertEqual(len(results), 1)
self.assertEqual(results[0].status, 'error')
self._assert_freshness_results('target/error_source.json', 'error')

self._set_updated_at_to(timedelta(hours=-12))
self.freshness_start_time = datetime.utcnow()
results = self.run_dbt_with_vars(
['source', 'snapshot-freshness', '-o', 'target/warn_source.json'],
)
self.assertEqual(len(results), 1)
self.assertEqual(results[0].status, 'warn')
self._assert_freshness_results('target/warn_source.json', 'warn')

self._set_updated_at_to(timedelta(hours=-2))
self.freshness_start_time = datetime.utcnow()
results = self.run_dbt_with_vars(
['source', 'snapshot-freshness', '-o', 'target/pass_source.json'],
)
self.assertEqual(len(results), 1)
self.assertEqual(results[0].status, 'pass')
self._assert_freshness_results('target/pass_source.json', 'pass')

@use_profile('snowflake')
def test_snowflake_source_freshness(self):
self._run_source_freshness()
Expand All @@ -323,6 +355,50 @@ def test_redshift_source_freshness(self):
def test_bigquery_source_freshness(self):
self._run_source_freshness()

@use_profile('postgres')
def test_postgres_source_freshness_selection_select(self):
"""Tests node selection using the --select argument."""
self._set_updated_at_to(timedelta(hours=-2))
self.freshness_start_time = datetime.utcnow()
# select source directly
results = self.run_dbt_with_vars(
['source', 'freshness', '--select',
'source:test_source.test_table', '-o', 'target/pass_source.json'],
)
self.assertEqual(len(results), 1)
self.assertEqual(results[0].status, 'pass')
self._assert_freshness_results('target/pass_source.json', 'pass')

@use_profile('postgres')
def test_postgres_source_freshness_selection_exclude(self):
"""Tests node selection using the --select argument. It 'excludes' the
only source in the project so it should return no results."""
self._set_updated_at_to(timedelta(hours=-2))
self.freshness_start_time = datetime.utcnow()
# exclude source directly
results = self.run_dbt_with_vars(
['source', 'freshness', '--exclude',
'source:test_source.test_table', '-o', 'target/exclude_source.json'],
)
self.assertEqual(len(results), 0)

@use_profile('postgres')
def test_postgres_source_freshness_selection_graph_operation(self):
"""Tests node selection using the --select argument with graph
operations. `+descendant_model` == select all nodes `descendant_model`
depends on.
"""
self._set_updated_at_to(timedelta(hours=-2))
self.freshness_start_time = datetime.utcnow()
# select model ancestors
results = self.run_dbt_with_vars(
['source', 'freshness', '--select',
'+descendant_model', '-o', 'target/ancestor_source.json']
)
self.assertEqual(len(results), 1)
self.assertEqual(results[0].status, 'pass')
self._assert_freshness_results('target/ancestor_source.json', 'pass')


class TestSourceFreshnessErrors(SuccessfulSourcesTest):
@property
Expand All @@ -332,7 +408,7 @@ def models(self):
@use_profile('postgres')
def test_postgres_error(self):
results = self.run_dbt_with_vars(
['source', 'snapshot-freshness'],
['source', 'freshness'],
expect_pass=False
)
self.assertEqual(len(results), 1)
Expand All @@ -348,18 +424,18 @@ def models(self):
def test_postgres_all_records(self):
# all records are filtered out
self.run_dbt_with_vars(
['source', 'snapshot-freshness'], expect_pass=False)
['source', 'freshness'], expect_pass=False)
# we should insert a record with #101 that's fresh, but will still fail
# because the filter excludes it
self._set_updated_at_to(timedelta(hours=-2))
self.run_dbt_with_vars(
['source', 'snapshot-freshness'], expect_pass=False)
['source', 'freshness'], expect_pass=False)

# we should now insert a record with #102 that's fresh, and the filter
# includes it
self._set_updated_at_to(timedelta(hours=-2))
results = self.run_dbt_with_vars(
['source', 'snapshot-freshness'], expect_pass=True)
['source', 'freshness'], expect_pass=True)


class TestMalformedSources(BaseSourcesTest):
Expand Down
Loading