From c74b4f3d87d871c3128caae061e7c2b16376f896 Mon Sep 17 00:00:00 2001
From: Jeremy Cohen
Date: Mon, 23 Aug 2021 14:58:43 +0200
Subject: [PATCH] [Backport] #204, #207 (#209)

* fixed get_columns_in_relation for open source delta table (#207)
* fixed get_columns_in_relation for open source delta table
* fixed E501 linting error and added change log
* fix issue parsing structs (#204)
* fix issue parsing structs
* include contributor in changelog
* better error explanation

Co-authored-by: Jeremy Cohen
Co-authored-by: Hariharan Banukumar
Co-authored-by: Sergio
---
 CHANGELOG.md               | 11 +++++
 dbt/adapters/spark/impl.py | 13 ++++--
 test/unit/test_adapter.py  | 86 ++++++++++++++++++++++++++++++++++++--
 3 files changed, 102 insertions(+), 8 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 011a8f2ef..c6f8c272f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,15 @@
 ## dbt-spark 0.21.0 (Release TBD)
 
+### Fixes
+- Enhance `get_columns_in_relation` to handle a bug in open source Delta Lake, which doesn't return schema details in the output of the `show table extended in databasename like '*'` query. This impacts dbt snapshots if the file format is open source Delta Lake ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
+- Properly parse columns when there are struct fields, so that inner struct fields are not picked up as top-level columns: Issue ([#202](https://github.com/dbt-labs/dbt-spark/issues/202))
+
+### Contributors
+- [@harryharanb](https://github.com/harryharanb) ([#207](https://github.com/dbt-labs/dbt-spark/pull/207))
+- [@SCouto](https://github.com/Scouto) ([#204](https://github.com/dbt-labs/dbt-spark/pull/204))
+
+## dbt-spark 0.21.0b2 (August 20, 2021)
+
 ### Fixes
 - Add pyodbc import error message to dbt.exceptions.RuntimeException to get more detailed information when running `dbt debug` ([#192](https://github.com/dbt-labs/dbt-spark/pull/192))
 - Add support for ODBC Server Side Parameters, allowing options that need to be set with the `SET` statement to be used ([#201](https://github.com/dbt-labs/dbt-spark/pull/201))
@@ -22,6 +32,7 @@
 ### Contributors
 - [@ali-tny](https://github.com/ali-tny) ([#197](https://github.com/fishtown-analytics/dbt-spark/pull/197))
 
+
 ## dbt-spark 0.20.0 (July 12, 2021)
 
 ## dbt-spark 0.20.0rc2 (July 7, 2021)
diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index f8e72449a..6acbcd446 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -64,7 +64,7 @@ class SparkAdapter(SQLAdapter):
         'stats:rows:include',
     )
     INFORMATION_COLUMNS_REGEX = re.compile(
-        r"\|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
+        r"^ \|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)
     INFORMATION_OWNER_REGEX = re.compile(r"^Owner: (.*)$", re.MULTILINE)
     INFORMATION_STATISTICS_REGEX = re.compile(
         r"^Statistics: (.*)$", re.MULTILINE)
@@ -212,11 +212,16 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
         cached_relation = next((cached_relation
                                 for cached_relation in cached_relations
                                 if str(cached_relation) == str(relation)), None)
-        if cached_relation is None or cached_relation.information is None:
+        columns = []
+        if cached_relation and cached_relation.information:
+            columns = self.parse_columns_from_information(cached_relation)
+        if not columns:
+            # in open source delta, 'show table extended' query output doesn't
+            # return the relation's schema. if columns are empty from the cache,
+            # use the get_columns_in_relation spark macro,
+            # which executes a 'describe extended tablename' query
             rows: List[agate.Row] = super().get_columns_in_relation(relation)
             columns = self.parse_describe_extended(relation, rows)
-        else:
-            columns = self.parse_columns_from_information(cached_relation)
         return columns
 
     def parse_columns_from_information(
diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py
index ddfbeddb2..f87a89b2b 100644
--- a/test/unit/test_adapter.py
+++ b/test/unit/test_adapter.py
@@ -275,6 +275,7 @@ def test_parse_relation(self):
             ('col1', 'decimal(22,0)'),
             ('col2', 'string',),
             ('dt', 'date'),
+            ('struct_col', 'struct'),
             ('# Partition Information', 'data_type'),
             ('# col_name', 'data_type'),
             ('dt', 'date'),
@@ -299,7 +300,7 @@ def test_parse_relation(self):
         config = self._get_target_http(self.project_cfg)
         rows = SparkAdapter(config).parse_describe_extended(
             relation, input_cols)
-        self.assertEqual(len(rows), 3)
+        self.assertEqual(len(rows), 4)
         self.assertEqual(rows[0].to_column_dict(omit_none=False), {
             'table_database': None,
             'table_schema': relation.schema,
@@ -342,6 +343,20 @@ def test_parse_relation(self):
             'char_size': None
         })
 
+        self.assertEqual(rows[3].to_column_dict(omit_none=False), {
+            'table_database': None,
+            'table_schema': relation.schema,
+            'table_name': relation.name,
+            'table_type': rel_type,
+            'table_owner': 'root',
+            'column': 'struct_col',
+            'column_index': 3,
+            'dtype': 'struct',
+            'numeric_scale': None,
+            'numeric_precision': None,
+            'char_size': None
+        })
+
     def test_parse_relation_with_integer_owner(self):
         self.maxDiff = None
         rel_type = SparkRelation.get_relation_type.Table
@@ -507,6 +522,8 @@ def test_parse_columns_from_information_with_table_type_and_delta_provider(self)
             " |-- col1: decimal(22,0) (nullable = true)\n"
             " |-- col2: string (nullable = true)\n"
             " |-- dt: date (nullable = true)\n"
+            " |-- struct_col: struct (nullable = true)\n"
+            " |    |-- struct_inner_col: string (nullable = true)\n"
         )
         relation = SparkRelation.create(
             schema='default_schema',
@@ -518,7 +535,7 @@ def test_parse_columns_from_information_with_table_type_and_delta_provider(self)
         config = self._get_target_http(self.project_cfg)
         columns = SparkAdapter(config).parse_columns_from_information(
             relation)
-        self.assertEqual(len(columns), 3)
+        self.assertEqual(len(columns), 4)
         self.assertEqual(columns[0].to_column_dict(omit_none=False), {
             'table_database': None,
             'table_schema': relation.schema,
@@ -538,6 +555,25 @@ def test_parse_columns_from_information_with_table_type_and_delta_provider(self)
             'stats:bytes:value': 123456789,
         })
 
+        self.assertEqual(columns[3].to_column_dict(omit_none=False), {
+            'table_database': None,
+            'table_schema': relation.schema,
+            'table_name': relation.name,
+            'table_type': rel_type,
+            'table_owner': 'root',
+            'column': 'struct_col',
+            'column_index': 3,
+            'dtype': 'struct',
+            'numeric_scale': None,
+            'numeric_precision': None,
+            'char_size': None,
+
+            'stats:bytes:description': '',
+            'stats:bytes:include': True,
+            'stats:bytes:label': 'bytes',
+            'stats:bytes:value': 123456789,
+        })
+
     def test_parse_columns_from_information_with_view_type(self):
         self.maxDiff = None
         rel_type = SparkRelation.get_relation_type.View
@@ -571,6 +607,8 @@ def test_parse_columns_from_information_with_view_type(self):
             " |-- col1: decimal(22,0) (nullable = true)\n"
             " |-- col2: string (nullable = true)\n"
             " |-- dt: date (nullable = true)\n"
+            " |-- struct_col: struct (nullable = true)\n"
+            " |    |-- struct_inner_col: string (nullable = true)\n"
         )
         relation = SparkRelation.create(
             schema='default_schema',
@@ -582,7 +620,7 @@ def test_parse_columns_from_information_with_view_type(self):
         config = self._get_target_http(self.project_cfg)
         columns = SparkAdapter(config).parse_columns_from_information(
             relation)
-        self.assertEqual(len(columns), 3)
+        self.assertEqual(len(columns), 4)
         self.assertEqual(columns[1].to_column_dict(omit_none=False), {
             'table_database': None,
             'table_schema': relation.schema,
@@ -597,6 +635,20 @@ def test_parse_columns_from_information_with_view_type(self):
             'char_size': None
         })
 
+        self.assertEqual(columns[3].to_column_dict(omit_none=False), {
+            'table_database': None,
+            'table_schema': relation.schema,
+            'table_name': relation.name,
+            'table_type': rel_type,
+            'table_owner': 'root',
+            'column': 'struct_col',
+            'column_index': 3,
+            'dtype': 'struct',
+            'numeric_scale': None,
+            'numeric_precision': None,
+            'char_size': None
+        })
+
     def test_parse_columns_from_information_with_table_type_and_parquet_provider(self):
         self.maxDiff = None
         rel_type = SparkRelation.get_relation_type.Table
@@ -619,6 +671,8 @@ def test_parse_columns_from_information_with_table_type_and_parquet_provider(sel
             " |-- col1: decimal(22,0) (nullable = true)\n"
             " |-- col2: string (nullable = true)\n"
             " |-- dt: date (nullable = true)\n"
+            " |-- struct_col: struct (nullable = true)\n"
+            " |    |-- struct_inner_col: string (nullable = true)\n"
         )
         relation = SparkRelation.create(
             schema='default_schema',
@@ -630,7 +684,7 @@ def test_parse_columns_from_information_with_table_type_and_parquet_provider(sel
         config = self._get_target_http(self.project_cfg)
         columns = SparkAdapter(config).parse_columns_from_information(
             relation)
-        self.assertEqual(len(columns), 3)
+        self.assertEqual(len(columns), 4)
         self.assertEqual(columns[2].to_column_dict(omit_none=False), {
             'table_database': None,
             'table_schema': relation.schema,
@@ -655,3 +709,27 @@ def test_parse_columns_from_information_with_table_type_and_parquet_provider(sel
             'stats:rows:value': 12345678
         })
 
+        self.assertEqual(columns[3].to_column_dict(omit_none=False), {
+            'table_database': None,
+            'table_schema': relation.schema,
+            'table_name': relation.name,
+            'table_type': rel_type,
+            'table_owner': 'root',
+            'column': 'struct_col',
+            'column_index': 3,
+            'dtype': 'struct',
+            'numeric_scale': None,
+            'numeric_precision': None,
+            'char_size': None,
+
+            'stats:bytes:description': '',
+            'stats:bytes:include': True,
+            'stats:bytes:label': 'bytes',
+            'stats:bytes:value': 1234567890,
+
+            'stats:rows:description': '',
+            'stats:rows:include': True,
+            'stats:rows:label': 'rows',
+            'stats:rows:value': 12345678
+        })
+
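
The core of the #204 fix is the leading "^ \|-- " anchor added to INFORMATION_COLUMNS_REGEX: in the schema section of `show table extended` output, nested struct fields are indented one level deeper than top-level columns, so anchoring the match to lines that begin with " |-- " keeps inner fields like struct_inner_col out of the parsed column list. A minimal, self-contained sketch of that behaviour follows; the sample `information` text and the `top_level_columns` helper are illustrative assumptions for this note (dbt-spark's real parsing lives in `SparkAdapter.parse_columns_from_information`), while the regex itself is the one added by this patch.

import re

# Regex as added by this patch: "^ \|-- " only matches lines whose pipe marker
# starts right at the beginning of the line, i.e. top-level columns.
INFORMATION_COLUMNS_REGEX = re.compile(
    r"^ \|-- (.*): (.*) \(nullable = (.*)\b", re.MULTILINE)

# Hypothetical sample of the schema section of 'show table extended' output.
information = (
    " |-- col1: decimal(22,0) (nullable = true)\n"
    " |-- col2: string (nullable = true)\n"
    " |-- dt: date (nullable = true)\n"
    " |-- struct_col: struct (nullable = true)\n"
    " |    |-- struct_inner_col: string (nullable = true)\n"
)


def top_level_columns(info):
    # Nested fields are indented (" |    |-- ..."), so they never match the
    # anchored pattern and are skipped.
    return [(name, dtype) for name, dtype, _nullable
            in INFORMATION_COLUMNS_REGEX.findall(info)]


print(top_level_columns(information))
# [('col1', 'decimal(22,0)'), ('col2', 'string'), ('dt', 'date'), ('struct_col', 'struct')]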