Skip to content

Commit

Permalink
dbt_clone macros, materialization and tests to dbt-spark (#816)
Browse files Browse the repository at this point in the history
* dbt_clone macros, materialization and tests to dbt-spark

* slight reorg of macro

* add file_format to profile_config_update

* change pointer back to main

* add commonly used pytest skip
  • Loading branch information
McKnight-42 authored Jul 11, 2023
1 parent b685297 commit a6dc99f
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230627-155913.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: add dbt-spark portion of dbt_clone functionality
time: 2023-06-27T15:59:13.605278-05:00
custom:
Author: McKnight-42 aranke
Issue: "815"
76 changes: 76 additions & 0 deletions dbt/include/spark/macros/materializations/clone.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{% macro spark__can_clone_table() %}
{{ return(True) }}
{% endmacro %}

{% macro spark__create_or_replace_clone(this_relation, defer_relation) %}
create or replace table {{ this_relation }} shallow clone {{ defer_relation }}
{% endmacro %}

{%- materialization clone, adapter='spark' -%}

{%- set relations = {'relations': []} -%}

{%- if not defer_relation -%}
-- nothing to do
{{ log("No relation found in state manifest for " ~ model.unique_id, info=True) }}
{{ return(relations) }}
{%- endif -%}

{%- set existing_relation = load_cached_relation(this) -%}

{%- if existing_relation and not flags.FULL_REFRESH -%}
-- noop!
{{ log("Relation " ~ existing_relation ~ " already exists", info=True) }}
{{ return(relations) }}
{%- endif -%}

{%- set other_existing_relation = load_cached_relation(defer_relation) -%}
{%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}

-- If this is a database that can do zero-copy cloning of tables, and the other relation is a table, then this will be a table
-- Otherwise, this will be a view

{% set can_clone_table = can_clone_table() %}

{%- if file_format != 'delta' -%}
{% set invalid_format_msg -%}
Invalid file format: {{ file_format }}
shallow clone requires file_format be set to 'delta'
{%- endset %}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{%- elif other_existing_relation and other_existing_relation.type == 'table' and can_clone_table -%}

{%- set target_relation = this.incorporate(type='table') -%}
{% if existing_relation is not none and not existing_relation.is_table %}
{{ log("Dropping relation " ~ existing_relation ~ " because it is of type " ~ existing_relation.type) }}
{{ drop_relation_if_exists(existing_relation) }}
{% endif %}

-- as a general rule, data platforms that can clone tables can also do atomic 'create or replace'
{% call statement('main') %}
{{ create_or_replace_clone(target_relation, defer_relation) }}
{% endcall %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
{% do persist_docs(target_relation, model) %}

{{ return({'relations': [target_relation]}) }}

{%- else -%}

{%- set target_relation = this.incorporate(type='view') -%}

-- reuse the view materialization
-- TODO: support actual dispatch for materialization macros
-- Tracking ticket: https://github.com/dbt-labs/dbt-core/issues/7799
{% set search_name = "materialization_view_" ~ adapter.type() %}
{% if not search_name in context %}
{% set search_name = "materialization_view_default" %}
{% endif %}
{% set materialization_macro = context[search_name] %}
{% set relations = materialization_macro() %}
{{ return(relations) }}
{% endif %}

{%- endmaterialization -%}
101 changes: 101 additions & 0 deletions tests/functional/adapter/dbt_clone/fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
seed_csv = """id,name
1,Alice
2,Bob
"""

table_model_sql = """
{{ config(materialized='table') }}
select * from {{ ref('ephemeral_model') }}
-- establish a macro dependency to trigger state:modified.macros
-- depends on: {{ my_macro() }}
"""

view_model_sql = """
{{ config(materialized='view') }}
select * from {{ ref('seed') }}
-- establish a macro dependency that trips infinite recursion if not handled
-- depends on: {{ my_infinitely_recursive_macro() }}
"""

ephemeral_model_sql = """
{{ config(materialized='ephemeral') }}
select * from {{ ref('view_model') }}
"""

exposures_yml = """
version: 2
exposures:
- name: my_exposure
type: application
depends_on:
- ref('view_model')
owner:
email: test@example.com
"""

schema_yml = """
version: 2
models:
- name: view_model
columns:
- name: id
tests:
- unique:
severity: error
- not_null
- name: name
"""

get_schema_name_sql = """
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is not none -%}
{{ return(default_schema ~ '_' ~ custom_schema_name|trim) }}
-- put seeds into a separate schema in "prod", to verify that cloning in "dev" still works
{%- elif target.name == 'default' and node.resource_type == 'seed' -%}
{{ return(default_schema ~ '_' ~ 'seeds') }}
{%- else -%}
{{ return(default_schema) }}
{%- endif -%}
{%- endmacro %}
"""

snapshot_sql = """
{% snapshot my_cool_snapshot %}
{{
config(
target_database=database,
target_schema=schema,
unique_key='id',
strategy='check',
check_cols=['id'],
)
}}
select * from {{ ref('view_model') }}
{% endsnapshot %}
"""
macros_sql = """
{% macro my_macro() %}
{% do log('in a macro' ) %}
{% endmacro %}
"""

infinite_macros_sql = """
{# trigger infinite recursion if not handled #}
{% macro my_infinitely_recursive_macro() %}
{{ return(adapter.dispatch('my_infinitely_recursive_macro')()) }}
{% endmacro %}
{% macro default__my_infinitely_recursive_macro() %}
{% if unmet_condition %}
{{ my_infinitely_recursive_macro() }}
{% else %}
{{ return('') }}
{% endif %}
{% endmacro %}
"""

custom_can_clone_tables_false_macros_sql = """
{% macro can_clone_table() %}
{{ return(False) }}
{% endmacro %}
"""
80 changes: 80 additions & 0 deletions tests/functional/adapter/dbt_clone/test_dbt_clone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import pytest
from dbt.tests.adapter.dbt_clone.test_dbt_clone import BaseClonePossible
from tests.functional.adapter.dbt_clone.fixtures import (
seed_csv,
table_model_sql,
view_model_sql,
ephemeral_model_sql,
exposures_yml,
schema_yml,
snapshot_sql,
get_schema_name_sql,
macros_sql,
infinite_macros_sql,
)


@pytest.mark.skip_profile("apache_spark", "spark_session")
class TestSparkBigqueryClonePossible(BaseClonePossible):
@pytest.fixture(scope="class")
def models(self):
return {
"table_model.sql": table_model_sql,
"view_model.sql": view_model_sql,
"ephemeral_model.sql": ephemeral_model_sql,
"schema.yml": schema_yml,
"exposures.yml": exposures_yml,
}

@pytest.fixture(scope="class")
def macros(self):
return {
"macros.sql": macros_sql,
"infinite_macros.sql": infinite_macros_sql,
"get_schema_name.sql": get_schema_name_sql,
}

@pytest.fixture(scope="class")
def seeds(self):
return {
"seed.csv": seed_csv,
}

@pytest.fixture(scope="class")
def snapshots(self):
return {
"snapshot.sql": snapshot_sql,
}

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+file_format": "delta",
},
"seeds": {
"test": {
"quote_columns": False,
},
"+file_format": "delta",
},
"snapshots": {
"+file_format": "delta",
},
}

@pytest.fixture(autouse=True)
def clean_up(self, project):
yield
with project.adapter.connection_named("__test"):
relation = project.adapter.Relation.create(
database=project.database, schema=f"{project.test_schema}_seeds"
)
project.adapter.drop_schema(relation)

relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
project.adapter.drop_schema(relation)

pass

0 comments on commit a6dc99f

Please sign in to comment.