Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BQ partition configs support #3386

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
@@ -1,160 +1,169 @@

{% macro dbt_bigquery_validate_get_incremental_strategy(config) %}
{#-- Find and validate the incremental strategy #}
{%- set strategy = config.get("incremental_strategy", default="merge") -%}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ strategy }}
Expected one of: 'merge', 'insert_overwrite'
{%- endset %}
{% if strategy not in ['merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}

{% do return(strategy) %}
{#-- Find and validate the incremental strategy #}
{%- set strategy = config.get("incremental_strategy", default="merge") -%}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ strategy }}
Expected one of: 'merge', 'insert_overwrite'
{%- endset %}
{% if strategy not in ['merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}

{% do return(strategy) %}
{% endmacro %}


{% macro bq_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}

{% if partitions is not none and partitions != [] %} {# static #}
{% if partitions is not none and partitions != [] %} {# static #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
)
{%- endset %}

{%- set source_sql -%}
(
{{sql}}
)
{%- endset -%}

{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}

{% else %} {# dynamic #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}

{%- set source_sql -%}
(
select * from {{ tmp_relation }}
)
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
declare _dbt_max_partition {{ partition_by.data_type }} default (
select max({{ partition_by.field }}) from {{ this }}
where {{ partition_by.field }} is not null
);
{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
)
{%- endset %}

-- 1. create a temp table
{{ create_table_as(True, tmp_relation, sql) }}
{%- set source_sql -%}
(
{{sql}}
)
{%- endset -%}

-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct {{ partition_by.render() }})
from {{ tmp_relation }}
);
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=true) }}

{#
TODO: include_sql_header is a hack; consider a better approach that includes
the sql_header at the materialization-level instead
#}
-- 3. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};
{% else %} {# dynamic #}

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}
{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}

{% endif %}
{%- set source_sql -%}
(
select * from {{ tmp_relation }}
)
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
declare _dbt_max_partition {{ partition_by.data_type }} default (
select max({{ partition_by.field }}) from {{ this }}
where {{ partition_by.field }} is not null
);

-- 1. create a temp table
{{ create_table_as(True, tmp_relation, sql) }}

-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct {{ partition_by.render() }})
from {{ tmp_relation }}
);

{#
TODO: include_sql_header is a hack; consider a better approach that includes
the sql_header at the materialization-level instead
#}
-- 3. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header=false) }};

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}

{% endif %}

{% endmacro %}


{% materialization incremental, adapter='bigquery' -%}

{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) %}
{%- set tmp_relation = make_temp_relation(this) %}

{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%}

{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
{%- set partitions = config.get('partitions', none) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- There's no way to atomically replace a view with a table on BQ --#}
{{ adapter.drop_relation(existing_relation) }}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}

{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if strategy == 'insert_overwrite' %}

{% set missing_partition_msg -%}
The 'insert_overwrite' strategy requires the `partition_by` config.
{%- endset %}
{% if partition_by is none %}
{% do exceptions.raise_compiler_error(missing_partition_msg) %}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) %}
{%- set tmp_relation = make_temp_relation(this) %}

{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%}

{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
{%- set partitions = config.get('partitions', none) -%}
{%- set is_partition_filter_required = config.get('require_partition_filter', false) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view %}
{#-- There's no way to atomically replace a view with a table on BQ --#}
{{ adapter.drop_relation(existing_relation) }}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}

{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if strategy == 'insert_overwrite' %}

{% set missing_partition_msg -%}
The 'insert_overwrite' strategy requires the `partition_by` config.
{%- endset %}
{% if partition_by is none %}
{% do exceptions.raise_compiler_error(missing_partition_msg) %}
{% endif %}

{% set build_sql = bq_insert_overwrite(
tmp_relation,
target_relation,
sql,
unique_key,
partition_by,
partitions,
dest_columns) %}

{% else %}
{#-- wrap sql in parens to make it a subquery --#}
{%- set source_sql -%}
(
{{ sql }}
)
{%- endset -%}

{% set predicates = [] %}
{% if is_partition_filter_required %}
{%- set partition_filter -%}
(DBT_INTERNAL_DEST.{{ partition_by.field }} is not null or DBT_INTERNAL_DEST.{{ partition_by.field }} is null)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure if there was a better way to get the DBT_INTERNAL_DEST alias in there than just typing it out. I tried {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} but that renders to timestamp_trunc(my_partition_col, date) when partitioning by date on a timestamp col, and BigQuery doesn't like that as a partition filter.

Copy link
Contributor

@jtcohen6 jtcohen6 May 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and BigQuery doesn't like that as a partition filter

Really? It works as a partition by expression, but not as a partition-pruning filter? That's... too bad.

In any case, the approach you've taken here seems fine by me

{%- endset -%}
{% do predicates.append(partition_filter) %}
{% endif %}

{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns, predicates) %}

{% set build_sql = bq_insert_overwrite(
tmp_relation,
target_relation,
sql,
unique_key,
partition_by,
partitions,
dest_columns) %}

{% else %}
{#-- wrap sql in parens to make it a subquery --#}
{%- set source_sql -%}
(
{{sql}}
)
{%- endset -%}

{% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns) %}

{% endif %}
{% endif %}

{% endif %}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{% endcall %}
{%- call statement('main') -%}
{{ build_sql }}
{% endcall %}

{{ run_hooks(post_hooks) }}
{{ run_hooks(post_hooks) }}

{% set target_relation = this.incorporate(type='table') %}
{% set target_relation = this.incorporate(type='table') %}

{% do persist_docs(target_relation, model) %}
{% do persist_docs(target_relation, model) %}

{{ return({'relations': [target_relation]}) }}
{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}