fix catalog generation #77

Merged · 1 commit · Apr 22, 2020
dbt/adapters/spark/impl.py (51 changes: 44 additions & 7 deletions)
@@ -3,8 +3,9 @@
 import agate
 import dbt.exceptions
 import dbt
+from dbt.adapters.base.relation import SchemaSearchMap
 from dbt.adapters.sql import SQLAdapter
-from dbt.contracts.graph.manifest import Manifest
+from dbt.node_types import NodeType
 
 from dbt.adapters.spark import SparkConnectionManager
 from dbt.adapters.spark import SparkRelation
@@ -267,19 +268,55 @@ def _massage_column_for_catalog(
         dct['table_database'] = dct['table_schema']
         return dct
 
-    def get_catalog(self, manifest: Manifest) -> agate.Table:
-        schemas = manifest.get_used_schemas()
-        columns = []
-        for database, schema in schemas:
-            relations = self.list_relations(database, schema)
-            for relation in relations:
+    def _get_catalog_for_relations(self, database: str, schema: str):
+        with self.connection_named(f'{database}.{schema}'):
+            columns = []
+            for relation in self.list_relations(database, schema):
                 logger.debug("Getting table schema for relation {}", relation)
                 columns.extend(
                     self._massage_column_for_catalog(col)
                     for col in self.get_columns_in_relation(relation)
                 )
         return agate.Table.from_object(columns)
 
+    def _get_cache_schemas(self, manifest, exec_only=False):
+        info_schema_name_map = SchemaSearchMap()
+        for node in manifest.nodes.values():
+            if exec_only and node.resource_type not in NodeType.executable():
+                continue
+            relation = self.Relation.create(
+                database=node.database,
+                schema=node.schema,
+                identifier='information_schema',
+                quote_policy=self.config.quoting,
+            )
+            key = relation.information_schema_only()
+            info_schema_name_map[key] = {node.schema}
+        return info_schema_name_map
+
+    def _get_one_catalog(
+        self, information_schema, schemas, manifest,
+    ) -> agate.Table:
+        name = f'{information_schema.database}.information_schema'
+
+        if len(schemas) != 1:
+            dbt.exceptions.raise_compiler_error(
+                'Expected only one schema in spark _get_one_catalog'
+            )
+
+        database = information_schema.database
+        schema = list(schemas)[0]
+
+        with self.connection_named(name):
+            columns = []
+            for relation in self.list_relations(database, schema):
+                logger.debug("Getting table schema for relation {}", relation)
+                columns.extend(
+                    self._massage_column_for_catalog(col)
+                    for col in self.get_columns_in_relation(relation)
+                )
+        return agate.Table.from_object(columns)
+
     def check_schema_exists(self, database, schema):
         results = self.execute_macro(
             LIST_SCHEMAS_MACRO_NAME,
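For context: the PR deletes the adapter's `get_catalog` override, so catalog generation falls back to the default implementation in dbt-core, which drives the two new private hooks, `_get_cache_schemas` and `_get_one_catalog`. A minimal sketch of that contract is below, assuming an `adapter` instance and a parsed `manifest`; the driver function `build_catalog` and its loop are illustrative approximations, not dbt-core's actual source.

import agate

def build_catalog(adapter, manifest) -> agate.Table:
    # _get_cache_schemas maps each information_schema relation to the
    # set of schemas that the manifest's executable nodes live in.
    schema_map = adapter._get_cache_schemas(manifest, exec_only=True)

    tables = []
    for information_schema, schemas in schema_map.items():
        # Spark's _get_one_catalog raises a compiler error unless it is
        # given exactly one schema, so fan out one schema per call.
        for schema in schemas:
            tables.append(
                adapter._get_one_catalog(information_schema, {schema}, manifest)
            )
    # Stitch the per-schema agate tables into a single catalog table.
    return agate.Table.merge(tables)

Note the `len(schemas) != 1` guard in `_get_one_catalog`: unlike adapters that can query a whole information_schema in one pass, Spark enumerates relations and columns per schema, so each catalog query runs on a connection scoped to a single `database.schema`.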