From 56429653dc0a7593a671638ce4e04eb9e573f930 Mon Sep 17 00:00:00 2001 From: niels Date: Sun, 22 Dec 2019 10:48:31 +0100 Subject: [PATCH 01/11] Expose location, clustered_by to dbt-spark --- dbt/include/spark/macros/adapters.sql | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index c4616ca8c..91a665bd6 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -12,6 +12,13 @@ {%- endif %} {%- endmacro -%} +{% macro location_clause() %} + {%- set path = config.get('location', validator=validation.any[basestring]) -%} + {%- if path is not none %} + location '{{ path }}' + {%- endif %} +{%- endmacro -%} + {% macro partition_cols(label, required=false) %} {%- set cols = config.get('partition_by', validator=validation.any[list, basestring]) -%} {%- if cols is not none %} @@ -27,6 +34,22 @@ {%- endif %} {%- endmacro -%} +{% macro clustered_cols(label, required=false) %} + {%- set cols = config.get('clustered_by', validator=validation.any[list, basestring]) -%} + {%- set buckets = config.get('clustered_by', validator=validation.any[int]) -%} + {%- if (cols is not none) and (buckets is not none) %} + {%- if cols is string -%} + {%- set cols = [cols] -%} + {%- endif -%} + {{ label }} ( + {%- for item in cols -%} + {{ item }} + {%- if not loop.last -%},{%- endif -%} + {%- endfor -%} + ) into {{ buckets }} buckets + {%- endif %} +{%- endmacro -%} + {% macro spark__create_table_as(temporary, relation, sql) -%} {% if temporary -%} {{ spark_create_temporary_view(relation, sql) }} @@ -34,6 +57,8 @@ create table {{ relation }} {{ file_format_clause() }} {{ partition_cols(label="partitioned by") }} + {{ clustered_cols(label="clustered by") }} + {{ location_clause() }} as {{ sql }} {%- endif %} From ad676ea932cbb6e008dfde04767a1f103a7a1fbc Mon Sep 17 00:00:00 2001 From: niels Date: Sun, 22 Dec 2019 10:54:35 +0100 Subject: [PATCH 02/11] Fixup --- dbt/include/spark/macros/adapters.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index 91a665bd6..159b4baf4 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -36,7 +36,7 @@ {% macro clustered_cols(label, required=false) %} {%- set cols = config.get('clustered_by', validator=validation.any[list, basestring]) -%} - {%- set buckets = config.get('clustered_by', validator=validation.any[int]) -%} + {%- set buckets = config.get('buckets', validator=validation.any[int]) -%} {%- if (cols is not none) and (buckets is not none) %} {%- if cols is string -%} {%- set cols = [cols] -%} From 8b0d1e04b191541425979cdfd661a1fa2b2177e8 Mon Sep 17 00:00:00 2001 From: niels Date: Fri, 3 Jan 2020 09:41:32 +0100 Subject: [PATCH 03/11] Add unit-test for macros --- test/unit/test_macros.py | 108 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 test/unit/test_macros.py diff --git a/test/unit/test_macros.py b/test/unit/test_macros.py new file mode 100644 index 000000000..7228f2dae --- /dev/null +++ b/test/unit/test_macros.py @@ -0,0 +1,108 @@ +import mock +import unittest +import re +from collections import defaultdict +from jinja2 import Environment, FileSystemLoader +from dbt.context.common import _add_validation + + +class TestSparkMacros(unittest.TestCase): + + def setUp(self): + self.jinja_env = 
Environment(loader=FileSystemLoader('dbt/include/spark/macros'), + extensions=['jinja2.ext.do',]) + + self.config = defaultdict(lambda: None) + + self.default_context = {} + self.default_context['validation'] = mock.Mock() + self.default_context['config'] = mock.Mock(return_value='') + self.default_context['config'].get = lambda key, *args, **kwargs: self.config[key] + + + def __get_template(self, template_filename): + return self.jinja_env.get_template(template_filename, globals=self.default_context) + + + def __run_macro(self, template, name, *args): + value = getattr(template.module, name)(*args) + return re.sub(r'\s\s+', ' ', value) + + + def test_macros_load(self): + self.jinja_env.get_template('adapters.sql') + + + def test_macros_create_table_as(self): + template = self.__get_template('adapters.sql') + + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), + "create table my_table as select 1") + + + def test_macros_create_table_as_file_format(self): + template = self.__get_template('adapters.sql') + + + self.config['file_format'] = 'delta' + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), + "create table my_table using delta as select 1") + + + def test_macros_create_table_as_partition(self): + template = self.__get_template('adapters.sql') + + + self.config['partition_by'] = 'partition_1' + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), + "create table my_table partitioned by (partition_1) as select 1") + + + def test_macros_create_table_as_partitions(self): + template = self.__get_template('adapters.sql') + + + self.config['partition_by'] = ['partition_1', 'partition_2'] + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), + "create table my_table partitioned by (partition_1,partition_2) as select 1") + + + def test_macros_create_table_as_cluster(self): + template = self.__get_template('adapters.sql') + + + self.config['clustered_by'] = 'cluster_1' + self.config['buckets'] = '1' + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), + "create table my_table clustered by (cluster_1) into 1 buckets as select 1") + + + def test_macros_create_table_as_clusters(self): + template = self.__get_template('adapters.sql') + + + self.config['clustered_by'] = ['cluster_1', 'cluster_2'] + self.config['buckets'] = '1' + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), + "create table my_table clustered by (cluster_1,cluster_2) into 1 buckets as select 1") + + + def test_macros_create_table_as_location(self): + template = self.__get_template('adapters.sql') + + + self.config['location'] = '/mnt/root' + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), + "create table my_table location '/mnt/root' as select 1") + + + def test_macros_create_table_as_all(self): + template = self.__get_template('adapters.sql') + + self.config['file_format'] = 'delta' + self.config['location'] = '/mnt/root' + self.config['partition_by'] = ['partition_1', 'partition_2'] + self.config['clustered_by'] = ['cluster_1', 'cluster_2'] + self.config['buckets'] = '1' + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), + "create table my_table using delta partitioned by (partition_1,partition_2) clustered by 
(cluster_1,cluster_2) into 1 buckets location '/mnt/root' as select 1") From b167e5f12e68646db5d638c21d067d7266556aa7 Mon Sep 17 00:00:00 2001 From: niels Date: Thu, 9 Jan 2020 15:42:51 +0100 Subject: [PATCH 04/11] Support persist_docs --- dbt/include/spark/macros/adapters.sql | 38 ++++++++++++++++++++++++--- test/unit/test_macros.py | 23 +++++++++++++--- 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index 159b4baf4..f8d9a793d 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -5,6 +5,7 @@ {{ sql }} {% endmacro %} + {% macro file_format_clause() %} {%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%} {%- if file_format is not none %} @@ -12,6 +13,7 @@ {%- endif %} {%- endmacro -%} + {% macro location_clause() %} {%- set path = config.get('location', validator=validation.any[basestring]) -%} {%- if path is not none %} @@ -19,6 +21,20 @@ {%- endif %} {%- endmacro -%} + +{% macro comment_clause() %} + {%- set raw_persist_docs = config.get('persist_docs', {}) -%} + + {%- if raw_persist_docs is mapping -%} + {%- set raw_relation = raw_persist_docs.get('relation', false) -%} + {%- if raw_relation -%} + comment '{{ model.description }}' + {% endif %} + {%- else -%} + {{ exceptions.raise_compiler_error("Invalid value provided for 'persist_docs'. Expected dict but got value: " ~ raw_persist_docs) }} + {% endif %} +{%- endmacro -%} + {% macro partition_cols(label, required=false) %} {%- set cols = config.get('partition_by', validator=validation.any[list, basestring]) -%} {%- if cols is not none %} @@ -34,6 +50,7 @@ {%- endif %} {%- endmacro -%} + {% macro clustered_cols(label, required=false) %} {%- set cols = config.get('clustered_by', validator=validation.any[list, basestring]) -%} {%- set buckets = config.get('buckets', validator=validation.any[int]) -%} @@ -50,6 +67,7 @@ {%- endif %} {%- endmacro -%} + {% macro spark__create_table_as(temporary, relation, sql) -%} {% if temporary -%} {{ spark_create_temporary_view(relation, sql) }} @@ -59,23 +77,37 @@ {{ partition_cols(label="partitioned by") }} {{ clustered_cols(label="clustered by") }} {{ location_clause() }} + {{ comment_clause() }} as {{ sql }} {%- endif %} {%- endmacro -%} + {% macro spark__create_view_as(relation, sql) -%} - create view {{ relation }} as + create view {{ relation }} + {{ comment_clause() }} + as {{ sql }} {% endmacro %} + {% macro spark__get_columns_in_relation(relation) -%} {% call statement('get_columns_in_relation', fetch_result=True) %} describe {{ relation }} {% endcall %} - {% set table = load_result('get_columns_in_relation').table %} - {{ return(sql_convert_columns_in_relation(table)) }} + {% set columns = [] %} + {% set vars = {'before_partition_info': True} %} + {% for row in load_result('get_columns_in_relation').table if vars.before_partition_info %} + {% if row[0].startswith('#') %} + {{ vars.update({'before_partition_info': False}) }} + {% else %} + {{ dbt_utils.log_info(row) }} + {{ columns.append(row) }} + {% endif %} + {% endfor %} + {{ return(sql_convert_columns_in_relation(columns)) }} {% endmacro %} diff --git a/test/unit/test_macros.py b/test/unit/test_macros.py index 7228f2dae..c08cecb3b 100644 --- a/test/unit/test_macros.py +++ b/test/unit/test_macros.py @@ -12,12 +12,14 @@ def setUp(self): self.jinja_env = Environment(loader=FileSystemLoader('dbt/include/spark/macros'), extensions=['jinja2.ext.do',]) - self.config = 
defaultdict(lambda: None) + self.config = {} self.default_context = {} self.default_context['validation'] = mock.Mock() - self.default_context['config'] = mock.Mock(return_value='') - self.default_context['config'].get = lambda key, *args, **kwargs: self.config[key] + self.default_context['model'] = mock.Mock() + self.default_context['exceptions'] = mock.Mock() + self.default_context['config'] = mock.Mock() + self.default_context['config'].get = lambda key, default=None, **kwargs: self.config.get(key, default) def __get_template(self, template_filename): @@ -96,6 +98,16 @@ def test_macros_create_table_as_location(self): "create table my_table location '/mnt/root' as select 1") + def test_macros_create_table_as_comment(self): + template = self.__get_template('adapters.sql') + + + self.config['persist_docs'] = {'relation': True} + self.default_context['model'].description = 'Description Test' + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), + "create table my_table comment 'Description Test' as select 1") + + def test_macros_create_table_as_all(self): template = self.__get_template('adapters.sql') @@ -104,5 +116,8 @@ def test_macros_create_table_as_all(self): self.config['partition_by'] = ['partition_1', 'partition_2'] self.config['clustered_by'] = ['cluster_1', 'cluster_2'] self.config['buckets'] = '1' + self.config['persist_docs'] = {'relation': True} + self.default_context['model'].description = 'Description Test' + self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), - "create table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root' as select 1") + "create table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root' comment 'Description Test' as select 1") From 29615d45e40863adba24968d17d328627e4f064f Mon Sep 17 00:00:00 2001 From: niels Date: Tue, 4 Feb 2020 14:17:59 +0100 Subject: [PATCH 05/11] Fixup, removed log statement --- dbt/include/spark/macros/adapters.sql | 1 - 1 file changed, 1 deletion(-) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index f8d9a793d..221662163 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -103,7 +103,6 @@ {% if row[0].startswith('#') %} {{ vars.update({'before_partition_info': False}) }} {% else %} - {{ dbt_utils.log_info(row) }} {{ columns.append(row) }} {% endif %} {% endfor %} From f23901c40151599573769092745550baba434edc Mon Sep 17 00:00:00 2001 From: niels Date: Tue, 4 Feb 2020 14:32:21 +0100 Subject: [PATCH 06/11] Fixup, readme + adapter specific config --- README.md | 5 ++++- dbt/adapters/spark/impl.py | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 821b3ce7d..28f1f4152 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,10 @@ The following configurations can be supplied to models run with the dbt-spark pl | Option | Description | Required? | Example | |---------|----------------------------------------------------|-------------------------|--------------------------| | file_format | The file format to use when creating tables | Optional | `parquet` | - +| location | The created table uses the specified directory to store its data. | Optional | `/mnt/root` | +| partition_by | Partition the created table by the specified columns. 
A directory is created for each partition. | Optional | `partition_1` | +| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` | +| buckets | The number of buckets to create while clustering | Optional | `8` | **Incremental Models** diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 30c85820d..4d12f2a11 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -15,6 +15,9 @@ class SparkAdapter(SQLAdapter): ConnectionManager = SparkConnectionManager Relation = SparkRelation + + AdapterSpecificConfigs = frozenset({"file_format", "location", "partition_by", + "clustered_by", "buckets"}) @classmethod def date_function(cls): From db598507e91f6d9e3ad6f8b565fb077391678c11 Mon Sep 17 00:00:00 2001 From: niels Date: Tue, 4 Feb 2020 14:34:41 +0100 Subject: [PATCH 07/11] Make explicit that buckets is required if clustered_by is specified --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 28f1f4152..0e08de2c5 100644 --- a/README.md +++ b/README.md @@ -84,8 +84,8 @@ The following configurations can be supplied to models run with the dbt-spark pl | file_format | The file format to use when creating tables | Optional | `parquet` | | location | The created table uses the specified directory to store its data. | Optional | `/mnt/root` | | partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` | -| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` | -| buckets | The number of buckets to create while clustering | Optional | `8` | +| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. 
| Optional | `cluster_1` | +| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` | **Incremental Models** From c814994df2aa5feb2a416acb64f8dd8a3e06e3b6 Mon Sep 17 00:00:00 2001 From: niels Date: Thu, 6 Feb 2020 17:02:54 +0100 Subject: [PATCH 08/11] Revert spark__get_columns_in_relation --- dbt/include/spark/macros/adapters.sql | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index 221662163..389bd4edc 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -97,16 +97,8 @@ describe {{ relation }} {% endcall %} - {% set columns = [] %} - {% set vars = {'before_partition_info': True} %} - {% for row in load_result('get_columns_in_relation').table if vars.before_partition_info %} - {% if row[0].startswith('#') %} - {{ vars.update({'before_partition_info': False}) }} - {% else %} - {{ columns.append(row) }} - {% endif %} - {% endfor %} - {{ return(sql_convert_columns_in_relation(columns)) }} + {% set table = load_result('get_columns_in_relation').table %} + {{ return(sql_convert_columns_in_relation(table)) }} {% endmacro %} From 50e13780f65d7989c5217afdbe7b2a9ed80d5635 Mon Sep 17 00:00:00 2001 From: niels Date: Thu, 6 Feb 2020 17:13:07 +0100 Subject: [PATCH 09/11] Switch to location_root --- dbt/include/spark/macros/adapters.sql | 7 ++++--- test/unit/test_macros.py | 13 +++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index 389bd4edc..ebf328085 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -15,9 +15,10 @@ {% macro location_clause() %} - {%- set path = config.get('location', validator=validation.any[basestring]) -%} - {%- if path is not none %} - location '{{ path }}' + {%- set location_root = config.get('location_root', validator=validation.any[basestring]) -%} + {%- set identifier = model['alias'] -%} + {%- if location_root is not none %} + location '{{ location_root }}/{{ identifier }}' {%- endif %} {%- endmacro -%} diff --git a/test/unit/test_macros.py b/test/unit/test_macros.py index c08cecb3b..eb8852ed0 100644 --- a/test/unit/test_macros.py +++ b/test/unit/test_macros.py @@ -26,8 +26,9 @@ def __get_template(self, template_filename): return self.jinja_env.get_template(template_filename, globals=self.default_context) - def __run_macro(self, template, name, *args): - value = getattr(template.module, name)(*args) + def __run_macro(self, template, name, temporary, relation, sql): + self.default_context['model'].alias = relation + value = getattr(template.module, name)(temporary, relation, sql) return re.sub(r'\s\s+', ' ', value) @@ -93,9 +94,9 @@ def test_macros_create_table_as_location(self): template = self.__get_template('adapters.sql') - self.config['location'] = '/mnt/root' + self.config['location_root'] = '/mnt/root' self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), - "create table my_table location '/mnt/root' as select 1") + "create table my_table location '/mnt/root/my_table' as select 1") def test_macros_create_table_as_comment(self): @@ -112,7 +113,7 @@ def test_macros_create_table_as_all(self): template = self.__get_template('adapters.sql') self.config['file_format'] = 'delta' - self.config['location'] = '/mnt/root' + self.config['location_root'] = '/mnt/root' 
self.config['partition_by'] = ['partition_1', 'partition_2'] self.config['clustered_by'] = ['cluster_1', 'cluster_2'] self.config['buckets'] = '1' @@ -120,4 +121,4 @@ def test_macros_create_table_as_all(self): self.default_context['model'].description = 'Description Test' self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'), - "create table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root' comment 'Description Test' as select 1") + "create table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root/my_table' comment 'Description Test' as select 1") From 81c0ef05b734e55d5428d2c8a75c002e8aa7dd5b Mon Sep 17 00:00:00 2001 From: niels Date: Thu, 6 Feb 2020 20:59:12 +0100 Subject: [PATCH 10/11] Fixup, location_root --- README.md | 2 +- dbt/adapters/spark/impl.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0e08de2c5..a3339b2fb 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ The following configurations can be supplied to models run with the dbt-spark pl | Option | Description | Required? | Example | |---------|----------------------------------------------------|-------------------------|--------------------------| | file_format | The file format to use when creating tables | Optional | `parquet` | -| location | The created table uses the specified directory to store its data. | Optional | `/mnt/root` | +| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | `/mnt/root` | | partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` | | clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` | | buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` | diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 4d12f2a11..80140bb00 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -16,7 +16,7 @@ class SparkAdapter(SQLAdapter): ConnectionManager = SparkConnectionManager Relation = SparkRelation - AdapterSpecificConfigs = frozenset({"file_format", "location", "partition_by", + AdapterSpecificConfigs = frozenset({"file_format", "location_root", "partition_by", "clustered_by", "buckets"}) @classmethod From f7f8d843a6e1c777a396a6b68732a30034d0215c Mon Sep 17 00:00:00 2001 From: niels Date: Thu, 6 Feb 2020 21:02:17 +0100 Subject: [PATCH 11/11] Fixup, pep8 --- dbt/adapters/spark/impl.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 80140bb00..ba4527968 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -15,9 +15,10 @@ class SparkAdapter(SQLAdapter): ConnectionManager = SparkConnectionManager Relation = SparkRelation - - AdapterSpecificConfigs = frozenset({"file_format", "location_root", "partition_by", - "clustered_by", "buckets"}) + + AdapterSpecificConfigs = frozenset({"file_format", "location_root", + "partition_by", "clustered_by", + "buckets"}) @classmethod def date_function(cls):
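
For illustration, here is a minimal sketch of a model that exercises the configurations introduced by this patch series (the model name and the `date_day`/`user_id` columns are placeholders, not part of the change; with `persist_docs` the table comment is taken from the model's `description` in its schema file):

```sql
-- models/my_table.sql  (hypothetical example, not part of the patches)
{{ config(
    materialized='table',
    file_format='delta',
    location_root='/mnt/root',
    partition_by=['date_day'],
    clustered_by=['user_id'],
    buckets=8,
    persist_docs={'relation': true}
) }}

select 1 as user_id, current_date() as date_day
```

With these settings, `spark__create_table_as` should render DDL of the form `create table ... using delta partitioned by (date_day) clustered by (user_id) into 8 buckets location '/mnt/root/my_table' comment '...' as select ...`, mirroring the expectation asserted in `test_macros_create_table_as_all`.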