Skip to content

Commit

Permalink
MERGE EXTENSION: when not matched by source - update (#866)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Volodin <volodin.d.v@gmail.com>
  • Loading branch information
mi-volodin authored Dec 12, 2024
1 parent 40925aa commit 3fc74df
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## dbt-databricks 1.9.1 (TBD)

### Features

- Merge strategy now supports the `update set ...` action with the explicit list of updates for `when not matched by source` ([866](https://github.com/databricks/dbt-databricks/pull/866)) (thanks @mi-volodin).

### Under the Hood

- Removed pins for pandas and pydantic to ease user burdens ([874](https://github.com/databricks/dbt-databricks/pull/874))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,15 @@ select {{source_cols_csv}} from {{ source_relation }}

{%- set not_matched_by_source_action = config.get('not_matched_by_source_action') -%}
{%- set not_matched_by_source_condition = config.get('not_matched_by_source_condition') -%}


{%- set not_matched_by_source_action_trimmed = not_matched_by_source_action | lower | trim(' \n\t') %}
{%- set not_matched_by_source_action_is_set = (
not_matched_by_source_action_trimmed == 'delete'
or not_matched_by_source_action_trimmed.startswith('update')
)
%}


{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
Expand Down Expand Up @@ -137,12 +144,12 @@ select {{source_cols_csv}} from {{ source_relation }}
then insert
{{ get_merge_insert(on_schema_change, source_columns, source_alias) }}
{%- endif %}
{%- if not_matched_by_source_action == 'delete' %}
{%- if not_matched_by_source_action_is_set %}
when not matched by source
{%- if not_matched_by_source_condition %}
and ({{ not_matched_by_source_condition }})
{%- endif %}
then delete
then {{ not_matched_by_source_action }}
{%- endif %}
{% endmacro %}

Expand Down
16 changes: 13 additions & 3 deletions docs/databricks-merge.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ From v.1.9 onwards `merge` behavior can be tuned by setting the additional param

- `skip_matched_step`: if set to `true`, dbt will completely skip the `matched` clause of the merge statement.
- `skip_not_matched_step`: similarly if `true` the `not matched` clause will be skipped.
- `not_matched_by_source_action`: if set to `delete` the corresponding `when not matched by source ... then delete` clause will be added to the merge statement.
- `not_matched_by_source_action`: the action to take when a record in the target has no matching record in the source dataset.
- if set to `delete` the corresponding `when not matched by source ... then delete` clause will be added to the merge statement.
- if the action starts with `update`, the format `update set <actions>` is assumed and the text is inserted into the merge statement exactly as provided.
The value may span multiple lines.
- in all other cases no action is taken and no error is raised.
- `merge_with_schema_evolution`: when set to `true` dbt generates the merge statement with `WITH SCHEMA EVOLUTION` clause.

- Step conditions expressed as explicit SQL predicates allow the corresponding action to be executed only when the conditions are met, in addition to matching by the `unique_key`.
Expand All @@ -40,7 +44,11 @@ Example below illustrates how these parameters affect the merge statement genera
matched_condition='t.tech_change_ts < s.tech_change_ts',
not_matched_condition='s.attr1 IS NOT NULL',
not_matched_by_source_condition='t.tech_change_ts < current_timestamp()',
not_matched_by_source_action='delete',
not_matched_by_source_action='''
update set
t.attr1 = 'deleted',
t.tech_change_ts = current_timestamp()
''',
merge_with_schema_evolution=true
) }}

Expand Down Expand Up @@ -93,5 +101,7 @@ when not matched

when not matched by source
and t.tech_change_ts < current_timestamp()
then delete
then update set
t.attr1 = 'deleted',
t.tech_change_ts = current_timestamp()
```
51 changes: 49 additions & 2 deletions tests/functional/adapter/incremental/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,14 @@
4,Baron,Harkonnen,1
"""

not_matched_by_source_expected = """id,first,second,V
not_matched_by_source_then_del_expected = """id,first,second,V
2,Paul,Atreides,0
3,Dunkan,Aidaho,1
4,Baron,Harkonnen,1
"""

not_matched_by_source_then_upd_expected = """id,first,second,V
1,--,--,-1
2,Paul,Atreides,0
3,Dunkan,Aidaho,1
4,Baron,Harkonnen,1
Expand Down Expand Up @@ -411,7 +418,7 @@
{% endif %}
"""

not_matched_by_source_model = """
not_matched_by_source_then_delete_model = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
Expand Down Expand Up @@ -446,6 +453,46 @@
{% endif %}
"""

# Incremental `merge` model fixture for the `when not matched by source ... then update` case.
# The config skips the matched step and passes a multi-line `update set ...` clause as
# `not_matched_by_source_action`, guarded by the condition `t.V > 0`.
# The first run selects ids 1-3; the incremental run selects only ids 3 and 4.
not_matched_by_source_then_update_model = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
target_alias='t',
source_alias='s',
skip_matched_step=true,
not_matched_by_source_condition='t.V > 0',
not_matched_by_source_action='''
update set
t.first = \\\'--\\\',
t.second = \\\'--\\\',
t.V = -1
''',
) }}
{% if not is_incremental() %}
-- data for first invocation of model
select 1 as id, 'Vasya' as first, 'Pupkin' as second, 1 as V
union all
select 2 as id, 'Paul' as first, 'Atreides' as second, 0 as V
union all
select 3 as id, 'Dunkan' as first, 'Aidaho' as second, 1 as V
{% else %}
-- data for subsequent incremental update
-- id = 1 should be updated with
-- id = 2 should be kept as condition doesn't hold (t.V = 0)
select 3 as id, 'Dunkan' as first, 'Aidaho' as second, 2 as V -- No update, skipped
union all
select 4 as id, 'Baron' as first, 'Harkonnen' as second, 1 as V -- should append
{% endif %}
"""

merge_schema_evolution_model = """
{{ config(
materialized = 'incremental',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,17 +288,38 @@ def test_merge(self, project):
)


class TestNotMatchedBySourceAndCondition(IncrementalBase):
class TestNotMatchedBySourceAndConditionThenDelete(IncrementalBase):
@pytest.fixture(scope="class")
def seeds(self):
return {
"not_matched_by_source_expected.csv": fixtures.not_matched_by_source_expected,
"not_matched_by_source_expected.csv": fixtures.not_matched_by_source_then_del_expected,
}

@pytest.fixture(scope="class")
def models(self):
return {
"not_matched_by_source.sql": fixtures.not_matched_by_source_model,
"not_matched_by_source.sql": fixtures.not_matched_by_source_then_delete_model,
}

def test_merge(self, project):
self.seed_and_run_twice()
util.check_relations_equal(
project.adapter,
["not_matched_by_source", "not_matched_by_source_expected"],
)


class TestNotMatchedBySourceAndConditionThenUpdate(IncrementalBase):
@pytest.fixture(scope="class")
def seeds(self):
return {
"not_matched_by_source_expected.csv": fixtures.not_matched_by_source_then_upd_expected,
}

@pytest.fixture(scope="class")
def models(self):
return {
"not_matched_by_source.sql": fixtures.not_matched_by_source_then_update_model,
}

def test_merge(self, project):
Expand Down

0 comments on commit 3fc74df

Please sign in to comment.