diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index 74845422b..2ea774a05 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -23,5 +23,6 @@
 GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation'
 LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
-LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching'
+LIST_TABLES_MACRO_NAME = 'spark__list_tables_without_caching'
+LIST_VIEWS_MACRO_NAME = 'spark__list_views_without_caching'
 DROP_RELATION_MACRO_NAME = 'drop_relation'
 FETCH_TBL_PROPERTIES_MACRO_NAME = 'fetch_tbl_properties'
@@ -126,10 +127,14 @@ def add_schema_to_cache(self, schema) -> str:
     def list_relations_without_caching(
         self, schema_relation: SparkRelation
     ) -> List[SparkRelation]:
-        kwargs = {'schema_relation': schema_relation}
+        kwargs = {'relation': schema_relation}
         try:
-            results = self.execute_macro(
-                LIST_RELATIONS_MACRO_NAME,
+            tables = self.execute_macro(
+                LIST_TABLES_MACRO_NAME,
+                kwargs=kwargs
+            )
+            views = self.execute_macro(
+                LIST_VIEWS_MACRO_NAME,
                 kwargs=kwargs
             )
         except dbt.exceptions.RuntimeException as e:
@@ -140,29 +145,26 @@ def list_relations_without_caching(
             description = "Error while retrieving information about"
             logger.debug(f"{description} {schema_relation}: {e.msg}")
             return []
-        
+
+        # 'show tables' also lists views; collect the view names once into a
+        # set so classifying each row below is O(1) instead of rescanning
+        # the agate column per table.
+        view_names = set(views.columns['viewName'].values())
+
         relations = []
-        for row in results:
-            if len(row) != 4:
-                raise dbt.exceptions.RuntimeException(
-                    f'Invalid value from "show table extended ...", '
-                    f'got {len(row)} values, expected 4'
-                )
-            _schema, name, _, information = row
-            rel_type = RelationType.View \
-                if 'Type: VIEW' in information else RelationType.Table
-            is_delta = 'Provider: delta' in information
-            is_hudi = 'Provider: hudi' in information
+        for tbl in tables:
+            rel_type = (RelationType.View
+                        if tbl['tableName'] in view_names
+                        else RelationType.Table)
+            # NOTE(review): 'show tables' exposes no provider metadata, so
+            # information/is_delta/is_hudi are no longer populated here;
+            # confirm no caller depends on them before shipping this change.
             relation = self.Relation.create(
-                schema=_schema,
-                identifier=name,
+                schema=tbl['database'],
+                identifier=tbl['tableName'],
                 type=rel_type,
-                information=information,
-                is_delta=is_delta,
-                is_hudi=is_hudi,
             )
             relations.append(relation)
 
         return relations
 
     def get_relation(
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index 2542af811..e2de76b36 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -141,12 +141,20 @@
   {% do return(load_result('get_columns_in_relation').table) %}
 {% endmacro %}
 
-{% macro spark__list_relations_without_caching(relation) %}
-  {% call statement('list_relations_without_caching', fetch_result=True) -%}
-    show table extended in {{ relation }} like '*'
+{% macro spark__list_tables_without_caching(relation) %}
+  {% call statement('list_tables_without_caching', fetch_result=True) -%}
+    show tables in {{ relation.schema }}
   {% endcall %}
 
-  {% do return(load_result('list_relations_without_caching').table) %}
+  {% do return(load_result('list_tables_without_caching').table) %}
+{% endmacro %}
+
+{% macro spark__list_views_without_caching(relation) %}
+  {% call statement('list_views_without_caching', fetch_result=True) -%}
+    show views in {{ relation.schema }}
+  {% endcall %}
+
+  {% do return(load_result('list_views_without_caching').table) %}
 {% endmacro %}
 
 {% macro spark__list_schemas(database) -%}