Skip to content

Commit

Permalink
fix NPE while get_columns_in_relation
Browse files Browse the repository at this point in the history
  • Loading branch information
TalkWIthKeyboard committed May 14, 2022
1 parent e830b24 commit 2672057
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions dbt/adapters/spark/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,14 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
None,
)

updated_relation = self._set_relation_information(cached_relation)
return self._get_spark_columns(updated_relation)
if not cached_relation:
updated_relation = self.cache.add(self._get_updated_relation(relation))
else:
updated_relation = self._set_relation_information(relation)

def _set_relation_information(
self, relation: SparkRelation
) -> SparkRelation:
"""Update the information of the relation, or return it if it already exists."""
if relation.has_information():
return relation
return self._get_spark_columns(updated_relation)

def _get_updated_relation(self, relation: BaseRelation) -> SparkRelation:
metadata = None
columns = []

Expand All @@ -282,8 +280,8 @@ def _set_relation_information(
# CDW would just return and empty list, normalizing the behavior here
errmsg = getattr(e, "msg", "")
if (
"Table or view not found" in errmsg or
"NoSuchTableException" in errmsg
"Table or view not found" in errmsg or
"NoSuchTableException" in errmsg
):
pass
else:
Expand All @@ -294,7 +292,7 @@ def _set_relation_information(
if x.name not in self.HUDI_METADATA_COLUMNS]

provider = metadata[KEY_TABLE_PROVIDER]
new_relation = self.Relation.create(
return self.Relation.create(
database=None,
schema=relation.schema,
identifier=relation.identifier,
Expand All @@ -306,8 +304,15 @@ def _set_relation_information(
columns={x.column: x.dtype for x in columns}
)

self.cache.upsert_relation(new_relation)
return new_relation
def _set_relation_information(self, relation: SparkRelation) -> SparkRelation:
"""Update the information of the relation, or return it if it already exists."""
if relation.has_information():
return relation

updated_relation = self._get_updated_relation(relation)

self.cache.upsert_relation(updated_relation)
return updated_relation

@staticmethod
def _get_spark_columns(
Expand Down

0 comments on commit 2672057

Please sign in to comment.