Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy load of relation metadata in get relation #1152

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 60 additions & 78 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
Union,
Type,
Tuple,
Callable,
Set,
FrozenSet,
TYPE_CHECKING,
Expand Down Expand Up @@ -171,15 +170,47 @@ def _get_relation_information(self, row: "agate.Row") -> RelationInfo:

return _schema, name, information

def _get_relation_information_using_describe(self, row: "agate.Row") -> RelationInfo:
"""Relation info fetched using SHOW TABLES and an auxiliary DESCRIBE statement"""
def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]:
    """Distinct Spark compute engines may not support the same SQL featureset. Thus, we must
    try different methods to fetch relation information.

    Relations are listed "lazily": only schema and identifier come back from
    SHOW TABLES here; detailed metadata (provider, view-vs-table type) is left
    empty and is resolved later, per relation, by set_relation_information().

    :param schema_relation: relation identifying the schema to list.
    :returns: the relations found in the schema, or [] when listing fails.
    """
    relations: List[BaseRelation] = []
    kwargs = {"schema_relation": schema_relation}
    try:
        # Iceberg behavior: 3-column result of relations obtained
        # (database, tableName, isTemporary).
        show_table_rows = self.execute_macro(
            LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs
        )
        for row in show_table_rows:
            try:
                _schema, name, _ = row
            except ValueError:
                # Guard against engines that return an unexpected column count.
                raise DbtRuntimeError(
                    f'Invalid value from "show tables ...", got {len(row)} values, expected 3'
                )
            # Metadata is deferred: with empty information every provider flag
            # below is False and the type defaults to Table until the relation
            # is lazily refreshed via set_relation_information().
            information = ""

            rel_type: RelationType = (
                RelationType.View if "Type: VIEW" in information else RelationType.Table
            )
            is_delta: bool = "Provider: delta" in information
            is_hudi: bool = "Provider: hudi" in information
            is_iceberg: bool = "Provider: iceberg" in information

            relation: BaseRelation = self.Relation.create(
                schema=_schema,
                identifier=name,
                type=rel_type,
                information=information,
                is_delta=is_delta,
                is_iceberg=is_iceberg,
                is_hudi=is_hudi,
            )
            relations.append(relation)
        return relations
    except DbtRuntimeError as e:
        description = "Error while retrieving information about"
        logger.debug(f"{description} {schema_relation}: {e.msg}")
        return []

def _get_relation_information_using_describe(self, relation: BaseRelation) -> RelationInfo:
"""Relation info fetched using SHOW TABLES and an auxiliary DESCRIBE statement"""
_schema = relation.schema
name = relation.identifier
table_name = f"{_schema}.{name}"
try:
table_results = self.execute_macro(
Expand All @@ -194,84 +225,35 @@ def _get_relation_information_using_describe(self, row: "agate.Row") -> Relation
info_type, info_value, _ = info_row
if not info_type.startswith("#"):
information += f"{info_type}: {info_value}\n"

return _schema, name, information

def _build_spark_relation_list(
    self,
    row_list: "agate.Table",
    relation_info_func: Callable[["agate.Row"], RelationInfo],
) -> List[BaseRelation]:
    """Aggregate relations with format metadata included.

    Each row is converted through relation_info_func into
    (schema, identifier, information), and the free-form information text is
    scanned for the relation type and storage-format markers.
    """

    def _to_relation(raw_row: "agate.Row") -> BaseRelation:
        schema_name, identifier, info_text = relation_info_func(raw_row)
        # "Type: VIEW" in the metadata marks a view; everything else is a table.
        detected_type: RelationType = RelationType.Table
        if "Type: VIEW" in info_text:
            detected_type = RelationType.View
        return self.Relation.create(
            schema=schema_name,
            identifier=identifier,
            type=detected_type,
            information=info_text,
            is_delta="Provider: delta" in info_text,
            is_iceberg="Provider: iceberg" in info_text,
            is_hudi="Provider: hudi" in info_text,
        )

    return [_to_relation(raw_row) for raw_row in row_list]

def list_relations_without_caching(self, schema_relation: BaseRelation) -> List[BaseRelation]:
    """Distinct Spark compute engines may not support the same SQL featureset. Thus, we must
    try different methods to fetch relation information.

    Tries SHOW TABLES EXTENDED first; on engines that reject it for v2 tables
    (e.g. spark-iceberg), falls back to SHOW TABLES plus per-table DESCRIBE.
    Returns [] when the schema does not exist or listing fails entirely.
    """

    kwargs = {"schema_relation": schema_relation}

    try:
        # Default compute engine behavior: show tables extended
        show_table_extended_rows = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
        return self._build_spark_relation_list(
            row_list=show_table_extended_rows,
            relation_info_func=self._get_relation_information,
        )
    except DbtRuntimeError as e:
        # Dispatch on the error message text — the engine signals capability
        # gaps and missing schemas only through the message string.
        errmsg = getattr(e, "msg", "")
        if f"Database '{schema_relation}' not found" in errmsg:
            # Missing schema: treat as "no relations" rather than an error.
            return []
        # Iceberg compute engine behavior: show table
        elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg:
            # this happens with spark-iceberg with v2 iceberg tables
            # https://issues.apache.org/jira/browse/SPARK-33393
            try:
                # Iceberg behavior: 3-row result of relations obtained
                show_table_rows = self.execute_macro(
                    LIST_RELATIONS_SHOW_TABLES_MACRO_NAME, kwargs=kwargs
                )
                return self._build_spark_relation_list(
                    row_list=show_table_rows,
                    relation_info_func=self._get_relation_information_using_describe,
                )
            except DbtRuntimeError as e:
                # Fallback also failed — log at debug level and report no relations.
                description = "Error while retrieving information about"
                logger.debug(f"{description} {schema_relation}: {e.msg}")
                return []
        else:
            # Unrecognized failure: best-effort, report no relations.
            logger.debug(
                f"Error while retrieving information about {schema_relation}: {errmsg}"
            )
            return []
def set_relation_information(self, relation: BaseRelation) -> BaseRelation:
    """Return *relation* with its metadata populated.

    A relation that already carries `information` is returned untouched;
    otherwise the metadata is fetched via a DESCRIBE statement and a fresh
    relation object is built from it (lazy loading).
    """
    if relation.information:
        # Metadata already present — nothing to fetch.
        return relation

    schema_name, identifier, info_text = self._get_relation_information_using_describe(relation)

    # "Type: VIEW" in the metadata marks a view; everything else is a table.
    if "Type: VIEW" in info_text:
        resolved_type: RelationType = RelationType.View
    else:
        resolved_type = RelationType.Table

    return self.Relation.create(
        schema=schema_name,
        identifier=identifier,
        type=resolved_type,
        information=info_text,
        is_delta="Provider: delta" in info_text,
        is_iceberg="Provider: iceberg" in info_text,
        is_hudi="Provider: hudi" in info_text,
    )

def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]:
    """Look up a single relation and lazily attach its metadata.

    :param database: forced to None when the adapter's default include policy
        excludes databases (Spark is two-level: schema.identifier).
    :param schema: schema (database in Spark terms) to search.
    :param identifier: relation name.
    :returns: the relation with metadata populated, or None if not found.
    """
    if not self.Relation.get_default_include_policy().database:
        database = None  # type: ignore

    # The merged diff left a bare `return super().get_relation(...)` above the
    # lazy-load path, making it unreachable; keep only the lazy-load version.
    relation = super().get_relation(database, schema, identifier)
    # Populate DESCRIBE-based metadata on demand; a miss stays None.
    return self.set_relation_information(relation) if relation else None

def parse_describe_extended(
self, relation: BaseRelation, raw_rows: AttrDict
Expand Down
Loading