From fdf66b432e71b284f88a2775dcf260243c08a035 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 4 Dec 2019 17:58:47 +0100 Subject: [PATCH 01/14] Pull the owner from the DESCRIBE EXTENDED --- .gitignore | 1 + dbt/adapters/spark/impl.py | 105 ++++++++++++++--------- dbt/include/spark/macros/adapters.sql | 2 +- test/unit/test_adapter.py | 117 +++++++++++++++++++++----- 4 files changed, 165 insertions(+), 60 deletions(-) diff --git a/.gitignore b/.gitignore index 201bd4035..0984cae0a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ env/ *.pyc __pycache__ .tox/ +.idea/ diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 0af7bd3b9..ea7b05bbd 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -1,7 +1,9 @@ +from agate import Column from dbt.adapters.sql import SQLAdapter from dbt.adapters.spark import SparkRelation from dbt.adapters.spark import SparkConnectionManager import dbt.exceptions +from typing import List from dbt.logger import GLOBAL_LOGGER as logger import agate @@ -16,6 +18,19 @@ class SparkAdapter(SQLAdapter): ConnectionManager = SparkConnectionManager Relation = SparkRelation + column_names = ( + 'table_database', + 'table_schema', + 'table_name', + 'table_type', + 'table_comment', + 'table_owner', + 'column_name', + 'column_index', + 'column_type', + 'column_comment', + ) + @classmethod def date_function(cls): return 'CURRENT_TIMESTAMP()' @@ -57,7 +72,7 @@ def get_relation_type(self, relation, model_name=None): # Override that creates macros without a known type - adapter macros that # require a type will dynamically check at query-time def list_relations_without_caching(self, information_schema, schema, - model_name=None): + model_name=None) -> List[Relation]: kwargs = {'information_schema': information_schema, 'schema': schema} results = self.execute_macro( LIST_RELATIONS_MACRO_NAME, @@ -93,22 +108,56 @@ def drop_relation(self, relation, model_name=None): connection_name=model_name ) + @staticmethod + def _parse_relation(relation: Relation, table_columns: List[Column], rel_type: str) -> List[dict]: + table_owner = None + found_detailed_table_marker = False + for column in table_columns: + if column.name == '# Detailed Table Information': + found_detailed_table_marker = True + + # In case there is another column with the name Owner + if not found_detailed_table_marker: + continue + + if column.name == 'Owner': + table_owner = column.data_type + + columns = [] + for column in table_columns: + # Fixes for pseudocolumns with no type + if column.name in { + '# Partition Information', + '# col_name', + '' + }: + continue + elif column.dtype is None: + continue + elif column.name == '# Detailed Table Information': + # Loop until the detailed table information + break + + column_data = ( + relation.database, + relation.schema, + relation.name, + rel_type, + None, + table_owner, + column.name, + len(columns), + column.data_type, + None + ) + column_dict = dict(zip(SparkAdapter.column_names, column_data)) + columns.append(column_dict) + + return columns + def get_catalog(self, manifest): schemas = manifest.get_used_schemas() - column_names = ( - 'table_database', - 'table_schema', - 'table_name', - 'table_type', - 'table_comment', - 'table_owner', - 'column_name', - 'column_index', - 'column_type', - 'column_comment', - ) - columns = [] for (database_name, schema_name) in schemas: relations = self.list_relations(database_name, schema_name) @@ -116,30 +165,6 @@ def get_catalog(self, manifest): logger.debug("Getting table schema 
for relation {}".format(relation)) # noqa table_columns = self.get_columns_in_relation(relation) rel_type = self.get_relation_type(relation) + columns += self._parse_relation(relation, table_columns, rel_type) - for column_index, column in enumerate(table_columns): - # Fixes for pseudocolumns with no type - if column.name in ( - '# Partition Information', - '# col_name' - ): - continue - elif column.dtype is None: - continue - - column_data = ( - relation.database, - relation.schema, - relation.name, - rel_type, - None, - None, - column.name, - column_index, - column.data_type, - None, - ) - column_dict = dict(zip(column_names, column_data)) - columns.append(column_dict) - - return dbt.clients.agate_helper.table_from_data(columns, column_names) + return dbt.clients.agate_helper.table_from_data(columns, SparkAdapter.column_names) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index c4616ca8c..f1e120c1b 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -46,7 +46,7 @@ {% macro spark__get_columns_in_relation(relation) -%} {% call statement('get_columns_in_relation', fetch_result=True) %} - describe {{ relation }} + describe extended {{ relation }} {% endcall %} {% set table = load_result('get_columns_in_relation').table %} diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py index 898578863..32f561a7b 100644 --- a/test/unit/test_adapter.py +++ b/test/unit/test_adapter.py @@ -1,12 +1,14 @@ +import dbt.flags as flags import mock import unittest -import dbt.adapters -import dbt.flags as flags +from collections import namedtuple + +from agate import Column, MappedSequence +from dbt.adapters.base import BaseRelation from pyhive import hive -from dbt.adapters.spark import SparkAdapter -import agate -from .utils import config_from_parts_or_dicts, inject_adapter +from dbt.adapters.spark import SparkAdapter +from .utils import config_from_parts_or_dicts class TestSparkAdapter(unittest.TestCase): @@ -29,13 +31,13 @@ def get_target_http(self, project): return config_from_parts_or_dicts(project, { 'outputs': { 'test': { - 'type': 'spark', - 'method': 'http', - 'schema': 'analytics', - 'host': 'myorg.sparkhost.com', - 'port': 443, - 'token': 'abc123', - 'cluster': '01234-23423-coffeetime', + 'type': 'spark', + 'method': 'http', + 'schema': 'analytics', + 'host': 'myorg.sparkhost.com', + 'port': 443, + 'token': 'abc123', + 'cluster': '01234-23423-coffeetime', } }, 'target': 'test' @@ -45,12 +47,12 @@ def get_target_thrift(self, project): return config_from_parts_or_dicts(project, { 'outputs': { 'test': { - 'type': 'spark', - 'method': 'thrift', - 'schema': 'analytics', - 'host': 'myorg.sparkhost.com', - 'port': 10001, - 'user': 'dbt' + 'type': 'spark', + 'method': 'thrift', + 'schema': 'analytics', + 'host': 'myorg.sparkhost.com', + 'port': 10001, + 'user': 'dbt' } }, 'target': 'test' @@ -60,7 +62,6 @@ def test_http_connection(self): config = self.get_target_http(self.project_cfg) adapter = SparkAdapter(config) - def hive_http_connect(thrift_transport): self.assertEqual(thrift_transport.scheme, 'https') self.assertEqual(thrift_transport.port, 443) @@ -87,3 +88,81 @@ def hive_thrift_connect(host, port, username): self.assertEqual(connection.state, 'open') self.assertNotEqual(connection.handle, None) + + def test_parse_relation(self): + rel_type = 'table' + + relation = BaseRelation.create( + database='default_database', + schema='default_schema', + identifier='mytable', + type=rel_type + ) + + # Mimics 
the output of Spark with a DESCRIBE TABLE EXTENDED + plain_rows = [ + ('col1', 'decimal(22,0)'), + ('col2', 'string',), + ('# Partition Information', 'data_type'), + ('# col_name', 'data_type'), + ('dt', 'date'), + ('', ''), + ('# Detailed Table Information', ''), + ('Database', relation.database), + ('Owner', 'root'), + ('Created Time', 'Wed Feb 04 18:15:00 UTC 1815'), + ('Last Access', 'Wed May 20 19:25:00 UTC 1925'), + ('Type', 'MANAGED'), + ('Provider', 'delta'), + ('Location', '/mnt/vo'), + ('Serde Library', 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'), + ('InputFormat', 'org.apache.hadoop.mapred.SequenceFileInputFormat'), + ('OutputFormat', 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'), + ('Partition Provider', 'Catalog') + ] + + input_cols = [Column(index=None, name=r[0], data_type=r[1], rows=MappedSequence( + keys=['col_name', 'data_type'], + values=r + )) for r in plain_rows] + + rows = SparkAdapter._parse_relation(relation, input_cols, rel_type) + self.assertEqual(len(rows), 3) + self.assertEqual(rows[0], { + 'table_database': relation.database, + 'table_schema': relation.schema, + 'table_name': relation.name, + 'table_type': rel_type, + 'table_comment': None, + 'table_owner': 'root', + 'column_name': 'col1', + 'column_index': 0, + 'column_type': 'decimal(22,0)', + 'column_comment': None + }) + + self.assertEqual(rows[1], { + 'table_database': relation.database, + 'table_schema': relation.schema, + 'table_name': relation.name, + 'table_type': rel_type, + 'table_comment': None, + 'table_owner': 'root', + 'column_name': 'col2', + 'column_index': 1, + 'column_type': 'string', + 'column_comment': None + }) + + self.assertEqual(rows[2], { + 'table_database': relation.database, + 'table_schema': relation.schema, + 'table_name': relation.name, + 'table_type': rel_type, + 'table_comment': None, + 'table_owner': 'root', + 'column_name': 'dt', + 'column_index': 2, + 'column_type': 'date', + 'column_comment': None + }) From 5af786567df4299c965ca26664b72a4290502c4d Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 5 Dec 2019 09:33:07 +0100 Subject: [PATCH 02/14] Switch the order --- dbt/adapters/spark/impl.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index ea7b05bbd..a13ecba73 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -132,11 +132,11 @@ def _parse_relation(relation: Relation, table_columns: List[Column], rel_type: s '' }: continue - elif column.dtype is None: - continue elif column.name == '# Detailed Table Information': # Loop until the detailed table information break + elif column.dtype is None: + continue column_data = ( relation.database, From 42487a886752b8ec01eac5d23d400e12a39fc7af Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Fri, 6 Dec 2019 16:42:47 +0100 Subject: [PATCH 03/14] We also want to look inside of the property --- dbt/adapters/spark/impl.py | 42 +++++++++++++++++------- dbt/include/spark/macros/adapters.sql | 6 ++++ test/unit/test_adapter.py | 47 +++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 12 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index a13ecba73..7ea03ff97 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -1,17 +1,19 @@ +from typing import List, Dict + +import agate +import dbt.exceptions from agate import Column from dbt.adapters.sql import SQLAdapter -from dbt.adapters.spark import SparkRelation -from dbt.adapters.spark import 
SparkConnectionManager -import dbt.exceptions -from typing import List - +from dbt.contracts.graph.manifest import Manifest from dbt.logger import GLOBAL_LOGGER as logger -import agate +from dbt.adapters.spark import SparkConnectionManager +from dbt.adapters.spark import SparkRelation LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching' GET_RELATION_TYPE_MACRO_NAME = 'spark_get_relation_type' DROP_RELATION_MACRO_NAME = 'drop_relation' +FETCH_TBLPROPERTIES_MACRO_NAME = 'spark_fetch_tblproperties' class SparkAdapter(SQLAdapter): @@ -109,8 +111,16 @@ def drop_relation(self, relation, model_name=None): ) @staticmethod - def _parse_relation(relation: Relation, table_columns: List[Column], rel_type: str) -> List[dict]: - table_owner = None + def _parse_relation(relation: Relation, + table_columns: List[Column], + rel_type: str, + properties: Dict[str, str] = None) -> List[dict]: + properties = properties or {} + table_owner_key = 'Owner' + + # First check if it is present in the properties + table_owner = properties.get(table_owner_key) + found_detailed_table_marker = False for column in table_columns: if column.name == '# Detailed Table Information': @@ -120,7 +130,7 @@ def _parse_relation(relation: Relation, table_columns: List[Column], rel_type: s if not found_detailed_table_marker: continue - if column.name == 'Owner': + if not table_owner and column.name == table_owner_key: table_owner = column.data_type columns = [] @@ -135,7 +145,7 @@ def _parse_relation(relation: Relation, table_columns: List[Column], rel_type: s elif column.name == '# Detailed Table Information': # Loop until the detailed table information break - elif column.dtype is None: + elif column.data_type is None: continue column_data = ( @@ -155,16 +165,24 @@ def _parse_relation(relation: Relation, table_columns: List[Column], rel_type: s return columns - def get_catalog(self, manifest): + def get_properties(self, relation: Relation) -> Dict[str, str]: + properties = self.execute_macro( + FETCH_TBLPROPERTIES_MACRO_NAME, + kwargs={'relation': relation} + ) + return {key: value for (key, value) in properties} + + def get_catalog(self, manifest: Manifest): schemas = manifest.get_used_schemas() columns = [] for (database_name, schema_name) in schemas: relations = self.list_relations(database_name, schema_name) for relation in relations: + properties = self.get_properties(relation) logger.debug("Getting table schema for relation {}".format(relation)) # noqa table_columns = self.get_columns_in_relation(relation) rel_type = self.get_relation_type(relation) - columns += self._parse_relation(relation, table_columns, rel_type) + columns += self._parse_relation(relation, table_columns, rel_type, properties) return dbt.clients.agate_helper.table_from_data(columns, SparkAdapter.column_names) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index f1e120c1b..42c41f36a 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -89,6 +89,12 @@ {% endif %} {%- endmacro %} +{% macro spark_fetch_tblproperties(relation) -%} + {% call statement('list_properties', fetch_result=True) -%} + SHOW TBLPROPERTIES {{ relation }} + {% endcall %} + {% do return(load_result('list_properties').table) %} +{%- endmacro %} {% macro spark__rename_relation(from_relation, to_relation) -%} {% call statement('rename_relation') -%} diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py index 32f561a7b..0ec495420 100644 --- a/test/unit/test_adapter.py +++ 
b/test/unit/test_adapter.py @@ -166,3 +166,50 @@ def test_parse_relation(self): 'column_type': 'date', 'column_comment': None }) + + def test_parse_relation_with_properties(self): + rel_type = 'table' + + relation = BaseRelation.create( + database='default_database', + schema='default_schema', + identifier='mytable', + type=rel_type + ) + + # Mimics the output of Spark with a DESCRIBE TABLE EXTENDED + plain_rows = [ + ('col1', 'decimal(19,25)'), + ('', ''), + ('# Detailed Table Information', ''), + ('Database', relation.database), + ('Owner', 'root'), + ('Created Time', 'Wed Feb 04 18:15:00 UTC 1815'), + ('Last Access', 'Wed May 20 19:25:00 UTC 1925'), + ('Type', 'MANAGED'), + ('Provider', 'delta'), + ('Location', '/mnt/vo'), + ('Serde Library', 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'), + ('InputFormat', 'org.apache.hadoop.mapred.SequenceFileInputFormat'), + ('OutputFormat', 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'), + ('Partition Provider', 'Catalog') + ] + + input_cols = [Column(index=None, name=r[0], data_type=r[1], rows=MappedSequence( + keys=['col_name', 'data_type'], + values=r + )) for r in plain_rows] + + rows = SparkAdapter._parse_relation(relation, input_cols, rel_type, {'Owner': 'Fokko'}) + self.assertEqual(rows[0], { + 'table_database': relation.database, + 'table_schema': relation.schema, + 'table_name': relation.name, + 'table_type': rel_type, + 'table_comment': None, + 'table_owner': 'Fokko', + 'column_name': 'col1', + 'column_index': 0, + 'column_type': 'decimal(19,25)', + 'column_comment': None + }) From 2d19025dcedf27873c060203ad1ed688aa53dddb Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sun, 16 Feb 2020 14:28:20 +0100 Subject: [PATCH 04/14] Update --- dbt/adapters/spark/impl.py | 98 ++++++++++----------------- dbt/include/spark/macros/adapters.sql | 3 +- requirements.txt | 2 + test/unit/test_adapter.py | 59 ++-------------- 4 files changed, 43 insertions(+), 119 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 505a4b376..9a178810d 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -2,7 +2,6 @@ import agate import dbt.exceptions -from agate import Column from dbt.adapters.sql import SQLAdapter from dbt.contracts.graph.manifest import Manifest from dbt.logger import GLOBAL_LOGGER as logger @@ -15,24 +14,13 @@ DROP_RELATION_MACRO_NAME = 'drop_relation' FETCH_TBLPROPERTIES_MACRO_NAME = 'spark_fetch_tblproperties' +KEY_TABLE_OWNER = 'Owner' + class SparkAdapter(SQLAdapter): ConnectionManager = SparkConnectionManager Relation = SparkRelation - column_names = frozenset({ - 'table_database', - 'table_schema', - 'table_name', - 'table_type', - 'table_comment', - 'table_owner', - 'column_name', - 'column_index', - 'column_type', - 'column_comment', - }) - AdapterSpecificConfigs = frozenset({"file_format", "location_root", "partition_by", "clustered_by", "buckets"}) @@ -65,7 +53,7 @@ def get_relation_type(self, relation, model_name=None): # Override that creates macros without a known type - adapter macros that # require a type will dynamically check at query-time def list_relations_without_caching(self, information_schema, schema, - model_name=None) -> List[Relation]: + model_name=None) -> List: kwargs = {'information_schema': information_schema, 'schema': schema} try: results = self.execute_macro( @@ -126,57 +114,37 @@ def drop_relation(self, relation, model_name=None): ) @staticmethod - def _parse_relation(relation: Relation, - table_columns: List[Column], - rel_type: str, - 
properties: Dict[str, str] = None) -> List[dict]: - properties = properties or {} - table_owner_key = 'Owner' - - # First check if it is present in the properties - table_owner = properties.get(table_owner_key) + def find_table_information_separator(rows): + pos = 0 + for row in rows: + if not row.name: + break + pos += 1 + return pos - found_detailed_table_marker = False - for column in table_columns: - if column.name == '# Detailed Table Information': - found_detailed_table_marker = True + @staticmethod + def parse_describe_extended(relation: Relation, table): - # In case there is another column with the name Owner - if not found_detailed_table_marker: - continue + pos = SparkAdapter.find_table_information_separator(table) - if not table_owner and column.name == table_owner_key: - table_owner = column.data_type + # Remove rows that start with a hash, they are comments + rows = [row for row in table[0:pos] if not row.name.startswith('#')] + metadata = {col.name: col.data_type for col in table[pos + 1:]} columns = [] - for column in table_columns: - # Fixes for pseudocolumns with no type - if column.name in { - '# Partition Information', - '# col_name', - '' - }: - continue - elif column.name == '# Detailed Table Information': - # Loop until the detailed table information - break - elif column.data_type is None: - continue - - column_data = ( - relation.database, - relation.schema, - relation.name, - rel_type, - None, - table_owner, - column.name, - len(columns), - column.data_type, - None - ) - column_dict = dict(zip(SparkAdapter.column_names, column_data)) - columns.append(column_dict) + for column in rows: + columns.append({ + 'table_database': relation.database, + 'table_schema': relation.schema, + 'table_name': relation.name, + 'table_type': relation.type, + 'table_comment': None, + 'table_owner': metadata.get(KEY_TABLE_OWNER), + 'column_name': column.name, + 'column_index': len(columns), + 'column_type': column.data_type, + 'column_comment': None + }) return columns @@ -195,9 +163,11 @@ def get_catalog(self, manifest: Manifest): relations = self.list_relations(database_name, schema_name) for relation in relations: properties = self.get_properties(relation) - logger.debug("Getting table schema for relation {}".format(relation)) # noqa + logger.debug("Getting table schema for relation {}", relation) table_columns = self.get_columns_in_relation(relation) rel_type = self.get_relation_type(relation) - columns += self._parse_relation(relation, table_columns, rel_type, properties) + columns += self._parse_relation( + relation, table_columns, rel_type, properties) - return dbt.clients.agate_helper.table_from_data(columns, SparkAdapter.column_names) + return dbt.clients.agate_helper.table_from_data( + columns, SparkAdapter.column_names) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index f55eb9dfc..b171044ab 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -99,8 +99,7 @@ {% endcall %} {% set table = load_result('get_columns_in_relation').table %} - {{ return(sql_convert_columns_in_relation(table)) }} - + {{ return(adapter.parse_describe_extended(table)) }} {% endmacro %} diff --git a/requirements.txt b/requirements.txt index 5cdd15fe2..6a16ef217 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ dbt-core==0.14.3 PyHive>=0.6.0,<0.7.0 thrift>=0.11.0,<0.12.0 +sasl>=0.2.1 +thrift_sasl>=0.1.0 \ No newline at end of file diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py 
index 0ec495420..9e578859e 100644 --- a/test/unit/test_adapter.py +++ b/test/unit/test_adapter.py @@ -1,8 +1,7 @@ -import dbt.flags as flags -import mock import unittest -from collections import namedtuple +import dbt.flags as flags +import mock from agate import Column, MappedSequence from dbt.adapters.base import BaseRelation from pyhive import hive @@ -90,6 +89,7 @@ def hive_thrift_connect(host, port, username): self.assertNotEqual(connection.handle, None) def test_parse_relation(self): + self.maxDiff = None rel_type = 'table' relation = BaseRelation.create( @@ -106,8 +106,8 @@ def test_parse_relation(self): ('# Partition Information', 'data_type'), ('# col_name', 'data_type'), ('dt', 'date'), - ('', ''), - ('# Detailed Table Information', ''), + (None, None), + ('# Detailed Table Information', None), ('Database', relation.database), ('Owner', 'root'), ('Created Time', 'Wed Feb 04 18:15:00 UTC 1815'), @@ -126,7 +126,7 @@ def test_parse_relation(self): values=r )) for r in plain_rows] - rows = SparkAdapter._parse_relation(relation, input_cols, rel_type) + rows = SparkAdapter.parse_describe_extended(relation, input_cols) self.assertEqual(len(rows), 3) self.assertEqual(rows[0], { 'table_database': relation.database, @@ -166,50 +166,3 @@ def test_parse_relation(self): 'column_type': 'date', 'column_comment': None }) - - def test_parse_relation_with_properties(self): - rel_type = 'table' - - relation = BaseRelation.create( - database='default_database', - schema='default_schema', - identifier='mytable', - type=rel_type - ) - - # Mimics the output of Spark with a DESCRIBE TABLE EXTENDED - plain_rows = [ - ('col1', 'decimal(19,25)'), - ('', ''), - ('# Detailed Table Information', ''), - ('Database', relation.database), - ('Owner', 'root'), - ('Created Time', 'Wed Feb 04 18:15:00 UTC 1815'), - ('Last Access', 'Wed May 20 19:25:00 UTC 1925'), - ('Type', 'MANAGED'), - ('Provider', 'delta'), - ('Location', '/mnt/vo'), - ('Serde Library', 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'), - ('InputFormat', 'org.apache.hadoop.mapred.SequenceFileInputFormat'), - ('OutputFormat', 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'), - ('Partition Provider', 'Catalog') - ] - - input_cols = [Column(index=None, name=r[0], data_type=r[1], rows=MappedSequence( - keys=['col_name', 'data_type'], - values=r - )) for r in plain_rows] - - rows = SparkAdapter._parse_relation(relation, input_cols, rel_type, {'Owner': 'Fokko'}) - self.assertEqual(rows[0], { - 'table_database': relation.database, - 'table_schema': relation.schema, - 'table_name': relation.name, - 'table_type': rel_type, - 'table_comment': None, - 'table_owner': 'Fokko', - 'column_name': 'col1', - 'column_index': 0, - 'column_type': 'decimal(19,25)', - 'column_comment': None - }) From c0ceb22da9ad24717507553d6129799b7b0d6ddd Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sun, 16 Feb 2020 20:49:47 +0100 Subject: [PATCH 05/14] First version --- dbt/adapters/spark/impl.py | 68 +++++++++++++++------------ dbt/include/spark/macros/adapters.sql | 2 +- 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 9a178810d..5f6fc0db7 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -2,6 +2,7 @@ import agate import dbt.exceptions +from dbt.adapters.base import available from dbt.adapters.sql import SQLAdapter from dbt.contracts.graph.manifest import Manifest from dbt.logger import GLOBAL_LOGGER as logger @@ -25,6 +26,19 @@ class 
SparkAdapter(SQLAdapter): "partition_by", "clustered_by", "buckets"}) + column_names = ( + 'table_database', + 'table_schema', + 'table_name', + 'table_type', + 'table_comment', + 'table_owner', + 'column_name', + 'column_index', + 'column_type', + 'column_comment' + ) + @classmethod def date_function(cls): return 'CURRENT_TIMESTAMP()' @@ -117,36 +131,34 @@ def drop_relation(self, relation, model_name=None): def find_table_information_separator(rows): pos = 0 for row in rows: - if not row.name: + if not row['col_name']: break pos += 1 return pos - @staticmethod - def parse_describe_extended(relation: Relation, table): - - pos = SparkAdapter.find_table_information_separator(table) + @available + def parse_describe_extended(self, relation, raw_rows: List[agate.Row]): + # Convert the Row to a dict + dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows] + # Find the separator between the rows and the metadata provided by extended + pos = SparkAdapter.find_table_information_separator(dict_rows) # Remove rows that start with a hash, they are comments - rows = [row for row in table[0:pos] if not row.name.startswith('#')] - metadata = {col.name: col.data_type for col in table[pos + 1:]} - - columns = [] - for column in rows: - columns.append({ - 'table_database': relation.database, - 'table_schema': relation.schema, - 'table_name': relation.name, - 'table_type': relation.type, - 'table_comment': None, - 'table_owner': metadata.get(KEY_TABLE_OWNER), - 'column_name': column.name, - 'column_index': len(columns), - 'column_type': column.data_type, - 'column_comment': None - }) - - return columns + rows = [row for row in raw_rows[0:pos] if not row['col_name'].startswith('#')] + metadata = {col['col_name']: col['data_type'] for col in raw_rows[pos + 1:]} + + return [dict(zip(self.column_names, ( + relation.database, + relation.schema, + relation.name, + relation.type, + None, + metadata.get(KEY_TABLE_OWNER), + column['col_name'], + idx, + column['data_type'], + None + ))) for idx, column in enumerate(rows)] def get_properties(self, relation: Relation) -> Dict[str, str]: properties = self.execute_macro( @@ -162,12 +174,8 @@ def get_catalog(self, manifest: Manifest): for (database_name, schema_name) in schemas: relations = self.list_relations(database_name, schema_name) for relation in relations: - properties = self.get_properties(relation) logger.debug("Getting table schema for relation {}", relation) - table_columns = self.get_columns_in_relation(relation) - rel_type = self.get_relation_type(relation) - columns += self._parse_relation( - relation, table_columns, rel_type, properties) + columns += self.get_columns_in_relation(relation) return dbt.clients.agate_helper.table_from_data( - columns, SparkAdapter.column_names) + columns, self.column_names) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index b171044ab..5483d8783 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -99,7 +99,7 @@ {% endcall %} {% set table = load_result('get_columns_in_relation').table %} - {{ return(adapter.parse_describe_extended(table)) }} + {{ return(adapter.parse_describe_extended(relation, table)) }} {% endmacro %} From 9b6a134e356b47594af9a89d55c875d34ed4fdd7 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 17 Feb 2020 15:40:06 +0100 Subject: [PATCH 06/14] A bit of a cleanup --- dbt/adapters/spark/impl.py | 4 ++-- requirements.txt | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git 
a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 5f6fc0db7..ea4b7c8ec 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -128,7 +128,7 @@ def drop_relation(self, relation, model_name=None): ) @staticmethod - def find_table_information_separator(rows): + def find_table_information_separator(rows: List[dict]) -> int: pos = 0 for row in rows: if not row['col_name']: @@ -137,7 +137,7 @@ def find_table_information_separator(rows): return pos @available - def parse_describe_extended(self, relation, raw_rows: List[agate.Row]): + def parse_describe_extended(self, relation: Relation, raw_rows: List[agate.Row]): # Convert the Row to a dict dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows] # Find the separator between the rows and the metadata provided by extended diff --git a/requirements.txt b/requirements.txt index 6a16ef217..5cdd15fe2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,3 @@ dbt-core==0.14.3 PyHive>=0.6.0,<0.7.0 thrift>=0.11.0,<0.12.0 -sasl>=0.2.1 -thrift_sasl>=0.1.0 \ No newline at end of file From 03d3c19f50eece6c205d77d36c41340dd3a78324 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 17 Feb 2020 15:53:48 +0100 Subject: [PATCH 07/14] Make Flake8 happy --- dbt/adapters/spark/impl.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index ea4b7c8ec..4a11b6658 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -137,15 +137,25 @@ def find_table_information_separator(rows: List[dict]) -> int: return pos @available - def parse_describe_extended(self, relation: Relation, raw_rows: List[agate.Row]): + def parse_describe_extended( + self, + relation: Relation, + raw_rows: List[agate.Row] + ): # Convert the Row to a dict dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows] - # Find the separator between the rows and the metadata provided by extended + # Find the separator between the rows and the metadata provided + # by the DESCRIBE TABLE EXTENDED statement pos = SparkAdapter.find_table_information_separator(dict_rows) # Remove rows that start with a hash, they are comments - rows = [row for row in raw_rows[0:pos] if not row['col_name'].startswith('#')] - metadata = {col['col_name']: col['data_type'] for col in raw_rows[pos + 1:]} + rows = [ + row for row in raw_rows[0:pos] + if not row['col_name'].startswith('#') + ] + metadata = { + col['col_name']: col['data_type'] for col in raw_rows[pos + 1:] + } return [dict(zip(self.column_names, ( relation.database, From 5edae899d9136cc84be24a54b61e5203e112f222 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 18 Feb 2020 08:00:56 +0100 Subject: [PATCH 08/14] Fix the failing test --- test/unit/test_adapter.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py index 9e578859e..4d2178d77 100644 --- a/test/unit/test_adapter.py +++ b/test/unit/test_adapter.py @@ -2,7 +2,7 @@ import dbt.flags as flags import mock -from agate import Column, MappedSequence +from agate import Row from dbt.adapters.base import BaseRelation from pyhive import hive @@ -121,12 +121,10 @@ def test_parse_relation(self): ('Partition Provider', 'Catalog') ] - input_cols = [Column(index=None, name=r[0], data_type=r[1], rows=MappedSequence( - keys=['col_name', 'data_type'], - values=r - )) for r in plain_rows] + input_cols = [Row(keys=['col_name', 'data_type'], values=r) for r in 
plain_rows] - rows = SparkAdapter.parse_describe_extended(relation, input_cols) + config = self.get_target_http(self.project_cfg) + rows = SparkAdapter(config).parse_describe_extended(relation, input_cols) self.assertEqual(len(rows), 3) self.assertEqual(rows[0], { 'table_database': relation.database, From 9d340f8f1b66b60cd97180bcc38b31ccb2d4cc5d Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 2 Mar 2020 15:00:18 +0100 Subject: [PATCH 09/14] Introduced the SparkColumn --- dbt/adapters/spark/impl.py | 45 ++++++++++----------------- dbt/adapters/spark/relation.py | 31 +++++++++++++++++- dbt/include/spark/macros/adapters.sql | 4 +-- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 4a11b6658..9a558dc2e 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -2,18 +2,19 @@ import agate import dbt.exceptions -from dbt.adapters.base import available from dbt.adapters.sql import SQLAdapter from dbt.contracts.graph.manifest import Manifest from dbt.logger import GLOBAL_LOGGER as logger from dbt.adapters.spark import SparkConnectionManager from dbt.adapters.spark import SparkRelation +from dbt.adapters.spark.relation import SparkColumn LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching' GET_RELATION_TYPE_MACRO_NAME = 'spark_get_relation_type' DROP_RELATION_MACRO_NAME = 'drop_relation' FETCH_TBLPROPERTIES_MACRO_NAME = 'spark_fetch_tblproperties' +GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation' KEY_TABLE_OWNER = 'Owner' @@ -26,19 +27,6 @@ class SparkAdapter(SQLAdapter): "partition_by", "clustered_by", "buckets"}) - column_names = ( - 'table_database', - 'table_schema', - 'table_name', - 'table_type', - 'table_comment', - 'table_owner', - 'column_name', - 'column_index', - 'column_type', - 'column_comment' - ) - @classmethod def date_function(cls): return 'CURRENT_TIMESTAMP()' @@ -136,12 +124,11 @@ def find_table_information_separator(rows: List[dict]) -> int: pos += 1 return pos - @available - def parse_describe_extended( - self, - relation: Relation, - raw_rows: List[agate.Row] - ): + def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: + raw_rows: List[agate.Row] = self.execute_macro( + GET_COLUMNS_IN_RELATION_MACRO_NAME, + kwargs={'relation': relation} + ) # Convert the Row to a dict dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows] # Find the separator between the rows and the metadata provided @@ -157,18 +144,16 @@ def parse_describe_extended( col['col_name']: col['data_type'] for col in raw_rows[pos + 1:] } - return [dict(zip(self.column_names, ( + return [SparkColumn( relation.database, relation.schema, relation.name, relation.type, - None, metadata.get(KEY_TABLE_OWNER), column['col_name'], idx, - column['data_type'], - None - ))) for idx, column in enumerate(rows)] + column['data_type'] + ) for idx, column in enumerate(rows)] def get_properties(self, relation: Relation) -> Dict[str, str]: properties = self.execute_macro( @@ -177,15 +162,17 @@ def get_properties(self, relation: Relation) -> Dict[str, str]: ) return {key: value for (key, value) in properties} - def get_catalog(self, manifest: Manifest): + def get_catalog(self, manifest: Manifest) -> agate.Table: schemas = manifest.get_used_schemas() + def to_dict(d: any) -> Dict: + return d.__dict__ + columns = [] for (database_name, schema_name) in schemas: relations = self.list_relations(database_name, schema_name) for relation in relations: logger.debug("Getting table 
schema for relation {}", relation) - columns += self.get_columns_in_relation(relation) + columns += list(map(to_dict, self.get_columns_in_relation(relation))) - return dbt.clients.agate_helper.table_from_data( - columns, self.column_names) + return agate.Table.from_object(columns) diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py index 20d61b592..446812367 100644 --- a/dbt/adapters/spark/relation.py +++ b/dbt/adapters/spark/relation.py @@ -1,4 +1,4 @@ -from dbt.adapters.base.relation import BaseRelation +from dbt.adapters.base.relation import BaseRelation, Column class SparkRelation(BaseRelation): @@ -45,3 +45,32 @@ class SparkRelation(BaseRelation): 'required': ['metadata', 'type', 'path', 'include_policy', 'quote_policy', 'quote_character', 'dbt_created'] } + + +class SparkColumn(Column): + + def __init__(self, + table_database: str, + table_schema: str, + table_name: str, + table_type: str, + table_owner: str, + column_name: str, + column_index: int, + column_type: str): + super(SparkColumn, self).__init__(column_name, column_type) + self.table_database = table_database + self.table_schema = table_schema + self.table_name = table_name + self.table_type = table_type + self.table_owner = table_owner + self.column_name = column_name + self.column_index = column_index + + @property + def quoted(self): + return '`{}`'.format(self.column) + + @property + def column_names(self): + return self.name diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index 5483d8783..97e072ee9 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -97,9 +97,7 @@ {% call statement('get_columns_in_relation', fetch_result=True) %} describe extended {{ relation }} {% endcall %} - - {% set table = load_result('get_columns_in_relation').table %} - {{ return(adapter.parse_describe_extended(relation, table)) }} + {% do return(load_result('get_columns_in_relation').table) %} {% endmacro %} From e8f6297bdc66e8da1d2c9dc4dcb03cf21e7dfece Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 2 Mar 2020 15:03:23 +0100 Subject: [PATCH 10/14] Less is more --- dbt/adapters/spark/relation.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py index 446812367..8fd3ca6f8 100644 --- a/dbt/adapters/spark/relation.py +++ b/dbt/adapters/spark/relation.py @@ -70,7 +70,3 @@ def __init__(self, @property def quoted(self): return '`{}`'.format(self.column) - - @property - def column_names(self): - return self.name From e40b52882fd610f217af79922881526c275c29f9 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 2 Mar 2020 17:13:15 +0100 Subject: [PATCH 11/14] Fix small issues --- dbt/adapters/spark/impl.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index 9a558dc2e..daa242af5 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -124,11 +124,11 @@ def find_table_information_separator(rows: List[dict]) -> int: pos += 1 return pos - def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: - raw_rows: List[agate.Row] = self.execute_macro( - GET_COLUMNS_IN_RELATION_MACRO_NAME, - kwargs={'relation': relation} - ) + def parse_describe_extended( + self, + relation: Relation, + raw_rows: List[agate.Row] + ) -> List[SparkColumn]: # Convert the Row to a dict dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows] # Find 
the separator between the rows and the metadata provided @@ -155,6 +155,11 @@ def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]: column['data_type'] ) for idx, column in enumerate(rows)] + def get_columns_in_relation(self, + relation: Relation) -> List[SparkColumn]: + rows: List[agate.Row] = super().get_columns_in_relation(relation) + return self.parse_describe_extended(relation, rows) + def get_properties(self, relation: Relation) -> Dict[str, str]: properties = self.execute_macro( FETCH_TBLPROPERTIES_MACRO_NAME, @@ -173,6 +178,8 @@ def to_dict(d: any) -> Dict: relations = self.list_relations(database_name, schema_name) for relation in relations: logger.debug("Getting table schema for relation {}", relation) - columns += list(map(to_dict, self.get_columns_in_relation(relation))) + columns += list( + map(to_dict, self.get_columns_in_relation(relation)) + ) return agate.Table.from_object(columns) From f9745fa8f6ed3e21989a8aab3c255f526a4d62ea Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 3 Mar 2020 09:42:18 +0100 Subject: [PATCH 12/14] Fix the test --- test/unit/test_adapter.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py index 4d2178d77..a48569a9d 100644 --- a/test/unit/test_adapter.py +++ b/test/unit/test_adapter.py @@ -126,41 +126,47 @@ def test_parse_relation(self): config = self.get_target_http(self.project_cfg) rows = SparkAdapter(config).parse_describe_extended(relation, input_cols) self.assertEqual(len(rows), 3) - self.assertEqual(rows[0], { + self.assertEqual(rows[0].__dict__, { 'table_database': relation.database, 'table_schema': relation.schema, 'table_name': relation.name, 'table_type': rel_type, - 'table_comment': None, 'table_owner': 'root', + 'column': 'col1', 'column_name': 'col1', 'column_index': 0, - 'column_type': 'decimal(22,0)', - 'column_comment': None + 'dtype': 'decimal(22,0)', + 'numeric_scale': None, + 'numeric_precision': None, + 'char_size': None }) - self.assertEqual(rows[1], { + self.assertEqual(rows[1].__dict__, { 'table_database': relation.database, 'table_schema': relation.schema, 'table_name': relation.name, 'table_type': rel_type, - 'table_comment': None, 'table_owner': 'root', + 'column': 'col2', 'column_name': 'col2', 'column_index': 1, - 'column_type': 'string', - 'column_comment': None + 'dtype': 'string', + 'numeric_scale': None, + 'numeric_precision': None, + 'char_size': None }) - self.assertEqual(rows[2], { + self.assertEqual(rows[2].__dict__, { 'table_database': relation.database, 'table_schema': relation.schema, 'table_name': relation.name, 'table_type': rel_type, - 'table_comment': None, 'table_owner': 'root', + 'column': 'dt', 'column_name': 'dt', 'column_index': 2, - 'column_type': 'date', - 'column_comment': None + 'dtype': 'date', + 'numeric_scale': None, + 'numeric_precision': None, + 'char_size': None }) From 519a870d70532763dd949fdf3221ea74bebcd4d2 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Mon, 16 Mar 2020 19:33:32 +0100 Subject: [PATCH 13/14] Apply comments --- dbt/adapters/spark/impl.py | 4 +--- dbt/adapters/spark/relation.py | 3 +++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py index daa242af5..3e118c34b 100644 --- a/dbt/adapters/spark/impl.py +++ b/dbt/adapters/spark/impl.py @@ -119,7 +119,7 @@ def drop_relation(self, relation, model_name=None): def find_table_information_separator(rows: List[dict]) -> int: 
        pos = 0
        for row in rows:
-           if not row['col_name']:
+           if not row['col_name'] or row['col_name'].startswith('#'):
                break
            pos += 1
        return pos

    @staticmethod
@@ -143,7 +143,6 @@ def parse_describe_extended(
        metadata = {
            col['col_name']: col['data_type'] for col in raw_rows[pos + 1:]
        }
-
        return [SparkColumn(
            relation.database,
            relation.schema,
            relation.name,
@@ -181,5 +180,4 @@ def to_dict(d: any) -> Dict:
                columns += list(
                    map(to_dict, self.get_columns_in_relation(relation))
                )
-
        return agate.Table.from_object(columns)
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 8fd3ca6f8..9edbff51a 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -70,3 +70,6 @@ def __init__(self,
    @property
    def quoted(self):
        return '`{}`'.format(self.column)
+
+    def __repr__(self):
+        return "<SparkColumn {} ({})>".format(self.name, self.data_type)

From 7f060337caa2f79723cacf435d962aef43ef3d53 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Mon, 16 Mar 2020 20:03:29 +0100
Subject: [PATCH 14/14] Fix the unit test :)

---
 test/unit/test_adapter.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py
index a48569a9d..4ddd99c84 100644
--- a/test/unit/test_adapter.py
+++ b/test/unit/test_adapter.py
@@ -103,6 +103,7 @@ def test_parse_relation(self):
        plain_rows = [
            ('col1', 'decimal(22,0)'),
            ('col2', 'string',),
+            ('dt', 'date'),
            ('# Partition Information', 'data_type'),
            ('# col_name', 'data_type'),
            ('dt', 'date'),
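
The parsing approach these patches converge on is easiest to see outside of dbt. Below is a minimal, standalone Python sketch of how the DESCRIBE TABLE EXTENDED output gets split into column rows and table-level metadata; it mirrors find_table_information_separator and parse_describe_extended from the later patches, but the helper names, the plain tuples/dicts, and the simplified return shape are illustrative assumptions rather than the adapter's actual API, which builds SparkColumn objects from agate rows.

# Standalone sketch of the DESCRIBE TABLE EXTENDED parsing shown above.
# Plain tuples/dicts stand in for dbt's agate rows and SparkColumn.
from typing import Dict, List, Tuple

KEY_TABLE_OWNER = 'Owner'

def find_table_information_separator(rows: List[Dict[str, str]]) -> int:
    # The column listing ends at the first empty name or '#...' marker row;
    # everything after that is table-level metadata.
    pos = 0
    for row in rows:
        if not row['col_name'] or row['col_name'].startswith('#'):
            break
        pos += 1
    return pos

def parse_describe_extended(raw_rows: List[Tuple[str, str]]) -> List[Dict[str, str]]:
    rows = [dict(zip(('col_name', 'data_type'), r)) for r in raw_rows]
    pos = find_table_information_separator(rows)
    columns = rows[:pos]
    # Rows after the separator hold key/value metadata such as the table owner.
    metadata = {r['col_name']: r['data_type'] for r in rows[pos + 1:]}
    owner = metadata.get(KEY_TABLE_OWNER)
    return [
        {'column_name': c['col_name'], 'column_index': i,
         'column_type': c['data_type'], 'table_owner': owner}
        for i, c in enumerate(columns)
    ]

if __name__ == '__main__':
    # Sample rows shaped like the test fixtures in test_parse_relation.
    sample = [
        ('col1', 'decimal(22,0)'),
        ('col2', 'string'),
        ('dt', 'date'),
        ('# Partition Information', 'data_type'),
        ('# col_name', 'data_type'),
        ('dt', 'date'),
        ('', ''),
        ('# Detailed Table Information', ''),
        ('Database', 'default_database'),
        ('Owner', 'root'),
        ('Type', 'MANAGED'),
    ]
    for parsed in parse_describe_extended(sample):
        print(parsed)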