Skip to content

Commit

Permalink
Merge pull request #1 from ryanClift-sd/schema_query_adjustment_show_tables_and_views
Browse files Browse the repository at this point in the history

Changes to the metadata query gathering for spark
  • Loading branch information
ryanClift-sd authored Mar 21, 2022
2 parents d7f1d38 + f827413 commit baebd6c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
58 changes: 37 additions & 21 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation'
LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching'
LIST_TABLES_MACRO_NAME = 'spark__list_tables_without_caching'
LIST_VIEWS_MACRO_NAME = 'spark__list_views_without_caching'
DROP_RELATION_MACRO_NAME = 'drop_relation'
FETCH_TBL_PROPERTIES_MACRO_NAME = 'fetch_tbl_properties'

Expand Down Expand Up @@ -126,10 +127,14 @@ def add_schema_to_cache(self, schema) -> str:
def list_relations_without_caching(
self, schema_relation: SparkRelation
) -> List[SparkRelation]:
kwargs = {'schema_relation': schema_relation}
kwargs = {'relation': schema_relation}
try:
results = self.execute_macro(
LIST_RELATIONS_MACRO_NAME,
tables = self.execute_macro(
LIST_TABLES_MACRO_NAME,
kwargs=kwargs
)
views = self.execute_macro(
LIST_VIEWS_MACRO_NAME,
kwargs=kwargs
)
except dbt.exceptions.RuntimeException as e:
Expand All @@ -140,29 +145,40 @@ def list_relations_without_caching(
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation}: {e.msg}")
return []



relations = []
for row in results:
if len(row) != 4:
raise dbt.exceptions.RuntimeException(
f'Invalid value from "show table extended ...", '
f'got {len(row)} values, expected 4'
)
_schema, name, _, information = row
rel_type = RelationType.View \
if 'Type: VIEW' in information else RelationType.Table
is_delta = 'Provider: delta' in information
is_hudi = 'Provider: hudi' in information
for tbl in tables:
rel_type = ('view' if tbl['tableName'] in views.columns["viewName"].values() else 'table')
relation = self.Relation.create(
schema=_schema,
identifier=name,
schema=tbl['database'],
identifier=tbl['tableName'],
type=rel_type,
information=information,
is_delta=is_delta,
is_hudi=is_hudi,
)
relations.append(relation)

# relations = []
# for row in results:
# if len(row) != 4:
# raise dbt.exceptions.RuntimeException(
# f'Invalid value from "show table extended ...", '
# f'got {len(row)} values, expected 4'
# )
# _schema, name, _, information = row
# rel_type = RelationType.View \
# if 'Type: VIEW' in information else RelationType.Table
# is_delta = 'Provider: delta' in information
# is_hudi = 'Provider: hudi' in information
# relation = self.Relation.create(
# schema=_schema,
# identifier=name,
# type=rel_type,
# information=information,
# is_delta=is_delta,
# is_hudi=is_hudi,
# )
# relations.append(relation)

return relations

def get_relation(
Expand Down
16 changes: 12 additions & 4 deletions dbt/include/spark/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,20 @@
{% do return(load_result('get_columns_in_relation').table) %}
{% endmacro %}

{% macro spark__list_relations_without_caching(relation) %}
{% call statement('list_relations_without_caching', fetch_result=True) -%}
show table extended in {{ relation }} like '*'
{% macro spark__list_tables_without_caching(relation) %}
{% call statement('list_tables_without_caching', fetch_result=True) -%}
show tables in {{ relation.schema }}
{% endcall %}

{% do return(load_result('list_relations_without_caching').table) %}
{% do return(load_result('list_tables_without_caching').table) %}
{% endmacro %}

{# List all views in the schema of the given relation, bypassing dbt's
   relation cache. Runs Spark SQL `SHOW VIEWS IN <schema>` and returns the
   result as an agate table (columns include `viewName`); consumed by the
   adapter's list_relations_without_caching to classify relations as views
   vs. tables. NOTE(review): `SHOW VIEWS` requires Spark 3.x — confirm the
   minimum supported Spark version for this adapter. #}
{% macro spark__list_views_without_caching(relation) %}
  {# The statement name doubles as the load_result() key below — keep them in sync. #}
  {% call statement('list_views_without_caching', fetch_result=True) -%}
    show views in {{ relation.schema }}
  {% endcall %}

  {% do return(load_result('list_views_without_caching').table) %}
{% endmacro %}

{% macro spark__list_schemas(database) -%}
Expand Down

0 comments on commit baebd6c

Please sign in to comment.