diff --git a/CHANGELOG.md b/CHANGELOG.md index a28b42dc..cc21d4f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## dbt-databricks 1.9.1 (TBD) +### Features + +- Merge strategy now supports the `update set ...` action with the explicit list of updates for `when not matched by source` ([866](https://github.com/databricks/dbt-databricks/pull/866)) (thanks @mi-volodin). + ### Under the Hood - Removed pins for pandas and pydantic to ease user burdens ([874](https://github.com/databricks/dbt-databricks/pull/874)) diff --git a/dbt/include/databricks/macros/materializations/incremental/strategies.sql b/dbt/include/databricks/macros/materializations/incremental/strategies.sql index 57db5496..311c2f40 100644 --- a/dbt/include/databricks/macros/materializations/incremental/strategies.sql +++ b/dbt/include/databricks/macros/materializations/incremental/strategies.sql @@ -91,8 +91,15 @@ select {{source_cols_csv}} from {{ source_relation }} {%- set not_matched_by_source_action = config.get('not_matched_by_source_action') -%} {%- set not_matched_by_source_condition = config.get('not_matched_by_source_condition') -%} - + {%- set not_matched_by_source_action_trimmed = not_matched_by_source_action | lower | trim(' \n\t') %} + {%- set not_matched_by_source_action_is_set = ( + not_matched_by_source_action_trimmed == 'delete' + or not_matched_by_source_action_trimmed.startswith('update') + ) + %} + + {% if unique_key %} {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} {% for key in unique_key %} @@ -137,12 +144,12 @@ select {{source_cols_csv}} from {{ source_relation }} then insert {{ get_merge_insert(on_schema_change, source_columns, source_alias) }} {%- endif %} - {%- if not_matched_by_source_action == 'delete' %} + {%- if not_matched_by_source_action_is_set %} when not matched by source {%- if not_matched_by_source_condition %} and ({{ not_matched_by_source_condition }}) {%- endif %} - then delete + then {{ 
not_matched_by_source_action }} {%- endif %} {% endmacro %} diff --git a/docs/databricks-merge.md b/docs/databricks-merge.md index caa00336..4034b8f6 100644 --- a/docs/databricks-merge.md +++ b/docs/databricks-merge.md @@ -18,7 +18,11 @@ From v.1.9 onwards `merge` behavior can be tuned by setting the additional param - `skip_matched_step`: if set to `true`, dbt will completely skip the `matched` clause of the merge statement. - `skip_not_matched_step`: similarly if `true` the `not matched` clause will be skipped. - - `not_matched_by_source_action`: if set to `delete` the corresponding `when not matched by source ... then delete` clause will be added to the merge statement. + - `not_matched_by_source_action`: can be set to an action for the case when the record does not exist in the source dataset. + - if set to `delete` the corresponding `when not matched by source ... then delete` clause will be added to the merge statement. + - if the action starts with `update` then the format `update set <actions>` is assumed, and the update statement is run syntactically as provided. + It can be formatted across multiple lines. + - in other cases, by default no action is taken and no error is raised. - `merge_with_schema_evolution`: when set to `true` dbt generates the merge statement with `WITH SCHEMA EVOLUTION` clause. - Step conditions that are expressed with an explicit SQL predicates allow to execute corresponding action only in case the conditions are met in addition to matching by the `unique_key`. 
@@ -40,7 +44,11 @@ Example below illustrates how these parameters affect the merge statement genera matched_condition='t.tech_change_ts < s.tech_change_ts', not_matched_condition='s.attr1 IS NOT NULL', not_matched_by_source_condition='t.tech_change_ts < current_timestamp()', - not_matched_by_source_action='delete', + not_matched_by_source_action=''' + update set + t.attr1 = 'deleted', + t.tech_change_ts = current_timestamp() + ''', merge_with_schema_evolution=true ) }} @@ -93,5 +101,7 @@ when not matched when not matched by source and t.tech_change_ts < current_timestamp() - then delete + then update set + t.attr1 = 'deleted', + t.tech_change_ts = current_timestamp() ``` diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index 9d0f2913..18fcee40 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -264,7 +264,14 @@ 4,Baron,Harkonnen,1 """ -not_matched_by_source_expected = """id,first,second,V +not_matched_by_source_then_del_expected = """id,first,second,V +2,Paul,Atreides,0 +3,Dunkan,Aidaho,1 +4,Baron,Harkonnen,1 +""" + +not_matched_by_source_then_upd_expected = """id,first,second,V +1,--,--,-1 2,Paul,Atreides,0 3,Dunkan,Aidaho,1 4,Baron,Harkonnen,1 @@ -411,7 +418,7 @@ {% endif %} """ -not_matched_by_source_model = """ +not_matched_by_source_then_delete_model = """ {{ config( materialized = 'incremental', unique_key = 'id', @@ -446,6 +453,46 @@ {% endif %} """ +not_matched_by_source_then_update_model = """ +{{ config( + materialized = 'incremental', + unique_key = 'id', + incremental_strategy='merge', + target_alias='t', + source_alias='s', + skip_matched_step=true, + not_matched_by_source_condition='t.V > 0', + not_matched_by_source_action=''' + update set + t.first = \\\'--\\\', + t.second = \\\'--\\\', + t.V = -1 + ''', +) }} + +{% if not is_incremental() %} + +-- data for first invocation of model + +select 1 as id, 'Vasya' as first, 
'Pupkin' as second, 1 as V +union all +select 2 as id, 'Paul' as first, 'Atreides' as second, 0 as V +union all +select 3 as id, 'Dunkan' as first, 'Aidaho' as second, 1 as V + +{% else %} + +-- data for subsequent incremental update + +-- id = 1 should be updated with +-- id = 2 should be kept as condition doesn't hold (t.V = 0) +select 3 as id, 'Dunkan' as first, 'Aidaho' as second, 2 as V -- No update, skipped +union all +select 4 as id, 'Baron' as first, 'Harkonnen' as second, 1 as V -- should append + +{% endif %} +""" + merge_schema_evolution_model = """ {{ config( materialized = 'incremental', diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index 88db60ee..45a24362 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -288,17 +288,38 @@ def test_merge(self, project): ) -class TestNotMatchedBySourceAndCondition(IncrementalBase): +class TestNotMatchedBySourceAndConditionThenDelete(IncrementalBase): @pytest.fixture(scope="class") def seeds(self): return { - "not_matched_by_source_expected.csv": fixtures.not_matched_by_source_expected, + "not_matched_by_source_expected.csv": fixtures.not_matched_by_source_then_del_expected, } @pytest.fixture(scope="class") def models(self): return { - "not_matched_by_source.sql": fixtures.not_matched_by_source_model, + "not_matched_by_source.sql": fixtures.not_matched_by_source_then_delete_model, + } + + def test_merge(self, project): + self.seed_and_run_twice() + util.check_relations_equal( + project.adapter, + ["not_matched_by_source", "not_matched_by_source_expected"], + ) + + +class TestNotMatchedBySourceAndConditionThenUpdate(IncrementalBase): + @pytest.fixture(scope="class") + def seeds(self): + return { + "not_matched_by_source_expected.csv": fixtures.not_matched_by_source_then_upd_expected, + } + + 
@pytest.fixture(scope="class") + def models(self): + return { + "not_matched_by_source.sql": fixtures.not_matched_by_source_then_update_model, } def test_merge(self, project):