Skip to content

Commit

Permalink
Use common columns for incremental schema changes (dbt-labs#51)
Browse files Browse the repository at this point in the history
* Use common columns for incremental schema changes

* Review changes

* Follow up change for lean return version
  • Loading branch information
Kayrnt authored Nov 8, 2021
1 parent 8e11299 commit b8bde99
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 7 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
N/A

### Fixes
- Fix problem with bytes proccessed return None value when the service account used to connect DBT in bigquery had a row policy access.
- Fix problem with bytes processed return None value when the service account used to connect DBT in bigquery had a row policy access.
([#47](https://github.com/dbt-labs/dbt-bigquery/issues/47), [#48](https://github.com/dbt-labs/dbt-bigquery/pull/48))
- When on_schema_change is set, pass common columns as dest_columns in incremental merge macros ([#4144](https://github.com/dbt-labs/dbt-core/issues/4144))

### Under the hood
- Capping `google-api-core` to version `1.31.3` due to `protobuf` dependency conflict ([#53](https://github.com/dbt-labs/dbt-bigquery/pull/53))

### Contributors
- [@imartynetz](https://github.com/imartynetz) ([#48](https://github.com/dbt-labs/dbt-bigquery/pull/48))
- [@Kayrnt](https://github.com/Kayrnt) ([#51](https://github.com/dbt-labs/dbt-bigquery/pull/51))

## dbt-bigquery 1.0.0b2 (October 25, 2021)

Expand Down
8 changes: 5 additions & 3 deletions dbt/include/bigquery/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,12 @@
declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
) %}
{% set tmp_relation_exists = true %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% endif %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% set build_sql = bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2,
cast(field3 as {{string_type}}) as field3,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2,
cast(field3 as {{string_type}}) as field3,
cast(field4 as {{string_type}}) as field4
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='append_new_columns'
)
}}

{% set string_type = 'string' %}

WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )

{% if is_incremental() %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field3 as {{string_type}}) as field3,
cast(field4 as {{string_type}}) as field4
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )

{% else %}

SELECT id,
cast(field1 as {{string_type}}) as field1,
cast(field2 as {{string_type}}) as field2
FROM source_data where id <= 3

{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{{
config(materialized='table')
}}

{% set string_type = 'string' %}

with source_data as (

select * from {{ ref('model_a') }}

)

select id,
cast(field1 as {{string_type}}) as field1,
cast(CASE WHEN id > 3 THEN NULL ELSE field2 END as {{string_type}}) AS field2,
cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3,
cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4

from source_data
14 changes: 14 additions & 0 deletions tests/integration/incremental_schema_tests/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ models:
tags: [column_level_tag]
tests:
- unique

- name: incremental_append_new_columns_remove_one
columns:
- name: id
tags: [column_level_tag]
tests:
- unique

- name: incremental_append_new_columns_remove_one_target
columns:
- name: id
tags: [column_level_tag]
tests:
- unique

- name: incremental_sync_all_columns
columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ def run_incremental_append_new_columns(self):
self.list_tests_and_assert(select, exclude, expected)
self.run_tests_and_assert(select, exclude, expected, compare_source, compare_target)

def run_incremental_append_new_columns_remove_one(self):
select = 'model_a incremental_append_new_columns_remove_one incremental_append_new_columns_remove_one_target'
compare_source = 'incremental_append_new_columns_remove_one'
compare_target = 'incremental_append_new_columns_remove_one_target'
exclude = None
expected = [
'select_from_a',
'select_from_incremental_append_new_columns_remove_one',
'select_from_incremental_append_new_columns_remove_one_target',
'unique_model_a_id',
'unique_incremental_append_new_columns_remove_one_id',
'unique_incremental_append_new_columns_remove_one_target_id'
]
self.run_tests_and_assert(select, exclude, expected, compare_source, compare_target)

def run_incremental_sync_all_columns(self):
select = 'model_a incremental_sync_all_columns incremental_sync_all_columns_target'
compare_source = 'incremental_sync_all_columns'
Expand Down Expand Up @@ -116,6 +131,7 @@ def test__bigquery__run_incremental_ignore(self):
@use_profile('bigquery')
def test__bigquery__run_incremental_append_new_columns(self):
self.run_incremental_append_new_columns()
self.run_incremental_append_new_columns_remove_one()

@use_profile('bigquery')
def test__bigquery__run_incremental_sync_all_columns(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select * from {{ ref('incremental_append_new_columns_remove_one') }} where false
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select * from {{ ref('incremental_append_new_columns_remove_one_target') }} where false

0 comments on commit b8bde99

Please sign in to comment.