diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6f1db87aa..e6e1560c0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 - Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/126))
 
 ### Under the hood
+- Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, dbt keeps the existing table and writes a new Delta version. This keeps the table available while the pipeline runs and preserves the table's history.
 - Add changelog, issue templates ([#119](https://github.com/fishtown-analytics/dbt-spark/pull/119), [#120](https://github.com/fishtown-analytics/dbt-spark/pull/120))
 
 ### Fixes
@@ -11,6 +12,7 @@
 
 ### Contributors
 - [@danielvdende](https://github.com/danielvdende) ([#132](https://github.com/fishtown-analytics/dbt-spark/pull/132))
+- [@Fokko](https://github.com/Fokko) ([#125](https://github.com/fishtown-analytics/dbt-spark/pull/125))
 
 ## dbt-spark 0.18.1.1 (November 13, 2020)
diff --git a/dbt/adapters/spark/column.py b/dbt/adapters/spark/column.py
index 2f5e851dc..d8292f6ef 100644
--- a/dbt/adapters/spark/column.py
+++ b/dbt/adapters/spark/column.py
@@ -2,6 +2,7 @@
 from typing import TypeVar, Optional, Dict, Any
 
 from dbt.adapters.base.column import Column
+from hologram import JsonDict
 
 Self = TypeVar('Self', bound='SparkColumn')
 
@@ -54,7 +55,9 @@ def convert_table_stats(raw_stats: Optional[str]) -> Dict[str, Any]:
                 table_stats[f'stats:{key}:include'] = True
         return table_stats
 
-    def to_dict(self, omit_none=False):
+    def to_dict(
+        self, omit_none: bool = True, validate: bool = False
+    ) -> JsonDict:
         original_dict = super().to_dict(omit_none=omit_none)
         # If there are stats, merge them into the root of the dict
         original_stats = original_dict.pop('table_stats')
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index 15f89f7df..0e8ec7b78 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -2,6 +2,7 @@ from dataclasses import dataclass
 from typing import Optional, List, Dict, Any, Union, Iterable
 
 import agate
+from dbt.contracts.relation import RelationType
 
 import dbt
 import dbt.exceptions
@@ -131,11 +132,14 @@ def list_relations_without_caching(
                     f'got {len(row)} values, expected 4'
                 )
             _schema, name, _, information = row
-            rel_type = ('view' if 'Type: VIEW' in information else 'table')
+            rel_type = RelationType.View \
+                if 'Type: VIEW' in information else RelationType.Table
+            is_delta = 'Provider: delta' in information
             relation = self.Relation.create(
                 schema=_schema,
                 identifier=name,
-                type=rel_type
+                type=rel_type,
+                is_delta=is_delta
             )
             relations.append(relation)
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 4aa06f820..507f51d3b 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -1,3 +1,5 @@
+from typing import Optional
+
 from dataclasses import dataclass
 
 from dbt.adapters.base.relation import BaseRelation, Policy
@@ -23,6 +25,7 @@ class SparkRelation(BaseRelation):
     quote_policy: SparkQuotePolicy = SparkQuotePolicy()
     include_policy: SparkIncludePolicy = SparkIncludePolicy()
     quote_character: str = '`'
+    is_delta: Optional[bool] = None
 
     def __post_init__(self):
         if self.database != self.schema and self.database:
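
For context on the adapter changes above: `list_relations_without_caching` derives `is_delta` by scanning the `information` blob Spark returns for each relation. A minimal sketch of that detection logic follows; the `describe_output` sample text is illustrative, not taken from the PR:

```python
from dbt.adapters.spark.relation import SparkRelation
from dbt.contracts.relation import RelationType

# Hypothetical sample of the `information` blob Spark reports for a
# relation; the real text comes from Spark's extended table metadata.
describe_output = """
Type: MANAGED
Provider: delta
Location: /mnt/root/my_table
"""

# Mirrors the detection in SparkAdapter.list_relations_without_caching:
rel_type = RelationType.View \
    if 'Type: VIEW' in describe_output else RelationType.Table
is_delta = 'Provider: delta' in describe_output

relation = SparkRelation.create(
    schema='analytics',      # illustrative values
    identifier='my_table',
    type=rel_type,
    is_delta=is_delta,
)
assert relation.is_delta  # drives the create-or-replace path in the macros
```
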
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index 02253fe5e..a45b0d1ae 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -77,7 +77,11 @@
   {% if temporary -%}
     {{ create_temporary_view(relation, sql) }}
   {%- else -%}
-    create table {{ relation }}
+    {% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
+      create or replace table {{ relation }}
+    {% else %}
+      create table {{ relation }}
+    {% endif %}
     {{ file_format_clause() }}
     {{ partition_cols(label="partitioned by") }}
     {{ clustered_cols(label="clustered by") }}
diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql
index 78214641b..cd43253e1 100644
--- a/dbt/include/spark/macros/materializations/snapshot.sql
+++ b/dbt/include/spark/macros/materializations/snapshot.sql
@@ -75,26 +75,34 @@
   {%- set strategy_name = config.get('strategy') -%}
   {%- set unique_key = config.get('unique_key') %}
   {%- set file_format = config.get('file_format', 'parquet') -%}
-
-  {% set invalid_format_msg -%}
-    Invalid file format: {{ file_format }}
-    Snapshot functionality requires file_format be set to 'delta'
-  {%- endset %}
-
+
+  {% set target_relation_exists, target_relation = get_or_create_relation(
+          database=none,
+          schema=model.schema,
+          identifier=target_table,
+          type='table') -%}
+
   {%- if file_format != 'delta' -%}
+    {% set invalid_format_msg -%}
+      Invalid file format: {{ file_format }}
+      Snapshot functionality requires file_format be set to 'delta'
+    {%- endset %}
     {% do exceptions.raise_compiler_error(invalid_format_msg) %}
   {% endif %}
 
+  {%- if target_relation_exists -%}
+    {%- if not target_relation.is_delta -%}
+      {% set invalid_format_msg -%}
+        The existing table {{ model.schema }}.{{ target_table }} is in a format other than 'delta'
+      {%- endset %}
+      {% do exceptions.raise_compiler_error(invalid_format_msg) %}
+    {% endif %}
+  {% endif %}
+
   {% if not adapter.check_schema_exists(model.database, model.schema) %}
     {% do create_schema(model.database, model.schema) %}
   {% endif %}
 
-  {% set target_relation_exists, target_relation = get_or_create_relation(
-          database=none,
-          schema=model.schema,
-          identifier=target_table,
-          type='table') -%}
-
   {%- if not target_relation.is_table -%}
     {% do exceptions.relation_wrong_type(target_relation, 'table') %}
   {%- endif -%}
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index d772a5548..adfdb7a3c 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -11,7 +11,9 @@
   {{ run_hooks(pre_hooks) }}
 
   -- setup: if the target relation already exists, drop it
-  {% if old_relation -%}
+  -- unless both the existing and the new table use Delta; in that case we
+  -- issue a create or replace table instead, so the table never becomes unavailable
+  {% if old_relation and not (old_relation.is_delta and config.get('file_format', validator=validation.any[basestring]) == 'delta') -%}
     {{ adapter.drop_relation(old_relation) }}
   {%- endif %}
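
The table.sql change above reduces to one predicate: drop the old relation unless both the existing table and the configured `file_format` are Delta, in which case `create or replace table` swaps the contents in place. A minimal sketch of that decision, using a hypothetical `should_drop` helper (the function and the sample relation are illustrative, not part of the PR):

```python
from typing import Optional

from dbt.adapters.spark.relation import SparkRelation
from dbt.contracts.relation import RelationType


def should_drop(old_relation: Optional[SparkRelation], file_format: str) -> bool:
    """Mirror of the condition in the table materialization macro."""
    if old_relation is None:
        return False  # nothing exists yet, nothing to drop
    # Delta-to-Delta rebuilds go through `create or replace table`, which
    # keeps the table readable and records a new Delta version, so the
    # old relation must not be dropped first.
    return not (old_relation.is_delta and file_format == 'delta')


delta_table = SparkRelation.create(
    schema='analytics', identifier='events',   # illustrative values
    type=RelationType.Table, is_delta=True,
)
assert should_drop(delta_table, 'parquet')     # format change: drop first
assert not should_drop(delta_table, 'delta')   # delta-to-delta: replace in place
assert not should_drop(None, 'delta')          # first run: nothing to drop
```
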
diff --git a/test/unit/test_macros.py b/test/unit/test_macros.py
index 325e80b6c..5c5e3f8cf 100644
--- a/test/unit/test_macros.py
+++ b/test/unit/test_macros.py
@@ -8,104 +8,86 @@ class TestSparkMacros(unittest.TestCase):
     def setUp(self):
         self.jinja_env = Environment(loader=FileSystemLoader('dbt/include/spark/macros'),
-                                     extensions=['jinja2.ext.do',])
+                                     extensions=['jinja2.ext.do', ])
 
         self.config = {}
-
-        self.default_context = {}
-        self.default_context['validation'] = mock.Mock()
-        self.default_context['model'] = mock.Mock()
-        self.default_context['exceptions'] = mock.Mock()
-        self.default_context['config'] = mock.Mock()
+        self.default_context = {
+            'validation': mock.Mock(),
+            'model': mock.Mock(),
+            'exceptions': mock.Mock(),
+            'config': mock.Mock()
+        }
         self.default_context['config'].get = lambda key, default=None, **kwargs: self.config.get(key, default)
 
-
     def __get_template(self, template_filename):
         return self.jinja_env.get_template(template_filename, globals=self.default_context)
 
-
     def __run_macro(self, template, name, temporary, relation, sql):
         self.default_context['model'].alias = relation
         value = getattr(template.module, name)(temporary, relation, sql)
         return re.sub(r'\s\s+', ' ', value)
 
-
     def test_macros_load(self):
         self.jinja_env.get_template('adapters.sql')
 
-
     def test_macros_create_table_as(self):
         template = self.__get_template('adapters.sql')
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
 
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table as select 1")
-
+        self.assertEqual(sql, "create table my_table as select 1")
 
     def test_macros_create_table_as_file_format(self):
         template = self.__get_template('adapters.sql')
-
         self.config['file_format'] = 'delta'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table using delta as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create or replace table my_table using delta as select 1")
 
     def test_macros_create_table_as_partition(self):
         template = self.__get_template('adapters.sql')
-
         self.config['partition_by'] = 'partition_1'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table partitioned by (partition_1) as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table partitioned by (partition_1) as select 1")
 
     def test_macros_create_table_as_partitions(self):
         template = self.__get_template('adapters.sql')
-
         self.config['partition_by'] = ['partition_1', 'partition_2']
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql,
                          "create table my_table partitioned by (partition_1,partition_2) as select 1")
 
-
     def test_macros_create_table_as_cluster(self):
         template = self.__get_template('adapters.sql')
-
         self.config['clustered_by'] = 'cluster_1'
         self.config['buckets'] = '1'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table clustered by (cluster_1) into 1 buckets as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table clustered by (cluster_1) into 1 buckets as select 1")
 
     def test_macros_create_table_as_clusters(self):
         template = self.__get_template('adapters.sql')
-
         self.config['clustered_by'] = ['cluster_1', 'cluster_2']
         self.config['buckets'] = '1'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table clustered by (cluster_1,cluster_2) into 1 buckets as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table clustered by (cluster_1,cluster_2) into 1 buckets as select 1")
 
     def test_macros_create_table_as_location(self):
         template = self.__get_template('adapters.sql')
-
         self.config['location_root'] = '/mnt/root'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table location '/mnt/root/my_table' as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table location '/mnt/root/my_table' as select 1")
 
     def test_macros_create_table_as_comment(self):
         template = self.__get_template('adapters.sql')
-
         self.config['persist_docs'] = {'relation': True}
         self.default_context['model'].description = 'Description Test'
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table comment 'Description Test' as select 1")
-
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(sql, "create table my_table comment 'Description Test' as select 1")
 
     def test_macros_create_table_as_all(self):
         template = self.__get_template('adapters.sql')
@@ -118,5 +100,8 @@ def test_macros_create_table_as_all(self):
         self.config['persist_docs'] = {'relation': True}
         self.default_context['model'].description = 'Description Test'
 
-        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
-                         "create table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1")
+        sql = self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1').strip()
+        self.assertEqual(
+            sql,
+            "create or replace table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1"
+        )
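
A closing note on the test harness touched above: the tests invoke macros directly through jinja2's `template.module` attribute, which exposes every `{% macro %}` defined in a template as a Python callable. A minimal, self-contained sketch of that pattern (the `greet` macro is a stand-in, not one of the dbt-spark macros):

```python
from jinja2 import Environment

# Macros defined in a template become attributes of `template.module`;
# calling one renders just that macro's body, no dbt runtime required.
env = Environment(extensions=['jinja2.ext.do'])
template = env.from_string(
    "{% macro greet(name) %}hello {{ name }}{% endmacro %}"
)

assert str(template.module.greet('spark')) == 'hello spark'
```
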