Add incremental materializations
stu-k committed Jul 28, 2022
1 parent c1360c9 commit 1f4799c
Showing 3 changed files with 59 additions and 19 deletions.
2 changes: 2 additions & 0 deletions dbt/include/bigquery/macros/adapters.sql
@@ -67,6 +67,8 @@
       dbt invocation.
     --#}
     {{ py_write_table(compiled_code=compiled_code, target_relation=relation.quote(database=False, schema=False, identifier=False)) }}
+  {%- else -%}
+      {% do exceptions.raise_compiler_error("bigquery__create_table_as macro didn't get supported language, it got %s" % language) %}
   {%- endif -%}

 {%- endmacro -%}
67 changes: 51 additions & 16 deletions dbt/include/bigquery/macros/materializations/incremental.sql
@@ -1,13 +1,14 @@
-{% macro declare_dbt_max_partition(relation, partition_by, sql) %}
+{% macro declare_dbt_max_partition(relation, partition_by, complied_code, language='sql') %}

-  {% if '_dbt_max_partition' in sql %}
+  {#-- TODO: revisit partitioning with python models --#}
+  {%- if '_dbt_max_partition' in complied_code and language == 'sql' -%}

     declare _dbt_max_partition {{ partition_by.data_type }} default (
       select max({{ partition_by.field }}) from {{ this }}
       where {{ partition_by.field }} is not null
     );

-  {% endif %}
+  {%- endif -%}

 {% endmacro %}
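Note the new `language == 'sql'` guard above: `_dbt_max_partition` is only declared for SQL models (hence the TODO), so a Python model that needs a partition high-water mark has to compute one itself. A rough sketch of that workaround (a hypothetical model: `raw_events` and `event_date` are illustrative names, and it assumes the Dataproc runtime ships the spark-bigquery connector):

import pyspark.sql.functions as F

def model(dbt, session):
    dbt.config(
        materialized="incremental",
        partition_by={"field": "event_date", "data_type": "date"},
    )
    df = dbt.ref("raw_events")

    if dbt.is_incremental:
        # No _dbt_max_partition for python models: read the existing target
        # table (dbt.this) and derive the high-water mark manually. Backticks
        # are stripped because the connector expects project.dataset.table.
        target = str(dbt.this).replace("`", "")
        existing = session.read.format("bigquery").load(target)
        max_date = existing.agg(F.max("event_date").alias("m")).collect()[0]["m"]
        if max_date is not None:
            df = df.filter(F.col("event_date") > F.lit(max_date))

    return df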

@@ -74,7 +75,9 @@
   {{ declare_dbt_max_partition(this, partition_by, sql) }}

   -- 1. create a temp table
-  {{ create_table_as(True, tmp_relation, sql) }}
+  {%- call statement('main') -%}
+    {{ create_table_as(True, tmp_relation, compiled_code, language) }}
+  {%- endcall -%}
   {% else %}
   -- 1. temp table already exists, we used it to check for schema changes
   {% endif %}
@@ -139,6 +142,7 @@

   {%- set unique_key = config.get('unique_key') -%}
   {%- set full_refresh_mode = (should_full_refresh()) -%}
+  {%- set language = model['language'] %}

   {%- set target_relation = this %}
   {%- set existing_relation = load_relation(this) %}
@@ -160,43 +164,74 @@
   {{ run_hooks(pre_hooks) }}

   {% if existing_relation is none %}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {%- call statement('main', language=language) -%}
+        {{ create_table_as(False, target_relation, compiled_code, language) }}
+      {%- endcall -%}

   {% elif existing_relation.is_view %}
       {#-- There's no way to atomically replace a view with a table on BQ --#}
       {{ adapter.drop_relation(existing_relation) }}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {%- call statement('main', language=language) -%}
+        {{ create_table_as(False, target_relation, compiled_code, language) }}
+      {%- endcall -%}

   {% elif full_refresh_mode %}
       {#-- If the partition/cluster config has changed, then we must drop and recreate --#}
       {% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
           {% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
           {{ adapter.drop_relation(existing_relation) }}
       {% endif %}
-      {% set build_sql = create_table_as(False, target_relation, sql) %}
+      {%- call statement('main', language=language) -%}
+        {{ create_table_as(False, target_relation, compiled_code, language) }}
+      {%- endcall -%}

   {% else %}
+    {%- if language == 'python' and strategy == 'insert_overwrite' -%}
+      {#-- This lets us move forward assuming no python will be directly templated into a query --#}
+      {%- set python_unsupported_msg -%}
+        The 'insert_overwrite' strategy is not yet supported for python models.
+      {%- endset %}
+      {% do exceptions.raise_compiler_error(python_unsupported_msg) %}
+    {%- endif -%}

     {% set tmp_relation_exists = false %}
-    {% if on_schema_change != 'ignore' %} {# Check first, since otherwise we may not build a temp table #}
-      {% do run_query(
-        declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
-      ) %}
+    {% if on_schema_change != 'ignore' or language == 'python' %}
+      {#-- Check first, since otherwise we may not build a temp table --#}
+      {#-- Python always needs to create a temp table --#}
+      {%- call statement('create_tmp_relation', language=language) -%}
+        {{ declare_dbt_max_partition(this, partition_by, compiled_code, language) +
+           create_table_as(True, tmp_relation, compiled_code, language)
+        }}
+      {%- endcall -%}
       {% set tmp_relation_exists = true %}
       {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
       {% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
     {% endif %}

     {% if not dest_columns %}
       {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
     {% endif %}

     {% set build_sql = bq_generate_incremental_build_sql(
-        strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
+        strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
     ) %}

-  {% endif %}
-
   {%- call statement('main') -%}
     {{ build_sql }}
   {% endcall %}

+  {%- if language == 'python' and tmp_relation -%}
+    {#--
+    This is yucky.
+    See note in dbt-spark/dbt/include/spark/macros/adapters.sql
+    re: python models and temporary views.
+
+    Also, why does not either drop_relation or adapter.drop_relation work here?!
+    --#}
+    {{ adapter.drop_relation(tmp_relation) }}
+  {%- endif -%}
+
+  {% endif %}

   {{ run_hooks(post_hooks) }}

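Taken together, every branch of the materialization now runs through `statement(..., language=language)`, Python models always get a temp table, and that temp relation is dropped at the end; only the `insert_overwrite` strategy is rejected for Python. A minimal sketch of the kind of incremental Python model this enables (hypothetical model and column names; `merge` with a `unique_key` is the supported path):

def model(dbt, session):
    dbt.config(
        materialized="incremental",
        incremental_strategy="merge",  # 'insert_overwrite' hits the new compiler error
        unique_key="id",
    )
    df = dbt.ref("users")

    if dbt.is_incremental:
        # Illustrative lookback: reprocess only the last 7 days and let the
        # merge on unique_key upsert those rows into the existing table.
        df = df.filter("updated_at >= date_sub(current_date(), 7)")

    return df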
9 changes: 6 additions & 3 deletions tests/functional/adapter/test_python_model.py
@@ -1,10 +1,13 @@
 import os
 import pytest
 from dbt.tests.util import run_dbt, write_file
-from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests
+import dbt.tests.adapter.python_model.test_python_model as dbt_tests
+
+class TestPythonIncrementalMatsBigQuery(dbt_tests.BasePythonIncrementalTests):
+    pass

 @pytest.skip("dataproc not enabled on testing account", allow_module_level=True)
-class TestPythonModelSpark(BasePythonModelTests):
+class TestPythonModelSpark(dbt_tests.BasePythonModelTests):
     pass

 models__simple_python_model = """
@@ -44,4 +47,4 @@ def test_changing_schema(self,project, logs_dir):
         # validate #5510 log_code_execution works
         assert "On model.test.simple_python_model:" in log
         assert "return spark.createDataFrame(data, schema=['test1', 'test3'])" in log
-        assert "Execution status: OK in" in log
\ No newline at end of file
+        assert "Execution status: OK in" in log
