Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: support for arbitrary predicates for incremental models #4546

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ class NodeConfig(NodeAndTestConfig):
)
full_refresh: Optional[bool] = None
on_schema_change: Optional[str] = 'ignore'
incremental_predicates: Optional[List[Dict[str, Any]]] = None

@classmethod
def __pre_deserialize__(cls, data):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{#

These macros will compile the proper join predicates for incremental models,
merging default behavior with the optional, user-supplied incremental_predicates
config.

#}

{% macro get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates, partitions=none) %}
{{ adapter.dispatch('get_incremental_predicates')(target_relation, incremental_strategy, unique_key, user_predicates, partitions) }}
{% endmacro %}

{% macro default__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %}
{#

This behavior should only be observed when dbt calls the default
`get_delete_insert_merge_sql` strategy in dbt-core

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean incremental predicates will only work for delete+insert strategy? is this just to get the feature started with least amount of work, or is there a blocker to implement for the merge strategy?

I don't exactly have an exact use case to share, so just curious and generally thinking about pros/cons of moving from merge to delete+insert strategy.


#}
{%- if user_predicates -%}
{%- set predicates %}
{%- for condition in user_predicates -%} and {{ target_relation.name }}.{{ condition.source_col }} {{ condition.expression }} {% endfor -%}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason to call the parameter source_col?

I'd expect something like target_col or target_column to clarify that the predicate is applied to the target table in the context of an incremental load into a target model (sometimes src or source is used as alias in a merge statement)

{%- endset -%}
{%- endif -%}

{{ return(predicates) }}

{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
{% set user_predicates = config.get('incremental_predicates', default=None) %}
{% set incremental_strategy = config.get('incremental_strategy', default=None) %}
{% set predicates = get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates) %}

{% set tmp_identifier = model['name'] + '__dbt_tmp' %}
{% set backup_identifier = model['name'] + "__dbt_backup" %}
Expand Down Expand Up @@ -58,7 +61,7 @@
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, predicates) %}

{% endif %}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,17 @@
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }}
{%- endmacro %}

{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{% if unique_key %}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}

{{ sql_header if sql_header is not none }}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}
on {{ predicates }}

{% if unique_key %}
when matched then update set
Expand All @@ -39,11 +29,11 @@
{% endmacro %}


{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns) }}
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }}
{%- endmacro %}

{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

Expand All @@ -52,8 +42,11 @@
where ({{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
);
{% endif %}
)
{{ predicates }}
;

{%- endif %}

insert into {{ target }} ({{ dest_cols_csv }})
(
Expand All @@ -69,7 +62,6 @@
{%- endmacro %}

{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set sql_header = config.get('sql_header', none) -%}

Expand All @@ -80,7 +72,7 @@
on FALSE

when not matched by source
{% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
{% if predicates %} and {{ predicates }} {% endif %}
then delete

when not matched then insert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ def rendered_model_config(self, **updates):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'meta': {},
}
result.update(updates)
Expand All @@ -481,6 +482,7 @@ def rendered_seed_config(self, **updates):
'quote_columns': True,
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
Expand Down Expand Up @@ -509,6 +511,7 @@ def rendered_snapshot_config(self, **updates):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'strategy': 'check',
'check_cols': 'all',
'unique_key': 'id',
Expand Down
8 changes: 8 additions & 0 deletions test/integration/047_dbt_ls_test/test_ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def expect_snapshot_output(self):
'alias': None,
'check_cols': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'meta': {},
},
'unique_id': 'snapshot.test.my_snapshot',
Expand Down Expand Up @@ -124,6 +125,7 @@ def expect_analyses_output(self):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
Expand Down Expand Up @@ -159,6 +161,7 @@ def expect_model_output(self):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
Expand All @@ -185,6 +188,7 @@ def expect_model_output(self):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'incremental_strategy': 'delete+insert',
'database': None,
'schema': None,
Expand Down Expand Up @@ -212,6 +216,7 @@ def expect_model_output(self):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
Expand All @@ -238,6 +243,7 @@ def expect_model_output(self):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
Expand Down Expand Up @@ -275,6 +281,7 @@ def expect_model_ephemeral_output(self):
'persist_docs': {},
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
Expand Down Expand Up @@ -333,6 +340,7 @@ def expect_seed_output(self):
'quote_columns': False,
'full_refresh': None,
'on_schema_change': 'ignore',
'incremental_predicates': None,
'database': None,
'schema': None,
'alias': None,
Expand Down