diff --git a/core/dbt/contracts/graph/model_config.py b/core/dbt/contracts/graph/model_config.py index 7a5b621fbf2..7910d18eae5 100644 --- a/core/dbt/contracts/graph/model_config.py +++ b/core/dbt/contracts/graph/model_config.py @@ -410,6 +410,7 @@ class NodeConfig(NodeAndTestConfig): ) full_refresh: Optional[bool] = None on_schema_change: Optional[str] = 'ignore' + incremental_predicates: Optional[List[Dict[str, Any]]] = None @classmethod def __pre_deserialize__(cls, data): diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/get_incremental_predicates.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/get_incremental_predicates.sql new file mode 100644 index 00000000000..d4fe8780776 --- /dev/null +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/get_incremental_predicates.sql @@ -0,0 +1,28 @@ +{# + + These macros will compile the proper join predicates for incremental models, + merging default behavior with the optional, user-supplied incremental_predicates + config. + +#} + +{% macro get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates, partitions=none) %} + {{ adapter.dispatch('get_incremental_predicates')(target_relation, incremental_strategy, unique_key, user_predicates, partitions) }} +{% endmacro %} + +{% macro default__get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates=none, partitions=none) %} + {# + + This behavior should only be observed when dbt calls the default + `get_delete_insert_merge_sql` strategy in dbt-core + + #} + {%- if user_predicates -%} + {%- set predicates %} + {%- for condition in user_predicates -%} and {{ target_relation.name }}.{{ condition.source_col }} {{ condition.expression }} {% endfor -%} + {%- endset -%} + {%- endif -%} + + {{ return(predicates) }} + +{% endmacro %} \ No newline at end of file diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql index 07dd1e1b07b..801e16cecef 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql @@ -9,6 +9,9 @@ {%- set full_refresh_mode = (should_full_refresh()) -%} {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} + {% set user_predicates = config.get('incremental_predicates', default=None) %} + {% set incremental_strategy = config.get('incremental_strategy', default=None) %} + {% set predicates = get_incremental_predicates(target_relation, incremental_strategy, unique_key, user_predicates) %} {% set tmp_identifier = model['name'] + '__dbt_tmp' %} {% set backup_identifier = model['name'] + "__dbt_backup" %} @@ -58,7 +61,7 @@ {% if not dest_columns %} {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} {% endif %} - {% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %} + {% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns, predicates) %} {% endif %} diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql index d5cda4f969c..9ba625bb88d 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql @@ -1,27 +1,17 @@ -{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%} +{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates) -%} {{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }} {%- endmacro %} {% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%} - {%- set predicates = [] if predicates is none else [] + predicates -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {%- set update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="quoted") | list) -%} {%- set sql_header = config.get('sql_header', none) -%} - {% if unique_key %} - {% set unique_key_match %} - DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} - {% endset %} - {% do predicates.append(unique_key_match) %} - {% else %} - {% do predicates.append('FALSE') %} - {% endif %} - {{ sql_header if sql_header is not none }} merge into {{ target }} as DBT_INTERNAL_DEST using {{ source }} as DBT_INTERNAL_SOURCE - on {{ predicates | join(' and ') }} + on {{ predicates }} {% if unique_key %} when matched then update set @@ -39,11 +29,11 @@ {% endmacro %} -{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%} - {{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns) }} +{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%} + {{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }} {%- endmacro %} -{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%} +{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} @@ -52,8 +42,11 @@ where ({{ unique_key }}) in ( select ({{ unique_key }}) from {{ source }} - ); - {% endif %} + ) + {{ predicates }} + ; + + {%- endif %} insert into {{ target }} ({{ dest_cols_csv }}) ( @@ -69,7 +62,6 @@ {%- endmacro %} {% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%} - {%- set predicates = [] if predicates is none else [] + predicates -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {%- set sql_header = config.get('sql_header', none) -%} @@ -80,7 +72,7 @@ on FALSE when not matched by source - {% if predicates %} and {{ predicates | join(' and ') }} {% endif %} + {% if predicates %} and {{ predicates }} {% endif %} then delete when not matched then insert diff --git a/test/integration/029_docs_generate_tests/test_docs_generate.py b/test/integration/029_docs_generate_tests/test_docs_generate.py index 67f33b6e55c..2b7c65ff74c 100644 --- a/test/integration/029_docs_generate_tests/test_docs_generate.py +++ b/test/integration/029_docs_generate_tests/test_docs_generate.py @@ -460,6 +460,7 @@ def rendered_model_config(self, **updates): 'persist_docs': {}, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'meta': {}, } result.update(updates) @@ -481,6 +482,7 @@ def rendered_seed_config(self, **updates): 'quote_columns': True, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'database': None, 'schema': None, 'alias': None, @@ -509,6 +511,7 @@ def rendered_snapshot_config(self, **updates): 'persist_docs': {}, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'strategy': 'check', 'check_cols': 'all', 'unique_key': 'id', diff --git a/test/integration/047_dbt_ls_test/test_ls.py b/test/integration/047_dbt_ls_test/test_ls.py index db66406f98b..40fc55be61a 100644 --- a/test/integration/047_dbt_ls_test/test_ls.py +++ b/test/integration/047_dbt_ls_test/test_ls.py @@ -93,6 +93,7 @@ def expect_snapshot_output(self): 'alias': None, 'check_cols': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'meta': {}, }, 'unique_id': 'snapshot.test.my_snapshot', @@ -124,6 +125,7 @@ def expect_analyses_output(self): 'persist_docs': {}, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'database': None, 'schema': None, 'alias': None, @@ -159,6 +161,7 @@ def expect_model_output(self): 'persist_docs': {}, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'database': None, 'schema': None, 'alias': None, @@ -185,6 +188,7 @@ def expect_model_output(self): 'persist_docs': {}, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'incremental_strategy': 'delete+insert', 'database': None, 'schema': None, @@ -212,6 +216,7 @@ def expect_model_output(self): 'persist_docs': {}, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'database': None, 'schema': None, 'alias': None, @@ -238,6 +243,7 @@ def expect_model_output(self): 'persist_docs': {}, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'database': None, 'schema': None, 'alias': None, @@ -275,6 +281,7 @@ def expect_model_ephemeral_output(self): 'persist_docs': {}, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'database': None, 'schema': None, 'alias': None, @@ -333,6 +340,7 @@ def expect_seed_output(self): 'quote_columns': False, 'full_refresh': None, 'on_schema_change': 'ignore', + 'incremental_predicates': None, 'database': None, 'schema': None, 'alias': None,