diff --git a/dbt/include/athena/macros/adapters.sql b/dbt/include/athena/macros/adapters.sql deleted file mode 100644 index a02bb339..00000000 --- a/dbt/include/athena/macros/adapters.sql +++ /dev/null @@ -1,125 +0,0 @@ -{% macro set_table_classification(relation, default_value) -%} - {%- set format = config.get('format', default=default_value) -%} - - {% call statement('set_table_classification', auto_begin=False) -%} - alter table {{ relation }} set tblproperties ('classification' = '{{ format }}') - {%- endcall %} -{%- endmacro %} - -{% macro athena__create_table_as(temporary, relation, sql) -%} - {%- set external_location = config.get('external_location', default=none) -%} - {%- set partitioned_by = config.get('partitioned_by', default=none) -%} - {%- set bucketed_by = config.get('bucketed_by', default=none) -%} - {%- set bucket_count = config.get('bucket_count', default=none) -%} - {%- set field_delimiter = config.get('field_delimiter', default=none) -%} - {%- set format = config.get('format', default='parquet') -%} - - create table - {{ relation }} - - with ( - {%- if external_location is not none and not temporary %} - external_location='{{ external_location }}', - {%- endif %} - {%- if partitioned_by is not none %} - partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }}, - {%- endif %} - {%- if bucketed_by is not none %} - bucketed_by=ARRAY{{ bucketed_by | tojson | replace('\"', '\'') }}, - {%- endif %} - {%- if bucket_count is not none %} - bucket_count={{ bucket_count }}, - {%- endif %} - {%- if field_delimiter is not none %} - field_delimiter='{{ field_delimiter }}', - {%- endif %} - format='{{ format }}' - ) - as - {{ sql }} -{% endmacro %} - -{% macro athena__create_view_as(relation, sql) -%} - create or replace view - {{ relation }} - as - {{ sql }} -{% endmacro %} - -{% macro athena__list_schemas(database) -%} - {% call statement('list_schemas', fetch_result=True) %} - select - distinct schema_name - - from {{ information_schema_name(database) }}.schemata - {% endcall %} - {{ return(load_result('list_schemas').table) }} -{% endmacro %} - -{% macro athena__list_relations_without_caching(schema_relation) %} - {% call statement('list_relations_without_caching', fetch_result=True) -%} - WITH views AS ( - select - table_catalog as database, - table_name as name, - table_schema as schema - from {{ schema_relation.information_schema() }}.views - where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}') - ), tables AS ( - select - table_catalog as database, - table_name as name, - table_schema as schema - - from {{ schema_relation.information_schema() }}.tables - where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}') - - -- Views appear in both `tables` and `views`, so excluding them from tables - EXCEPT - - select * from views - ) - select views.*, 'view' AS table_type FROM views - UNION ALL - select tables.*, 'table' AS table_type FROM tables - {% endcall %} - {% do return(load_result('list_relations_without_caching').table) %} -{% endmacro %} - -{% macro athena__get_columns_in_relation(relation) -%} - {% call statement('get_columns_in_relation', fetch_result=True) %} - - select - column_name, - data_type, - null as character_maximum_length, - null as numeric_precision, - null as numeric_scale - - from {{ relation.information_schema('columns') }} - where LOWER(table_name) = LOWER('{{ relation.identifier }}') - {% if relation.schema %} - and LOWER(table_schema) = LOWER('{{ relation.schema }}') - {% endif %} - order by ordinal_position - - {% endcall %} - - {% set table = load_result('get_columns_in_relation').table %} - {% do return(sql_convert_columns_in_relation(table)) %} -{% endmacro %} - -{% macro athena__drop_relation(relation) -%} - {% if config.get('incremental_strategy') == 'insert_overwrite' %} - {%- do adapter.clean_up_table(relation.schema, relation.table) -%} - {% endif %} - {% call statement('drop_relation', auto_begin=False) -%} - drop {{ relation.type }} if exists {{ relation }} - {%- endcall %} -{% endmacro %} - -{% macro athena__current_timestamp() -%} - -- pyathena converts time zoned timestamps to strings so lets avoid them - -- now() - cast(now() as timestamp) -{%- endmacro %} diff --git a/dbt/include/athena/macros/adapters/columns.sql b/dbt/include/athena/macros/adapters/columns.sql new file mode 100644 index 00000000..2c62f4d7 --- /dev/null +++ b/dbt/include/athena/macros/adapters/columns.sql @@ -0,0 +1,22 @@ +{% macro athena__get_columns_in_relation(relation) -%} + {% call statement('get_columns_in_relation', fetch_result=True) %} + + select + column_name, + data_type, + null as character_maximum_length, + null as numeric_precision, + null as numeric_scale + + from {{ relation.information_schema('columns') }} + where LOWER(table_name) = LOWER('{{ relation.identifier }}') + {% if relation.schema %} + and LOWER(table_schema) = LOWER('{{ relation.schema }}') + {% endif %} + order by ordinal_position + + {% endcall %} + + {% set table = load_result('get_columns_in_relation').table %} + {% do return(sql_convert_columns_in_relation(table)) %} +{% endmacro %} diff --git a/dbt/include/athena/macros/adapters/freshness.sql b/dbt/include/athena/macros/adapters/freshness.sql new file mode 100644 index 00000000..b980d220 --- /dev/null +++ b/dbt/include/athena/macros/adapters/freshness.sql @@ -0,0 +1,5 @@ +{% macro athena__current_timestamp() -%} + -- pyathena converts time zoned timestamps to strings so lets avoid them + -- now() + cast(now() as timestamp) +{%- endmacro %} diff --git a/dbt/include/athena/macros/catalog.sql b/dbt/include/athena/macros/adapters/metadata.sql similarity index 68% rename from dbt/include/athena/macros/catalog.sql rename to dbt/include/athena/macros/adapters/metadata.sql index 73b96d8b..a6e9f1c9 100644 --- a/dbt/include/athena/macros/catalog.sql +++ b/dbt/include/athena/macros/adapters/metadata.sql @@ -1,4 +1,3 @@ - {% macro athena__get_catalog(information_schema, schemas) -%} {%- set query -%} select * from ( @@ -77,3 +76,45 @@ {{ return(run_query(query)) }} {%- endmacro %} + + +{% macro athena__list_schemas(database) -%} + {% call statement('list_schemas', fetch_result=True) %} + select + distinct schema_name + + from {{ information_schema_name(database) }}.schemata + {% endcall %} + {{ return(load_result('list_schemas').table) }} +{% endmacro %} + + +{% macro athena__list_relations_without_caching(schema_relation) %} + {% call statement('list_relations_without_caching', fetch_result=True) -%} + WITH views AS ( + select + table_catalog as database, + table_name as name, + table_schema as schema + from {{ schema_relation.information_schema() }}.views + where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}') + ), tables AS ( + select + table_catalog as database, + table_name as name, + table_schema as schema + + from {{ schema_relation.information_schema() }}.tables + where LOWER(table_schema) = LOWER('{{ schema_relation.schema }}') + + -- Views appear in both `tables` and `views`, so excluding them from tables + EXCEPT + + select * from views + ) + select views.*, 'view' AS table_type FROM views + UNION ALL + select tables.*, 'table' AS table_type FROM tables + {% endcall %} + {% do return(load_result('list_relations_without_caching').table) %} +{% endmacro %} diff --git a/dbt/include/athena/macros/adapters/relation.sql b/dbt/include/athena/macros/adapters/relation.sql new file mode 100644 index 00000000..7e73aca7 --- /dev/null +++ b/dbt/include/athena/macros/adapters/relation.sql @@ -0,0 +1,8 @@ +{% macro athena__drop_relation(relation) -%} + {% if config.get('incremental_strategy') == 'insert_overwrite' %} + {%- do adapter.clean_up_table(relation.schema, relation.table) -%} + {% endif %} + {% call statement('drop_relation', auto_begin=False) -%} + drop {{ relation.type }} if exists {{ relation }} + {%- endcall %} +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/helpers.sql b/dbt/include/athena/macros/materializations/models/helpers.sql new file mode 100644 index 00000000..74148a9d --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/helpers.sql @@ -0,0 +1,7 @@ +{% macro set_table_classification(relation, default_value) -%} + {%- set format = config.get('format', default=default_value) -%} + + {% call statement('set_table_classification', auto_begin=False) -%} + alter table {{ relation }} set tblproperties ('classification' = '{{ format }}') + {%- endcall %} +{%- endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/incremental/helpers.sql b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql new file mode 100644 index 00000000..3f6b1f59 --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql @@ -0,0 +1,54 @@ +{% macro validate_get_incremental_strategy(raw_strategy) %} + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + Expected one of: 'append', 'insert_overwrite' + {%- endset %} + + {% if raw_strategy not in ['append', 'insert_overwrite'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {% endif %} + + {% do return(raw_strategy) %} +{% endmacro %} + +{% macro incremental_insert(tmp_relation, target_relation, statement_name="main") %} + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + + insert into {{ target_relation }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ tmp_relation }} + ); +{%- endmacro %} + +{% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %} + {%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%} + {% call statement('get_partitions', fetch_result=True) %} + select distinct {{partitioned_keys}} from {{ tmp_relation }}; + {% endcall %} + {%- set table = load_result('get_partitions').table -%} + {%- set rows = table.rows -%} + {%- set partitions = [] -%} + {%- for row in rows -%} + {%- set single_partition = [] -%} + {%- for col in row -%} + {%- set column_type = adapter.convert_type(table, loop.index0) -%} + {%- if column_type == 'integer' -%} + {%- set value = col|string -%} + {%- elif column_type == 'string' -%} + {%- set value = "'" + col + "'" -%} + {%- elif column_type == 'date' -%} + {%- set value = "'" + col|string + "'" -%} + {%- else -%} + {%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%} + {%- endif -%} + {%- do single_partition.append(partitioned_by[loop.index0] + '=' + value) -%} + {%- endfor -%} + {%- set single_partition_expression = single_partition | join(' and ') -%} + {%- do partitions.append('(' + single_partition_expression + ')') -%} + {%- endfor -%} + {%- for i in range(partitions | length) %} + {%- do adapter.clean_up_partitions(target_relation.schema, target_relation.table, partitions[i]) -%} + {%- endfor -%} +{%- endmacro %} diff --git a/dbt/include/athena/macros/materializations/incremental.sql b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql similarity index 53% rename from dbt/include/athena/macros/materializations/incremental.sql rename to dbt/include/athena/macros/materializations/models/incremental/incremental.sql index 8c82d1e9..f20fee5e 100644 --- a/dbt/include/athena/macros/materializations/incremental.sql +++ b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql @@ -1,58 +1,3 @@ -{% macro validate_get_incremental_strategy(raw_strategy) %} - {% set invalid_strategy_msg -%} - Invalid incremental strategy provided: {{ raw_strategy }} - Expected one of: 'append', 'insert_overwrite' - {%- endset %} - - {% if raw_strategy not in ['append', 'insert_overwrite'] %} - {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} - {% endif %} - - {% do return(raw_strategy) %} -{% endmacro %} - -{% macro incremental_insert(tmp_relation, target_relation, statement_name="main") %} - {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} - {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} - - insert into {{ target_relation }} ({{ dest_cols_csv }}) - ( - select {{ dest_cols_csv }} - from {{ tmp_relation }} - ); -{%- endmacro %} - -{% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %} - {%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%} - {% call statement('get_partitions', fetch_result=True) %} - select distinct {{partitioned_keys}} from {{ tmp_relation }}; - {% endcall %} - {%- set table = load_result('get_partitions').table -%} - {%- set rows = table.rows -%} - {%- set partitions = [] -%} - {%- for row in rows -%} - {%- set single_partition = [] -%} - {%- for col in row -%} - {%- set column_type = adapter.convert_type(table, loop.index0) -%} - {%- if column_type == 'integer' -%} - {%- set value = col|string -%} - {%- elif column_type == 'string' -%} - {%- set value = "'" + col + "'" -%} - {%- elif column_type == 'date' -%} - {%- set value = "'" + col|string + "'" -%} - {%- else -%} - {%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%} - {%- endif -%} - {%- do single_partition.append(partitioned_by[loop.index0] + '=' + value) -%} - {%- endfor -%} - {%- set single_partition_expression = single_partition | join(' and ') -%} - {%- do partitions.append('(' + single_partition_expression + ')') -%} - {%- endfor -%} - {%- for i in range(partitions | length) %} - {%- do adapter.clean_up_partitions(target_relation.schema, target_relation.table, partitions[i]) -%} - {%- endfor -%} -{%- endmacro %} - {% materialization incremental, adapter='athena' -%} {% set unique_key = config.get('unique_key') %} diff --git a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql new file mode 100644 index 00000000..504ba148 --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql @@ -0,0 +1,32 @@ +{% macro athena__create_table_as(temporary, relation, sql) -%} + {%- set external_location = config.get('external_location', default=none) -%} + {%- set partitioned_by = config.get('partitioned_by', default=none) -%} + {%- set bucketed_by = config.get('bucketed_by', default=none) -%} + {%- set bucket_count = config.get('bucket_count', default=none) -%} + {%- set field_delimiter = config.get('field_delimiter', default=none) -%} + {%- set format = config.get('format', default='parquet') -%} + + create table + {{ relation }} + + with ( + {%- if external_location is not none and not temporary %} + external_location='{{ external_location }}', + {%- endif %} + {%- if partitioned_by is not none %} + partitioned_by=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }}, + {%- endif %} + {%- if bucketed_by is not none %} + bucketed_by=ARRAY{{ bucketed_by | tojson | replace('\"', '\'') }}, + {%- endif %} + {%- if bucket_count is not none %} + bucket_count={{ bucket_count }}, + {%- endif %} + {%- if field_delimiter is not none %} + field_delimiter='{{ field_delimiter }}', + {%- endif %} + format='{{ format }}' + ) + as + {{ sql }} +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/table.sql b/dbt/include/athena/macros/materializations/models/table/table.sql similarity index 99% rename from dbt/include/athena/macros/materializations/table.sql rename to dbt/include/athena/macros/materializations/models/table/table.sql index b0e1e2c3..ad425f00 100644 --- a/dbt/include/athena/macros/materializations/table.sql +++ b/dbt/include/athena/macros/materializations/models/table/table.sql @@ -1,4 +1,3 @@ - {% materialization table, adapter='athena' -%} {%- set identifier = model['alias'] -%} diff --git a/dbt/include/athena/macros/materializations/view.sql b/dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql similarity index 83% rename from dbt/include/athena/macros/materializations/view.sql rename to dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql index bcc0d6d0..bf7c0699 100644 --- a/dbt/include/athena/macros/materializations/view.sql +++ b/dbt/include/athena/macros/materializations/models/view/create_or_replace_view.sql @@ -38,13 +38,3 @@ {{ return({'relations': [target_relation]}) }} {% endmacro %} - - -{% materialization view, adapter='athena' -%} - {% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %} - - {% set target_relation = this.incorporate(type='view') %} - {% do persist_docs(target_relation, model) %} - - {% do return(to_return) %} -{%- endmaterialization %} diff --git a/dbt/include/athena/macros/materializations/models/view/create_view_as.sql b/dbt/include/athena/macros/materializations/models/view/create_view_as.sql new file mode 100644 index 00000000..5871a739 --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/view/create_view_as.sql @@ -0,0 +1,6 @@ +{% macro athena__create_view_as(relation, sql) -%} + create or replace view + {{ relation }} + as + {{ sql }} +{% endmacro %} diff --git a/dbt/include/athena/macros/materializations/models/view/view.sql b/dbt/include/athena/macros/materializations/models/view/view.sql new file mode 100644 index 00000000..3b1a4a89 --- /dev/null +++ b/dbt/include/athena/macros/materializations/models/view/view.sql @@ -0,0 +1,8 @@ +{% materialization view, adapter='athena' -%} + {% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %} + + {% set target_relation = this.incorporate(type='view') %} + {% do persist_docs(target_relation, model) %} + + {% do return(to_return) %} +{%- endmaterialization %} diff --git a/dbt/include/athena/macros/materializations/seed.sql b/dbt/include/athena/macros/materializations/seeds/helpers.sql similarity index 99% rename from dbt/include/athena/macros/materializations/seed.sql rename to dbt/include/athena/macros/materializations/seeds/helpers.sql index c1cb4afe..bbc0e0e0 100644 --- a/dbt/include/athena/macros/materializations/seed.sql +++ b/dbt/include/athena/macros/materializations/seeds/helpers.sql @@ -1,4 +1,3 @@ - {% macro default__reset_csv_table(model, full_refresh, old_relation, agate_table) %} {% set sql = "" %} -- No truncate in Athena so always drop CSV table and recreate diff --git a/dbt/include/athena/macros/materializations/snapshot.sql b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql similarity index 99% rename from dbt/include/athena/macros/materializations/snapshot.sql rename to dbt/include/athena/macros/materializations/snapshots/snapshot.sql index f1464d12..bd384ab4 100644 --- a/dbt/include/athena/macros/materializations/snapshot.sql +++ b/dbt/include/athena/macros/materializations/snapshots/snapshot.sql @@ -1,4 +1,3 @@ - {% materialization snapshot, adapter='athena' -%} {{ exceptions.raise_not_implemented( 'snapshot materialization not implemented for '+adapter.type())