Skip to content

Commit

Permalink
Feature: nullable error_after in source (#3955)
Browse files Browse the repository at this point in the history
* Add nullable error after feature

* add merge_error_after method

* Fix FreshnessThreshold merged test

* Fix other tests

* Fix merge error after

* Fix test docs generate integration test

* Fix source integration test

* Typo and fix linting.

* Fix mypy test

* More terse way to express merge_freshness_time_thresholds

* Update Changelog.md

* Add integration test

* Fix conflict

* Fix contributing.md

* Fix integration tests

* Move up changelog entry

Co-authored-by: Jeremy Cohen <jeremy@dbtlabs.com>
  • Loading branch information
kadero and jtcohen6 authored Oct 26, 2021
1 parent c34f353 commit d2aa920
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 21 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## dbt-core 1.0.0 (Release TBD)

### Features
- Allow nullable `error_after` in source freshness ([#3874](https://github.com/dbt-labs/dbt-core/issues/3874), [#3955](https://github.com/dbt-labs/dbt-core/pull/3955))

Contributors:
- [@kadero](https://github.com/kadero) ([3955](https://github.com/dbt-labs/dbt-core/pull/3955))

## dbt-core 1.0.0b2 (October 25, 2021)

### Breaking changes
Expand Down
19 changes: 12 additions & 7 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,25 @@ def plural(self) -> str:


@dataclass
class Time(dbtClassMixin, Replaceable):
count: int
period: TimePeriod
class Time(dbtClassMixin, Mergeable):
count: Optional[int] = None
period: Optional[TimePeriod] = None

def exceeded(self, actual_age: float) -> bool:
kwargs = {self.period.plural(): self.count}
if self.period is None or self.count is None:
return False
kwargs: Dict[str, int] = {self.period.plural(): self.count}
difference = timedelta(**kwargs).total_seconds()
return actual_age > difference

def __bool__(self):
return self.count is not None and self.period is not None


@dataclass
class FreshnessThreshold(dbtClassMixin, Mergeable):
warn_after: Optional[Time] = None
error_after: Optional[Time] = None
warn_after: Optional[Time] = field(default_factory=Time)
error_after: Optional[Time] = field(default_factory=Time)
filter: Optional[str] = None

def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus":
Expand All @@ -193,7 +198,7 @@ def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus":
return FreshnessStatus.Pass

def __bool__(self):
return self.warn_after is not None or self.error_after is not None
return bool(self.warn_after) or bool(self.error_after)


@dataclass
Expand Down
21 changes: 20 additions & 1 deletion core/dbt/parser/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
UnparsedSourceTableDefinition,
FreshnessThreshold,
UnparsedColumn,
Time
)
from dbt.exceptions import warn_or_error, InternalException
from dbt.node_types import NodeType
Expand Down Expand Up @@ -339,11 +340,29 @@ def get_unused_msg(
return '\n'.join(msg)


def merge_freshness_time_thresholds(
base: Optional[Time], update: Optional[Time]
) -> Optional[Time]:
if base and update:
return base.merged(update)
elif update is None:
return None
else:
return update or base


def merge_freshness(
base: Optional[FreshnessThreshold], update: Optional[FreshnessThreshold]
) -> Optional[FreshnessThreshold]:
if base is not None and update is not None:
return base.merged(update)
merged_freshness = base.merged(update)
# merge one level deeper the error_after and warn_after thresholds
merged_error_after = merge_freshness_time_thresholds(base.error_after, update.error_after)
merged_warn_after = merge_freshness_time_thresholds(base.warn_after, update.warn_after)

merged_freshness.error_after = merged_error_after
merged_freshness.warn_after = merged_warn_after
return merged_freshness
elif base is None and update is not None:
return update
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ def expected_seeded_manifest(self, model_database=None, quote_model=False):
'database': self.default_database,
'description': 'My table',
'external': None,
'freshness': {'error_after': None, 'warn_after': None, 'filter': None},
'freshness': {'error_after': {'count': None, 'period': None}, 'warn_after': {'count': None, 'period': None}, 'filter': None},
'identifier': 'seed',
'loaded_at_field': None,
'loader': 'a_loader',
Expand Down Expand Up @@ -1542,7 +1542,7 @@ def expected_postgres_references_manifest(self, model_database=None):
'database': self.default_database,
'description': 'My table',
'external': None,
'freshness': {'error_after': None, 'warn_after': None, 'filter': None},
'freshness': {'error_after': {'count': None, 'period': None}, 'warn_after': {'count': None, 'period': None}, 'filter': None},
'identifier': 'seed',
'loaded_at_field': None,
'loader': 'a_loader',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: 2
sources:
- name: test_source
loader: custom
freshness: # default freshness
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
loaded_at_field: loaded_at
quoting:
identifier: True
tags:
- my_test_source_tag
tables:
- name: source_a
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness:
warn_after: {count: 6, period: hour}
# use the default error_after defined above
- name: source_b
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness:
warn_after: {count: 6, period: hour}
error_after: {} # use the default error_after defined above
- name: source_c
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness:
warn_after: {count: 6, period: hour}
error_after: null # override: disable error_after for this table
- name: source_d
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness:
warn_after: {count: 6, period: hour}
error_after: {count: 72, period: hour} # override: use this new behavior instead of error_after defined above
- name: source_e
identifier: source
loaded_at_field: "{{ var('test_loaded_at') | as_text }}"
freshness: null # override: disable freshness for this table
76 changes: 76 additions & 0 deletions test/integration/042_sources_test/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def test_postgres_basic_source_def(self):
['expected_multi_source', 'multi_source_model'])
results = self.run_dbt_with_vars(['test'])
self.assertEqual(len(results), 6)
print(results)

@use_profile('postgres')
def test_postgres_source_selector(self):
Expand Down Expand Up @@ -400,6 +401,81 @@ def test_postgres_source_freshness_selection_graph_operation(self):
self.assertEqual(results[0].status, 'pass')
self._assert_freshness_results('target/ancestor_source.json', 'pass')

class TestOverrideSourceFreshness(SuccessfulSourcesTest):

@property
def models(self):
return "override_freshness_models"

@staticmethod
def get_result_from_unique_id(data, unique_id):
try:
return list(filter(lambda x : x['unique_id'] == unique_id, data['results']))[0]
except IndexError:
raise f"No result for the given unique_id. unique_id={unique_id}"

def _run_override_source_freshness(self):
self._set_updated_at_to(timedelta(hours=-30))
self.freshness_start_time = datetime.utcnow()

path = 'target/pass_source.json'
results = self.run_dbt_with_vars(
['source', 'freshness', '-o', path],
expect_pass=False
)
self.assertEqual(len(results), 4) # freshness disabled for source_e

self.assertTrue(os.path.exists(path))
with open(path) as fp:
data = json.load(fp)

result_source_a = self.get_result_from_unique_id(data, 'source.test.test_source.source_a')
self.assertEqual(result_source_a['status'], 'error')
self.assertEqual(
result_source_a['criteria'],
{
'warn_after': {'count': 6, 'period': 'hour'},
'error_after': {'count': 24, 'period': 'hour'},
'filter': None
}
)

result_source_b = self.get_result_from_unique_id(data, 'source.test.test_source.source_b')
self.assertEqual(result_source_b['status'], 'error')
self.assertEqual(
result_source_b['criteria'],
{
'warn_after': {'count': 6, 'period': 'hour'},
'error_after': {'count': 24, 'period': 'hour'},
'filter': None
}
)

result_source_c = self.get_result_from_unique_id(data, 'source.test.test_source.source_c')
self.assertEqual(result_source_c['status'], 'warn')
self.assertEqual(
result_source_c['criteria'],
{
'warn_after': {'count': 6, 'period': 'hour'},
'error_after': None,
'filter': None
}
)

result_source_d = self.get_result_from_unique_id(data, 'source.test.test_source.source_d')
self.assertEqual(result_source_d['status'], 'warn')
self.assertEqual(
result_source_d['criteria'],
{
'warn_after': {'count': 6, 'period': 'hour'},
'error_after': {'count': 72, 'period': 'hour'},
'filter': None
}
)

@use_profile('postgres')
def test_postgres_override_source_freshness(self):
self._run_override_source_freshness()

class TestSourceFreshnessErrors(SuccessfulSourcesTest):
@property
Expand Down
1 change: 1 addition & 0 deletions test/unit/test_contracts_graph_parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,7 @@ def complex_parsed_source_definition_dict():
},
'freshness': {
'warn_after': {'period': 'hour', 'count': 1},
'error_after': {}
},
'loaded_at_field': 'loaded_at',
'unrendered_config': {},
Expand Down
19 changes: 8 additions & 11 deletions test/unit/test_contracts_graph_unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ class TestFreshnessThreshold(ContractTestCase):
ContractType = FreshnessThreshold

def test_empty(self):
empty = self.ContractType(None, None)
self.assert_symmetric(empty, {})
empty = self.ContractType()
self.assert_symmetric(empty, {'error_after': {}, 'warn_after': {}})
self.assertEqual(empty.status(float('Inf')), FreshnessStatus.Pass)
self.assertEqual(empty.status(0), FreshnessStatus.Pass)

Expand Down Expand Up @@ -209,15 +209,12 @@ def test_merged(self):
)
threshold = self.ContractType(
warn_after=Time(count=18, period=TimePeriod.hour),
error_after=Time(count=2, period=TimePeriod.day),
error_after=Time(count=None, period=None),
)
self.assertEqual(threshold, t1.merged(t2))

error_seconds = timedelta(days=3).total_seconds()
warn_seconds = timedelta(days=1).total_seconds()
pass_seconds = timedelta(hours=3).total_seconds()
self.assertEqual(threshold.status(
error_seconds), FreshnessStatus.Error)
self.assertEqual(threshold.status(warn_seconds), FreshnessStatus.Warn)
self.assertEqual(threshold.status(pass_seconds), FreshnessStatus.Pass)

Expand Down Expand Up @@ -252,7 +249,7 @@ def test_defaults(self):
to_dict = {
'name': 'foo',
'description': '',
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'quoting': {},
'tables': [],
'loader': '',
Expand All @@ -278,7 +275,7 @@ def test_contents(self):
'description': 'a description',
'quoting': {'database': False},
'loader': 'some_loader',
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'tables': [],
'meta': {},
'tags': [],
Expand Down Expand Up @@ -312,7 +309,7 @@ def test_table_defaults(self):
'name': 'foo',
'description': '',
'loader': '',
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'quoting': {},
'meta': {},
'tables': [
Expand All @@ -323,7 +320,7 @@ def test_table_defaults(self):
'tests': [],
'columns': [],
'quoting': {},
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'meta': {},
'tags': [],
},
Expand All @@ -334,7 +331,7 @@ def test_table_defaults(self):
'tests': [],
'columns': [],
'quoting': {'database': True},
'freshness': {},
'freshness': {'error_after': {}, 'warn_after': {}},
'meta': {},
'tags': [],
},
Expand Down

0 comments on commit d2aa920

Please sign in to comment.