From d2aa920275318d574df0a6063253ded8d0ad5db7 Mon Sep 17 00:00:00 2001 From: kadero Date: Tue, 26 Oct 2021 15:23:57 +0200 Subject: [PATCH] Feature: nullable error_after in source (#3955) * Add nullable error after feature * add merge_error_after method * Fix FreshnessThreshold merged test * Fix other tests * Fix merge error after * Fix test docs generate integration test * Fix source integration test * Typo and fix linting. * Fix mypy test * More terse way to express merge_freshness_time_thresholds * Update Changelog.md * Add integration test * Fix conflict * Fix contributing.md * Fix integration tests * Move up changelog entry Co-authored-by: Jeremy Cohen --- CHANGELOG.md | 6 ++ core/dbt/contracts/graph/unparsed.py | 19 +++-- core/dbt/parser/sources.py | 21 ++++- .../test_docs_generate.py | 4 +- .../override_freshness_models/schema.yml | 42 ++++++++++ .../042_sources_test/test_sources.py | 76 +++++++++++++++++++ test/unit/test_contracts_graph_parsed.py | 1 + test/unit/test_contracts_graph_unparsed.py | 19 ++--- 8 files changed, 167 insertions(+), 21 deletions(-) create mode 100644 test/integration/042_sources_test/override_freshness_models/schema.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index f868158352c..bb0167cfb1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ ## dbt-core 1.0.0 (Release TBD) +### Features +- Allow nullable `error_after` in source freshness ([#3874](https://github.com/dbt-labs/dbt-core/issues/3874), [#3955](https://github.com/dbt-labs/dbt-core/pull/3955)) + +Contributors: +- [@kadero](https://github.com/kadero) ([3955](https://github.com/dbt-labs/dbt-core/pull/3955)) + ## dbt-core 1.0.0b2 (October 25, 2021) ### Breaking changes diff --git a/core/dbt/contracts/graph/unparsed.py b/core/dbt/contracts/graph/unparsed.py index a68a58a6810..3b973d22acd 100644 --- a/core/dbt/contracts/graph/unparsed.py +++ b/core/dbt/contracts/graph/unparsed.py @@ -167,20 +167,25 @@ def plural(self) -> str: @dataclass -class Time(dbtClassMixin, Replaceable): - count: int - period: TimePeriod +class Time(dbtClassMixin, Mergeable): + count: Optional[int] = None + period: Optional[TimePeriod] = None def exceeded(self, actual_age: float) -> bool: - kwargs = {self.period.plural(): self.count} + if self.period is None or self.count is None: + return False + kwargs: Dict[str, int] = {self.period.plural(): self.count} difference = timedelta(**kwargs).total_seconds() return actual_age > difference + def __bool__(self): + return self.count is not None and self.period is not None + @dataclass class FreshnessThreshold(dbtClassMixin, Mergeable): - warn_after: Optional[Time] = None - error_after: Optional[Time] = None + warn_after: Optional[Time] = field(default_factory=Time) + error_after: Optional[Time] = field(default_factory=Time) filter: Optional[str] = None def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus": @@ -193,7 +198,7 @@ def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus": return FreshnessStatus.Pass def __bool__(self): - return self.warn_after is not None or self.error_after is not None + return bool(self.warn_after) or bool(self.error_after) @dataclass diff --git a/core/dbt/parser/sources.py b/core/dbt/parser/sources.py index 20b34dadf7a..98eb418f4e0 100644 --- a/core/dbt/parser/sources.py +++ b/core/dbt/parser/sources.py @@ -24,6 +24,7 @@ UnparsedSourceTableDefinition, FreshnessThreshold, UnparsedColumn, + Time ) from dbt.exceptions import warn_or_error, InternalException from dbt.node_types import 
NodeType @@ -339,11 +340,29 @@ def get_unused_msg( return '\n'.join(msg) +def merge_freshness_time_thresholds( + base: Optional[Time], update: Optional[Time] +) -> Optional[Time]: + if base and update: + return base.merged(update) + elif update is None: + return None + else: + return update or base + + def merge_freshness( base: Optional[FreshnessThreshold], update: Optional[FreshnessThreshold] ) -> Optional[FreshnessThreshold]: if base is not None and update is not None: - return base.merged(update) + merged_freshness = base.merged(update) + # merge one level deeper the error_after and warn_after thresholds + merged_error_after = merge_freshness_time_thresholds(base.error_after, update.error_after) + merged_warn_after = merge_freshness_time_thresholds(base.warn_after, update.warn_after) + + merged_freshness.error_after = merged_error_after + merged_freshness.warn_after = merged_warn_after + return merged_freshness elif base is None and update is not None: return update else: diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index 3a12bc46eaa..7ad0219becc 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -1094,7 +1094,7 @@ def expected_seeded_manifest(self, model_database=None, quote_model=False): 'database': self.default_database, 'description': 'My table', 'external': None, - 'freshness': {'error_after': None, 'warn_after': None, 'filter': None}, + 'freshness': {'error_after': {'count': None, 'period': None}, 'warn_after': {'count': None, 'period': None}, 'filter': None}, 'identifier': 'seed', 'loaded_at_field': None, 'loader': 'a_loader', @@ -1542,7 +1542,7 @@ def expected_postgres_references_manifest(self, model_database=None): 'database': self.default_database, 'description': 'My table', 'external': None, - 'freshness': {'error_after': None, 'warn_after': None, 'filter': None}, + 'freshness': {'error_after': {'count': None, 'period': None}, 'warn_after': {'count': None, 'period': None}, 'filter': None}, 'identifier': 'seed', 'loaded_at_field': None, 'loader': 'a_loader', diff --git a/test/integration/042_sources_test/override_freshness_models/schema.yml b/test/integration/042_sources_test/override_freshness_models/schema.yml new file mode 100644 index 00000000000..dabcf0779a1 --- /dev/null +++ b/test/integration/042_sources_test/override_freshness_models/schema.yml @@ -0,0 +1,42 @@ +version: 2 +sources: + - name: test_source + loader: custom + freshness: # default freshness + warn_after: {count: 12, period: hour} + error_after: {count: 24, period: hour} + schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}" + loaded_at_field: loaded_at + quoting: + identifier: True + tags: + - my_test_source_tag + tables: + - name: source_a + identifier: source + loaded_at_field: "{{ var('test_loaded_at') | as_text }}" + freshness: + warn_after: {count: 6, period: hour} + # use the default error_after defined above + - name: source_b + identifier: source + loaded_at_field: "{{ var('test_loaded_at') | as_text }}" + freshness: + warn_after: {count: 6, period: hour} + error_after: {} # use the default error_after defined above + - name: source_c + identifier: source + loaded_at_field: "{{ var('test_loaded_at') | as_text }}" + freshness: + warn_after: {count: 6, period: hour} + error_after: null # override: disable error_after for this table + - name: source_d + identifier: source + loaded_at_field: "{{ 
var('test_loaded_at') | as_text }}" + freshness: + warn_after: {count: 6, period: hour} + error_after: {count: 72, period: hour} # override: use this new behavior instead of error_after defined above + - name: source_e + identifier: source + loaded_at_field: "{{ var('test_loaded_at') | as_text }}" + freshness: null # override: disable freshness for this table diff --git a/test/integration/042_sources_test/test_sources.py b/test/integration/042_sources_test/test_sources.py index 412f27f5dec..e9362fd2517 100644 --- a/test/integration/042_sources_test/test_sources.py +++ b/test/integration/042_sources_test/test_sources.py @@ -137,6 +137,7 @@ def test_postgres_basic_source_def(self): ['expected_multi_source', 'multi_source_model']) results = self.run_dbt_with_vars(['test']) self.assertEqual(len(results), 6) + print(results) @use_profile('postgres') def test_postgres_source_selector(self): @@ -400,6 +401,81 @@ def test_postgres_source_freshness_selection_graph_operation(self): self.assertEqual(results[0].status, 'pass') self._assert_freshness_results('target/ancestor_source.json', 'pass') +class TestOverrideSourceFreshness(SuccessfulSourcesTest): + + @property + def models(self): + return "override_freshness_models" + + @staticmethod + def get_result_from_unique_id(data, unique_id): + try: + return list(filter(lambda x : x['unique_id'] == unique_id, data['results']))[0] + except IndexError: + raise f"No result for the given unique_id. unique_id={unique_id}" + + def _run_override_source_freshness(self): + self._set_updated_at_to(timedelta(hours=-30)) + self.freshness_start_time = datetime.utcnow() + + path = 'target/pass_source.json' + results = self.run_dbt_with_vars( + ['source', 'freshness', '-o', path], + expect_pass=False + ) + self.assertEqual(len(results), 4) # freshness disabled for source_e + + self.assertTrue(os.path.exists(path)) + with open(path) as fp: + data = json.load(fp) + + result_source_a = self.get_result_from_unique_id(data, 'source.test.test_source.source_a') + self.assertEqual(result_source_a['status'], 'error') + self.assertEqual( + result_source_a['criteria'], + { + 'warn_after': {'count': 6, 'period': 'hour'}, + 'error_after': {'count': 24, 'period': 'hour'}, + 'filter': None + } + ) + + result_source_b = self.get_result_from_unique_id(data, 'source.test.test_source.source_b') + self.assertEqual(result_source_b['status'], 'error') + self.assertEqual( + result_source_b['criteria'], + { + 'warn_after': {'count': 6, 'period': 'hour'}, + 'error_after': {'count': 24, 'period': 'hour'}, + 'filter': None + } + ) + + result_source_c = self.get_result_from_unique_id(data, 'source.test.test_source.source_c') + self.assertEqual(result_source_c['status'], 'warn') + self.assertEqual( + result_source_c['criteria'], + { + 'warn_after': {'count': 6, 'period': 'hour'}, + 'error_after': None, + 'filter': None + } + ) + + result_source_d = self.get_result_from_unique_id(data, 'source.test.test_source.source_d') + self.assertEqual(result_source_d['status'], 'warn') + self.assertEqual( + result_source_d['criteria'], + { + 'warn_after': {'count': 6, 'period': 'hour'}, + 'error_after': {'count': 72, 'period': 'hour'}, + 'filter': None + } + ) + + @use_profile('postgres') + def test_postgres_override_source_freshness(self): + self._run_override_source_freshness() class TestSourceFreshnessErrors(SuccessfulSourcesTest): @property diff --git a/test/unit/test_contracts_graph_parsed.py b/test/unit/test_contracts_graph_parsed.py index 65a4aa23586..5d37cc7078b 100644 --- 
a/test/unit/test_contracts_graph_parsed.py +++ b/test/unit/test_contracts_graph_parsed.py @@ -1906,6 +1906,7 @@ def complex_parsed_source_definition_dict(): }, 'freshness': { 'warn_after': {'period': 'hour', 'count': 1}, + 'error_after': {} }, 'loaded_at_field': 'loaded_at', 'unrendered_config': {}, diff --git a/test/unit/test_contracts_graph_unparsed.py b/test/unit/test_contracts_graph_unparsed.py index 26b2bc3b947..097325ac08b 100644 --- a/test/unit/test_contracts_graph_unparsed.py +++ b/test/unit/test_contracts_graph_unparsed.py @@ -174,8 +174,8 @@ class TestFreshnessThreshold(ContractTestCase): ContractType = FreshnessThreshold def test_empty(self): - empty = self.ContractType(None, None) - self.assert_symmetric(empty, {}) + empty = self.ContractType() + self.assert_symmetric(empty, {'error_after': {}, 'warn_after': {}}) self.assertEqual(empty.status(float('Inf')), FreshnessStatus.Pass) self.assertEqual(empty.status(0), FreshnessStatus.Pass) @@ -209,15 +209,12 @@ def test_merged(self): ) threshold = self.ContractType( warn_after=Time(count=18, period=TimePeriod.hour), - error_after=Time(count=2, period=TimePeriod.day), + error_after=Time(count=None, period=None), ) self.assertEqual(threshold, t1.merged(t2)) - error_seconds = timedelta(days=3).total_seconds() warn_seconds = timedelta(days=1).total_seconds() pass_seconds = timedelta(hours=3).total_seconds() - self.assertEqual(threshold.status( - error_seconds), FreshnessStatus.Error) self.assertEqual(threshold.status(warn_seconds), FreshnessStatus.Warn) self.assertEqual(threshold.status(pass_seconds), FreshnessStatus.Pass) @@ -252,7 +249,7 @@ def test_defaults(self): to_dict = { 'name': 'foo', 'description': '', - 'freshness': {}, + 'freshness': {'error_after': {}, 'warn_after': {}}, 'quoting': {}, 'tables': [], 'loader': '', @@ -278,7 +275,7 @@ def test_contents(self): 'description': 'a description', 'quoting': {'database': False}, 'loader': 'some_loader', - 'freshness': {}, + 'freshness': {'error_after': {}, 'warn_after': {}}, 'tables': [], 'meta': {}, 'tags': [], @@ -312,7 +309,7 @@ def test_table_defaults(self): 'name': 'foo', 'description': '', 'loader': '', - 'freshness': {}, + 'freshness': {'error_after': {}, 'warn_after': {}}, 'quoting': {}, 'meta': {}, 'tables': [ @@ -323,7 +320,7 @@ def test_table_defaults(self): 'tests': [], 'columns': [], 'quoting': {}, - 'freshness': {}, + 'freshness': {'error_after': {}, 'warn_after': {}}, 'meta': {}, 'tags': [], }, @@ -334,7 +331,7 @@ def test_table_defaults(self): 'tests': [], 'columns': [], 'quoting': {'database': True}, - 'freshness': {}, + 'freshness': {'error_after': {}, 'warn_after': {}}, 'meta': {}, 'tags': [], },
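
The behavior this patch introduces, visible across the `unparsed.py`, `sources.py`, and `schema.yml` hunks above, is a three-way merge rule for table-level freshness: an explicit `error_after: null` disables the threshold, an omitted or empty `error_after: {}` inherits the source-level default, and a concrete value overrides it. Below is a minimal, self-contained sketch of that merge logic for illustration only; the `Time` dataclass and `merge_freshness_time_thresholds` function here are simplified stand-ins modeled on the patched code in `core/dbt/contracts/graph/unparsed.py` and `core/dbt/parser/sources.py`, not importable dbt APIs, and `TimePeriod` is replaced by a plain string.

```python
# Standalone sketch of the nullable error_after merge semantics in this patch.
# Simplified stand-ins, not real dbt imports.
from dataclasses import dataclass
from typing import Optional


@dataclass
class Time:
    count: Optional[int] = None
    period: Optional[str] = None

    def __bool__(self) -> bool:
        # A Time with no count/period (e.g. `error_after: {}` in YAML) is falsy,
        # so it inherits rather than overrides.
        return self.count is not None and self.period is not None

    def merged(self, update: "Time") -> "Time":
        # Field-level merge: prefer the update's value wherever it is set.
        return Time(
            count=update.count if update.count is not None else self.count,
            period=update.period if update.period is not None else self.period,
        )


def merge_freshness_time_thresholds(
    base: Optional[Time], update: Optional[Time]
) -> Optional[Time]:
    # Mirrors the helper added in core/dbt/parser/sources.py:
    # - both set              -> field-level merge
    # - update is None        -> explicit `error_after: null` disables the threshold
    # - otherwise             -> whichever side is set, falling back to the base
    if base and update:
        return base.merged(update)
    elif update is None:
        return None
    else:
        return update or base


source_default = Time(count=24, period="hour")

# source_a / source_b: table omits error_after (or sets `{}`) -> inherit the default
assert merge_freshness_time_thresholds(source_default, Time()) == Time(24, "hour")
# source_c: table sets `error_after: null` -> threshold disabled entirely
assert merge_freshness_time_thresholds(source_default, None) is None
# source_d: table sets its own value -> the override wins
assert merge_freshness_time_thresholds(source_default, Time(72, "hour")) == Time(72, "hour")
print("nullable error_after merge semantics hold")
```

The same rule is applied to `warn_after`, which is why the integration test expects source_c to report `'error_after': None` in its criteria while source_a and source_b still inherit `{count: 24, period: hour}` from the source-level default.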