diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a318ada61b..12056bf8ac6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,12 @@ ### Under the hood - Bump artifact schema versions for 1.0.0: manifest v4, run results v4, sources v3. Notable changes: schema test + data test nodes are renamed to generic test + singular test nodes; freshness threshold default values ([#4191](https://github.com/dbt-labs/dbt-core/pull/4191)) - Speed up node selection by skipping `incorporate_indirect_nodes` if not needed ([#4213](https://github.com/dbt-labs/dbt-core/issues/4213), [#4214](https://github.com/dbt-labs/dbt-core/issues/4214)) +- When on_schema_change is set, pass common columns as dest_columns in incremental merge macros ([#4144](https://github.com/dbt-labs/dbt-core/issues/4144), [#4170](https://github.com/dbt-labs/dbt-core/pull/4170)) Contributors: - [@kadero](https://github.com/kadero) ([3955](https://github.com/dbt-labs/dbt-core/pull/3955)) - [@frankcash](https://github.com/frankcash) ([4136](https://github.com/dbt-labs/dbt-core/pull/4136)) +- [@Kayrnt](https://github.com/Kayrnt) ([4136](https://github.com/dbt-labs/dbt-core/pull/4170)) ## dbt-core 1.0.0b2 (October 25, 2021) diff --git a/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql b/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql index ccdfbfad39b..07dd1e1b07b 100644 --- a/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql +++ b/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql @@ -53,8 +53,12 @@ {% do adapter.expand_target_column_types( from_relation=tmp_relation, to_relation=target_relation) %} - {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} - {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} + {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} + {% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} + {% if not dest_columns %} + {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} + {% endif %} + {% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %} {% endif %} diff --git a/core/dbt/include/global_project/macros/materializations/incremental/on_schema_change.sql b/core/dbt/include/global_project/macros/materializations/incremental/on_schema_change.sql index c346f726f22..4367ad80eb1 100644 --- a/core/dbt/include/global_project/macros/materializations/incremental/on_schema_change.sql +++ b/core/dbt/include/global_project/macros/materializations/incremental/on_schema_change.sql @@ -57,7 +57,7 @@ {%- set target_columns = adapter.get_columns_in_relation(target_relation) -%} {%- set source_not_in_target = diff_columns(source_columns, target_columns) -%} {%- set target_not_in_source = diff_columns(target_columns, source_columns) -%} - + {% set new_target_types = diff_column_data_types(source_columns, target_columns) %} {% if source_not_in_target != [] %} @@ -72,6 +72,8 @@ 'schema_changed': schema_changed, 'source_not_in_target': source_not_in_target, 'target_not_in_source': target_not_in_source, + 'source_columns': source_columns, + 'target_columns': target_columns, 'new_target_types': new_target_types } %} @@ -132,7 +134,11 @@ {% macro process_schema_changes(on_schema_change, source_relation, target_relation) %} - {% if on_schema_change != 'ignore' %} + {% if on_schema_change == 'ignore' %} + + {{ return({}) }} + + {% else %} {% set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) %} @@ -158,6 +164,8 @@ {% endif %} {% endif %} + + {{ return(schema_changes_dict['source_columns']) }} {% endif %} diff --git a/test/integration/070_incremental_schema_tests/models/incremental_append_new_columns_remove_one.sql b/test/integration/070_incremental_schema_tests/models/incremental_append_new_columns_remove_one.sql new file mode 100644 index 00000000000..dbb4962a7e5 --- /dev/null +++ b/test/integration/070_incremental_schema_tests/models/incremental_append_new_columns_remove_one.sql @@ -0,0 +1,28 @@ +{{ + config( + materialized='incremental', + unique_key='id', + on_schema_change='append_new_columns' + ) +}} + +{% set string_type = 'varchar(10)' %} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT id, + cast(field1 as {{string_type}}) as field1, + cast(field3 as {{string_type}}) as field3, + cast(field4 as {{string_type}}) as field4 +FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) + +{% else %} + +SELECT id, + cast(field1 as {{string_type}}) as field1, + cast(field2 as {{string_type}}) as field2 +FROM source_data where id <= 3 + +{% endif %} \ No newline at end of file diff --git a/test/integration/070_incremental_schema_tests/models/incremental_append_new_columns_remove_one_target.sql b/test/integration/070_incremental_schema_tests/models/incremental_append_new_columns_remove_one_target.sql new file mode 100644 index 00000000000..f3a279f0285 --- /dev/null +++ b/test/integration/070_incremental_schema_tests/models/incremental_append_new_columns_remove_one_target.sql @@ -0,0 +1,19 @@ +{{ + config(materialized='table') +}} + +{% set string_type = 'varchar(10)' %} + +with source_data as ( + + select * from {{ ref('model_a') }} + +) + +select id, + cast(field1 as {{string_type}}) as field1, + cast(CASE WHEN id > 3 THEN NULL ELSE field2 END as {{string_type}}) AS field2, + cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3, + cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4 + +from source_data \ No newline at end of file diff --git a/test/integration/070_incremental_schema_tests/test_incremental_schema.py b/test/integration/070_incremental_schema_tests/test_incremental_schema.py index 3de7d6e7bdc..09a494b8952 100644 --- a/test/integration/070_incremental_schema_tests/test_incremental_schema.py +++ b/test/integration/070_incremental_schema_tests/test_incremental_schema.py @@ -44,6 +44,12 @@ def run_incremental_append_new_columns(self): compare_source = 'incremental_append_new_columns' compare_target = 'incremental_append_new_columns_target' self.run_twice_and_assert(select, compare_source, compare_target) + + def run_incremental_append_new_columns_remove_one(self): + select = 'model_a incremental_append_new_columns_remove_one incremental_append_new_columns_remove_one_target' + compare_source = 'incremental_append_new_columns_remove_one' + compare_target = 'incremental_append_new_columns_remove_one_target' + self.run_twice_and_assert(select, compare_source, compare_target) def run_incremental_sync_all_columns(self): select = 'model_a incremental_sync_all_columns incremental_sync_all_columns_target' @@ -70,6 +76,7 @@ def test__postgres__run_incremental_ignore(self): @use_profile('postgres') def test__postgres__run_incremental_append_new_columns(self): self.run_incremental_append_new_columns() + self.run_incremental_append_new_columns_remove_one() @use_profile('postgres') def test__postgres__run_incremental_sync_all_columns(self):