Use common columns for incremental schema changes #4170

Merged · 4 commits · Nov 7, 2021
CHANGELOG.md (2 additions, 0 deletions)
@@ -11,10 +11,12 @@
### Under the hood
- Bump artifact schema versions for 1.0.0: manifest v4, run results v4, sources v3. Notable changes: schema test + data test nodes are renamed to generic test + singular test nodes; freshness threshold default values ([#4191](https://github.com/dbt-labs/dbt-core/pull/4191))
- Speed up node selection by skipping `incorporate_indirect_nodes` if not needed ([#4213](https://github.com/dbt-labs/dbt-core/issues/4213), [#4214](https://github.com/dbt-labs/dbt-core/issues/4214))
- When on_schema_change is set, pass common columns as dest_columns in incremental merge macros ([#4144](https://github.com/dbt-labs/dbt-core/issues/4144), [#4170](https://github.com/dbt-labs/dbt-core/pull/4170))

Contributors:
- [@kadero](https://github.com/kadero) ([3955](https://github.com/dbt-labs/dbt-core/pull/3955))
- [@frankcash](https://github.com/frankcash) ([4136](https://github.com/dbt-labs/dbt-core/pull/4136))
- [@Kayrnt](https://github.com/Kayrnt) ([4170](https://github.com/dbt-labs/dbt-core/pull/4170))

## dbt-core 1.0.0b2 (October 25, 2021)

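The on_schema_change handling referenced in the changelog entry above is opt-in per model. A minimal sketch of a model that enables it; the model name, columns, and source ref are illustrative, and the valid settings are ignore, append_new_columns, sync_all_columns, and fail:

{{
    config(
        materialized='incremental',
        unique_key='id',
        on_schema_change='append_new_columns'
    )
}}

select id, field1, field2
from {{ ref('my_source_model') }}

{% if is_incremental() %}
where id not in (select id from {{ this }})
{% endif %}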
@@ -53,8 +53,12 @@
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %}
{#-- Process schema changes. Returns the source columns to use for upserting/merging, or an empty dict when on_schema_change is 'ignore' --#}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% set build_sql = get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}

{% endif %}

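In the new code path above, process_schema_changes returns the source (common) columns, which are passed as dest_columns to get_delete_insert_merge_sql in place of the previous incremental_upsert call. On adapters that use the default delete+insert strategy, the rendered statement looks roughly like the sketch below; the relation and column names are illustrative:

delete from analytics.my_model
where (id) in (
    select (id)
    from analytics.my_model__dbt_tmp
);

insert into analytics.my_model (id, field1, field3, field4)
(
    select id, field1, field3, field4
    from analytics.my_model__dbt_tmp
);

Because dest_columns is now based on the columns of the incoming data rather than only the existing table, a column that was dropped from the model (field2 in the tests below) is simply left out of the insert list instead of causing a column mismatch.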
@@ -57,7 +57,7 @@
{%- set target_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set source_not_in_target = diff_columns(source_columns, target_columns) -%}
{%- set target_not_in_source = diff_columns(target_columns, source_columns) -%}

{% set new_target_types = diff_column_data_types(source_columns, target_columns) %}

{% if source_not_in_target != [] %}
@@ -72,6 +72,8 @@
'schema_changed': schema_changed,
'source_not_in_target': source_not_in_target,
'target_not_in_source': target_not_in_source,
'source_columns': source_columns,
'target_columns': target_columns,
'new_target_types': new_target_types
} %}

@@ -132,7 +134,11 @@

{% macro process_schema_changes(on_schema_change, source_relation, target_relation) %}

{% if on_schema_change != 'ignore' %}
{% if on_schema_change == 'ignore' %}

{{ return({}) }}

{% else %}

{% set schema_changes_dict = check_for_schema_changes(source_relation, target_relation) %}

@@ -158,6 +164,8 @@
{% endif %}

{% endif %}

{{ return(schema_changes_dict['source_columns']) }}

{% endif %}

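For the remove-one scenario exercised by the tests below (a target built with id, field1, and field2, then an incremental run whose temp relation has id, field1, field3, and field4), check_for_schema_changes would produce a dict shaped roughly as follows, and process_schema_changes hands its source_columns entry back to the materialization. Column entries are shown as bare names for readability; in practice they are adapter Column objects:

{
    'schema_changed': True,
    'source_not_in_target': ['field3', 'field4'],
    'target_not_in_source': ['field2'],
    'source_columns': ['id', 'field1', 'field3', 'field4'],
    'target_columns': ['id', 'field1', 'field2'],
    'new_target_types': []
}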
@@ -0,0 +1,28 @@
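{#
    Incremental model for the append_new_columns "remove one" case: the first run
    builds the table from id, field1, and field2; the incremental run drops field2
    and adds field3/field4, so a column disappears while new ones are appended.
#}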
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='append_new_columns'
)
}}

{% set string_type = 'varchar(10)' %}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field3 as {{string_type}}) as field3,
cast(field4 as {{string_type}}) as field4
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2
FROM source_data where id <= 3

{% endif %}
@@ -0,0 +1,19 @@
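{#
    Expected end state for the remove-one case after both runs: rows from the first
    run (id <= 3) have field1 and field2 populated with NULL field3/field4, while
    rows added on the incremental run (id > 3) have field1, field3, and field4 with
    a NULL field2, since field2 is no longer part of the insert's column list.
#}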
{{
config(materialized='table')
}}

{% set string_type = 'varchar(10)' %}

with source_data as (

select * from {{ ref('model_a') }}

)

select id,
cast(field1 as {{string_type}}) as field1,
cast(CASE WHEN id > 3 THEN NULL ELSE field2 END as {{string_type}}) AS field2,
cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3,
cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4

from source_data
@@ -44,6 +44,12 @@ def run_incremental_append_new_columns(self):
        compare_source = 'incremental_append_new_columns'
        compare_target = 'incremental_append_new_columns_target'
        self.run_twice_and_assert(select, compare_source, compare_target)

    def run_incremental_append_new_columns_remove_one(self):
        select = 'model_a incremental_append_new_columns_remove_one incremental_append_new_columns_remove_one_target'
        compare_source = 'incremental_append_new_columns_remove_one'
        compare_target = 'incremental_append_new_columns_remove_one_target'
        self.run_twice_and_assert(select, compare_source, compare_target)

    def run_incremental_sync_all_columns(self):
        select = 'model_a incremental_sync_all_columns incremental_sync_all_columns_target'
@@ -70,6 +76,7 @@ def test__postgres__run_incremental_ignore(self):
    @use_profile('postgres')
    def test__postgres__run_incremental_append_new_columns(self):
        self.run_incremental_append_new_columns()
        self.run_incremental_append_new_columns_remove_one()

    @use_profile('postgres')
    def test__postgres__run_incremental_sync_all_columns(self):