Expose location, clustered_by to dbt-spark #43

Merged · 11 commits · Feb 6, 2020
63 changes: 60 additions & 3 deletions dbt/include/spark/macros/adapters.sql
@@ -5,13 +5,36 @@
{{ sql }}
{% endmacro %}


{% macro file_format_clause() %}
{%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
{%- if file_format is not none %}
using {{ file_format }}
{%- endif %}
{%- endmacro -%}
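
For illustration (not part of the diff), a model that sets file_format in its config:

{{ config(file_format='delta') }}

renders the clause "using delta", as asserted by test_macros_create_table_as_file_format in the unit tests below.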


{% macro location_clause() %}
{%- set path = config.get('location', validator=validation.any[basestring]) -%}
{%- if path is not none %}
location '{{ path }}'
{%- endif %}
{%- endmacro -%}
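
Illustrative example: with

{{ config(location='/mnt/root') }}

the macro renders location '/mnt/root'. Note the path is interpolated into the single quotes as-is; beyond the basestring check there is no escaping or validation.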


{% macro comment_clause() %}
{%- set raw_persist_docs = config.get('persist_docs', {}) -%}

{%- if raw_persist_docs is mapping -%}
{%- set raw_relation = raw_persist_docs.get('relation', false) -%}
{%- if raw_relation -%}
comment '{{ model.description }}'
{% endif %}
{%- else -%}
{{ exceptions.raise_compiler_error("Invalid value provided for 'persist_docs'. Expected dict but got value: " ~ raw_persist_docs) }}
{% endif %}
{%- endmacro -%}
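
Illustrative example: with persist_docs enabled at the relation level,

{{ config(persist_docs={'relation': true}) }}

the macro emits a comment clause built from the model description; test_macros_create_table_as_comment below asserts this renders as comment 'Description Test' for a model with that description.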

{% macro partition_cols(label, required=false) %}
{%- set cols = config.get('partition_by', validator=validation.any[list, basestring]) -%}
{%- if cols is not none %}
@@ -27,30 +50,64 @@
{%- endif %}
{%- endmacro -%}
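
Illustrative example, grounded in the unit tests below: partition_by accepts either a single column name or a list, so

{{ config(partition_by=['partition_1', 'partition_2']) }}

renders as partitioned by (partition_1,partition_2).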


{% macro clustered_cols(label, required=false) %}
{%- set cols = config.get('clustered_by', validator=validation.any[list, basestring]) -%}
{%- set buckets = config.get('buckets', validator=validation.any[int]) -%}
{%- if (cols is not none) and (buckets is not none) %}
{%- if cols is string -%}
{%- set cols = [cols] -%}
{%- endif -%}
{{ label }} (
{%- for item in cols -%}
{{ item }}
{%- if not loop.last -%},{%- endif -%}
{%- endfor -%}
) into {{ buckets }} buckets
{%- endif %}
{%- endmacro -%}
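
The clause is emitted only when both clustered_by and buckets are configured. Illustrative example:

{{ config(clustered_by=['cluster_1', 'cluster_2'], buckets=1) }}

renders as clustered by (cluster_1,cluster_2) into 1 buckets.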


{% macro spark__create_table_as(temporary, relation, sql) -%}
{% if temporary -%}
{{ spark_create_temporary_view(relation, sql) }}
{%- else -%}
create table {{ relation }}
{{ file_format_clause() }}
{{ partition_cols(label="partitioned by") }}
{{ clustered_cols(label="clustered by") }}
{{ location_clause() }}
{{ comment_clause() }}
as
{{ sql }}
{%- endif %}
{%- endmacro -%}
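
Putting the clauses together: with every option set, the macro composes them in the order above. The rendering below is asserted verbatim (whitespace-collapsed) by test_macros_create_table_as_all in the unit tests, reformatted here across lines for readability:

create table my_table
using delta
partitioned by (partition_1,partition_2)
clustered by (cluster_1,cluster_2) into 1 buckets
location '/mnt/root'
comment 'Description Test'
as
select 1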


{% macro spark__create_view_as(relation, sql) -%}
-create view {{ relation }} as
+create view {{ relation }}
+{{ comment_clause() }}
+as
{{ sql }}
{% endmacro %}
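
For views, only the comment clause applies; an illustrative rendering (views are not covered by the unit tests in this PR):

create view my_view
comment 'Description Test'
as
select 1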


{% macro spark__get_columns_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
describe {{ relation }}
{% endcall %}

-{% set table = load_result('get_columns_in_relation').table %}
-{{ return(sql_convert_columns_in_relation(table)) }}
+{% set columns = [] %}
+{% set vars = {'before_partition_info': True} %}
+{% for row in load_result('get_columns_in_relation').table if vars.before_partition_info %}
+{% if row[0].startswith('#') %}
+{{ vars.update({'before_partition_info': False}) }}
+{% else %}
+{{ dbt_utils.log_info(row) }}
+{{ columns.append(row) }}
+{% endif %}
+{% endfor %}
+{{ return(sql_convert_columns_in_relation(columns)) }}

{% endmacro %}
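
The loop stops at the first row whose first column starts with '#' because, for a partitioned table, describe appends a "# Partition Information" section after the column rows; keeping those rows would duplicate the partition columns in the result. Illustrative describe output (assumed shape):

describe my_table;
-- col_name              data_type   comment
-- id                    int
-- partition_1           string
-- # Partition Information
-- # col_name            data_type   comment
-- partition_1           string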

123 changes: 123 additions & 0 deletions test/unit/test_macros.py
@@ -0,0 +1,123 @@
import mock
import unittest
import re
from collections import defaultdict
from jinja2 import Environment, FileSystemLoader
from dbt.context.common import _add_validation


class TestSparkMacros(unittest.TestCase):

    def setUp(self):
        self.jinja_env = Environment(loader=FileSystemLoader('dbt/include/spark/macros'),
                                     extensions=['jinja2.ext.do',])

        self.config = {}

        self.default_context = {}
        self.default_context['validation'] = mock.Mock()
        self.default_context['model'] = mock.Mock()
        self.default_context['exceptions'] = mock.Mock()
        self.default_context['config'] = mock.Mock()
        self.default_context['config'].get = lambda key, default=None, **kwargs: self.config.get(key, default)

    def __get_template(self, template_filename):
        return self.jinja_env.get_template(template_filename, globals=self.default_context)

    def __run_macro(self, template, name, *args):
        value = getattr(template.module, name)(*args)
        return re.sub(r'\s\s+', ' ', value)

    def test_macros_load(self):
        self.jinja_env.get_template('adapters.sql')

    def test_macros_create_table_as(self):
        template = self.__get_template('adapters.sql')

        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
                         "create table my_table as select 1")

    def test_macros_create_table_as_file_format(self):
        template = self.__get_template('adapters.sql')

        self.config['file_format'] = 'delta'
        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
                         "create table my_table using delta as select 1")

    def test_macros_create_table_as_partition(self):
        template = self.__get_template('adapters.sql')

        self.config['partition_by'] = 'partition_1'
        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
                         "create table my_table partitioned by (partition_1) as select 1")

    def test_macros_create_table_as_partitions(self):
        template = self.__get_template('adapters.sql')

        self.config['partition_by'] = ['partition_1', 'partition_2']
        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
                         "create table my_table partitioned by (partition_1,partition_2) as select 1")

    def test_macros_create_table_as_cluster(self):
        template = self.__get_template('adapters.sql')

        self.config['clustered_by'] = 'cluster_1'
        self.config['buckets'] = '1'
        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
                         "create table my_table clustered by (cluster_1) into 1 buckets as select 1")

    def test_macros_create_table_as_clusters(self):
        template = self.__get_template('adapters.sql')

        self.config['clustered_by'] = ['cluster_1', 'cluster_2']
        self.config['buckets'] = '1'
        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
                         "create table my_table clustered by (cluster_1,cluster_2) into 1 buckets as select 1")

    def test_macros_create_table_as_location(self):
        template = self.__get_template('adapters.sql')

        self.config['location'] = '/mnt/root'
        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
                         "create table my_table location '/mnt/root' as select 1")

    def test_macros_create_table_as_comment(self):
        template = self.__get_template('adapters.sql')

        self.config['persist_docs'] = {'relation': True}
        self.default_context['model'].description = 'Description Test'
        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
                         "create table my_table comment 'Description Test' as select 1")

    def test_macros_create_table_as_all(self):
        template = self.__get_template('adapters.sql')

        self.config['file_format'] = 'delta'
        self.config['location'] = '/mnt/root'
        self.config['partition_by'] = ['partition_1', 'partition_2']
        self.config['clustered_by'] = ['cluster_1', 'cluster_2']
        self.config['buckets'] = '1'
        self.config['persist_docs'] = {'relation': True}
        self.default_context['model'].description = 'Description Test'

        self.assertEqual(self.__run_macro(template, 'spark__create_table_as', False, 'my_table', 'select 1'),
                         "create table my_table using delta partitioned by (partition_1,partition_2) clustered by (cluster_1,cluster_2) into 1 buckets location '/mnt/root' comment 'Description Test' as select 1")