[Backport] #204, #207 (#209)
* fixed get_columns_in_relation for open source delta table (#207)

* fixed get_columns_in_relation for open source delta table

* fixed E501 linting error and added change log

* fix issue parsing structs (#204)

* fix issue parsing structs

* include contributor in changelog

* better error explanation

Co-authored-by: Jeremy Cohen <jeremy@fishtownanalytics.com>

Co-authored-by: Hariharan Banukumar <hariharan.banukumar@gmail.com>
Co-authored-by: Sergio <ingscc00@gmail.com>
3 people authored Aug 23, 2021
1 parent e46e4e2 commit c74b4f3
Showing 3 changed files with 102 additions and 8 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,15 @@
## dbt-spark 0.21.0 (Release TBD)

### Fixes
- Enhanced the `get_columns_in_relation` method to handle a bug in open-source Delta Lake, which doesn't return schema details in the output of the `show table extended in databasename like '*'` query. This impacts dbt snapshots when the file format is open-source Delta Lake ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
- Parse columns correctly when a table contains struct fields, so that inner struct fields are not treated as top-level columns ([#202](https://github.com/dbt-labs/dbt-spark/issues/202))

### Contributors
- [@harryharanb](https://github.com/harryharanb) ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
- [@SCouto](https://github.com/Scouto) ([#204](https://github.com/dbt-labs/dbt-spark/pull/204))

## dbt-spark 0.21.0b2 (August 20, 2021)

### Fixes
- Add pyodbc import error message to dbt.exceptions.RuntimeException to get more detailed information when running `dbt debug` ([#192](https://github.com/dbt-labs/dbt-spark/pull/192))
- Add support for ODBC Server Side Parameters, allowing options that need to be set with the `SET` statement to be used ([#201](https://github.com/dbt-labs/dbt-spark/pull/201))
@@ -22,6 +32,7 @@
### Contributors
- [@ali-tny](https://github.com/ali-tny) ([#197](https://github.com/fishtown-analytics/dbt-spark/pull/197))


## dbt-spark 0.20.0 (July 12, 2021)

## dbt-spark 0.20.0rc2 (July 7, 2021)
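The first changelog entry above describes the behavior change in `get_columns_in_relation`: when the cached `show table extended` output carries no schema details (as happens with open-source Delta Lake), the adapter falls back to a `describe extended` query. A minimal sketch of that control flow, with a simplified stand-in parser and a placeholder fallback value rather than the adapter's real helpers:

```python
from typing import List, Optional


def parse_schema_tree(information: str) -> List[str]:
    # Simplified stand-in for parsing the " |-- name: type (nullable = ...)" tree
    return [
        line.split(":")[0].strip(" |-")
        for line in information.splitlines()
        if line.startswith(" |-- ")
    ]


def get_columns(cached_information: Optional[str]) -> List[str]:
    columns = parse_schema_tree(cached_information) if cached_information else []
    if not columns:
        # Open-source Delta Lake: `show table extended ... like '*'` returned no
        # schema details, so fall back to a `describe extended <table>` query.
        columns = ["<columns parsed from describe extended>"]
    return columns


print(get_columns(" |-- id: bigint (nullable = true)\n |-- name: string (nullable = true)"))
# ['id', 'name']
print(get_columns(None))
# ['<columns parsed from describe extended>']
```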
13 changes: 9 additions & 4 deletions dbt/adapters/spark/impl.py
@@ -64,7 +64,7 @@ class SparkAdapter(SQLAdapter):
'stats:rows:include',
)
INFORMATION_COLUMNS_REGEX = re.compile(
r"\|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
r"^ \|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
INFORMATION_STATISTICS_REGEX = re.compile(
r"^Statistics: (.*)$", re.MULTILINE)
@@ -212,11 +212,16 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
for cached_relation in cached_relations
if str(cached_relation) == str(relation)),
None)
if cached_relation is None or cached_relation.information is None:
columns = []
if cached_relation and cached_relation.information:
columns = self.parse_columns_from_information(cached_relation)
if not columns:
# in open source delta, 'show table extended' query output doesn't
# return the relation's schema. If columns are empty from cache,
# use the get_columns_in_relation spark macro,
# which would execute a 'describe extended tablename' query
rows: List[agate.Row] = super().get_columns_in_relation(relation)
columns = self.parse_describe_extended(relation, rows)
else:
columns = self.parse_columns_from_information(cached_relation)
return columns

def parse_columns_from_information(
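The regex change in the first hunk above is the heart of the struct fix (#204): anchoring `INFORMATION_COLUMNS_REGEX` to the start of the line makes it match only top-level columns, so nested struct fields no longer leak in as separate columns. A quick illustration against the same schema-tree format used in the tests below (a sketch for comparison, not code from the adapter):

```python
import re

information = (
    " |-- col1: decimal(22,0) (nullable = true)\n"
    " |-- struct_col: struct (nullable = true)\n"
    " | |-- struct_inner_col: string (nullable = true)\n"
)

old_regex = re.compile(r"\|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
new_regex = re.compile(r"^ \|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)

print([name for name, _, _ in old_regex.findall(information)])
# ['col1', 'struct_col', 'struct_inner_col']  <- inner struct field leaks in
print([name for name, _, _ in new_regex.findall(information)])
# ['col1', 'struct_col']                      <- only top-level columns
```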
86 changes: 82 additions & 4 deletions test/unit/test_adapter.py
@@ -275,6 +275,7 @@ def test_parse_relation(self):
('col1', 'decimal(22,0)'),
('col2', 'string',),
('dt', 'date'),
('struct_col', 'struct<struct_inner_col:string>'),
('# Partition Information', 'data_type'),
('# col_name', 'data_type'),
('dt', 'date'),
@@ -299,7 +300,7 @@
config = self._get_target_http(self.project_cfg)
rows = SparkAdapter(config).parse_describe_extended(
relation, input_cols)
self.assertEqual(len(rows), 3)
self.assertEqual(len(rows), 4)
self.assertEqual(rows[0].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
@@ -342,6 +343,20 @@ def test_parse_relation(self):
'char_size': None
})

self.assertEqual(rows[3].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'struct_col',
'column_index': 3,
'dtype': 'struct<struct_inner_col:string>',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None
})

def test_parse_relation_with_integer_owner(self):
self.maxDiff = None
rel_type = SparkRelation.get_relation_type.Table
@@ -507,6 +522,8 @@ def test_parse_columns_from_information_with_table_type_and_delta_provider(self)
" |-- col1: decimal(22,0) (nullable = true)\n"
" |-- col2: string (nullable = true)\n"
" |-- dt: date (nullable = true)\n"
" |-- struct_col: struct (nullable = true)\n"
" | |-- struct_inner_col: string (nullable = true)\n"
)
relation = SparkRelation.create(
schema='default_schema',
@@ -518,7 +535,7 @@
config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(
relation)
self.assertEqual(len(columns), 3)
self.assertEqual(len(columns), 4)
self.assertEqual(columns[0].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
Expand All @@ -538,6 +555,25 @@ def test_parse_columns_from_information_with_table_type_and_delta_provider(self)
'stats:bytes:value': 123456789,
})

self.assertEqual(columns[3].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'struct_col',
'column_index': 3,
'dtype': 'struct',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None,

'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 123456789,
})

def test_parse_columns_from_information_with_view_type(self):
self.maxDiff = None
rel_type = SparkRelation.get_relation_type.View
@@ -571,6 +607,8 @@ def test_parse_columns_from_information_with_view_type(self):
" |-- col1: decimal(22,0) (nullable = true)\n"
" |-- col2: string (nullable = true)\n"
" |-- dt: date (nullable = true)\n"
" |-- struct_col: struct (nullable = true)\n"
" | |-- struct_inner_col: string (nullable = true)\n"
)
relation = SparkRelation.create(
schema='default_schema',
@@ -582,7 +620,7 @@
config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(
relation)
self.assertEqual(len(columns), 3)
self.assertEqual(len(columns), 4)
self.assertEqual(columns[1].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
Expand All @@ -597,6 +635,20 @@ def test_parse_columns_from_information_with_view_type(self):
'char_size': None
})

self.assertEqual(columns[3].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'struct_col',
'column_index': 3,
'dtype': 'struct',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None
})

def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
self.maxDiff = None
rel_type = SparkRelation.get_relation_type.Table
Expand All @@ -619,6 +671,8 @@ def test_parse_columns_from_information_with_table_type_and_parquet_provider(sel
" |-- col1: decimal(22,0) (nullable = true)\n"
" |-- col2: string (nullable = true)\n"
" |-- dt: date (nullable = true)\n"
" |-- struct_col: struct (nullable = true)\n"
" | |-- struct_inner_col: string (nullable = true)\n"
)
relation = SparkRelation.create(
schema='default_schema',
@@ -630,7 +684,7 @@
config = self._get_target_http(self.project_cfg)
columns = SparkAdapter(config).parse_columns_from_information(
relation)
self.assertEqual(len(columns), 3)
self.assertEqual(len(columns), 4)
self.assertEqual(columns[2].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
@@ -655,3 +709,27 @@
'stats:rows:value': 12345678
})

self.assertEqual(columns[3].to_column_dict(omit_none=False), {
'table_database': None,
'table_schema': relation.schema,
'table_name': relation.name,
'table_type': rel_type,
'table_owner': 'root',
'column': 'struct_col',
'column_index': 3,
'dtype': 'struct',
'numeric_scale': None,
'numeric_precision': None,
'char_size': None,

'stats:bytes:description': '',
'stats:bytes:include': True,
'stats:bytes:label': 'bytes',
'stats:bytes:value': 1234567890,

'stats:rows:description': '',
'stats:rows:include': True,
'stats:rows:label': 'rows',
'stats:rows:value': 12345678
})
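One detail visible in the expected values above: the two parsing paths report the struct column's dtype differently. `parse_describe_extended` sees the full nested type string returned by `describe extended`, while `parse_columns_from_information` only sees the top-level `struct` label in the schema tree. Summarized from the assertions above:

```python
# dtype asserted for the same struct_col in the tests above
dtype_from_describe_extended = 'struct<struct_inner_col:string>'  # full nested type
dtype_from_information = 'struct'                                 # top-level label only
assert dtype_from_describe_extended != dtype_from_information
```

These cases can be run in isolation with, for example, `python -m pytest test/unit/test_adapter.py -k parse` (assuming pytest is available in the development environment).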
