migrate other adapters to use the snowflake technique
Jacob Beck committed Jan 9, 2020
1 parent f1bf300 · commit 3f09b30
Showing 8 changed files with 262 additions and 287 deletions.
56 changes: 42 additions & 14 deletions core/dbt/adapters/base/impl.py
@@ -1,4 +1,5 @@
 import abc
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from contextlib import contextmanager
 from datetime import datetime
 from typing import (
@@ -997,25 +998,52 @@ def _catalog_filter_table(
         """
         return table.where(_catalog_filter_schemas(manifest))
 
-    def _get_catalog_information_schemas(
-        self, manifest: Manifest
-    ) -> List[InformationSchema]:
-        return list(self._get_cache_schemas(manifest).keys())
+    def _get_one_catalog(
+        self,
+        information_schema: InformationSchema,
+        schemas: Set[str],
+        manifest: Manifest,
+    ) -> agate.Table:
 
-    def get_catalog(self, manifest: Manifest) -> agate.Table:
-        """Get the catalog for this manifest by running the get catalog macro.
-        Returns an agate.Table of catalog information.
-        """
-        information_schemas = self._get_catalog_information_schemas(manifest)
-        # make it a list so macros can index into it.
-        kwargs = {'information_schemas': information_schemas}
-        table = self.execute_macro(GET_CATALOG_MACRO_NAME,
-                                   kwargs=kwargs,
-                                   release=True)
+        name = '.'.join([
+            str(information_schema.database),
+            'information_schema'
+        ])
+
+        # calculate the possible schemas for a given schema name
+        all_schema_names: Set[str] = set()
+        for schema in schemas:
+            all_schema_names.update({schema, schema.lower(), schema.upper()})
+
+        with self.connection_named(name):
+            kwargs = {
+                'information_schema': information_schema,
+                'schemas': all_schema_names
+            }
+            table = self.execute_macro(GET_CATALOG_MACRO_NAME,
+                                       kwargs=kwargs,
+                                       release=True)
 
         results = self._catalog_filter_table(table, manifest)
         return results
 
+    def get_catalog(self, manifest: Manifest) -> agate.Table:
+        # snowflake is super slow. split it out into the specified threads
+        num_threads = self.config.threads
+        schema_map = self._get_cache_schemas(manifest)
+        catalogs: agate.Table = agate.Table(rows=[])
+
+        with ThreadPoolExecutor(max_workers=num_threads) as executor:
+            futures = [
+                executor.submit(self._get_one_catalog, info, schemas, manifest)
+                for info, schemas in schema_map.items() if len(schemas) > 0
+            ]
+            for future in as_completed(futures):
+                catalog = future.result()
+                catalogs = agate.Table.merge([catalogs, catalog])
+
+        return catalogs
+
     def cancel_open_connections(self):
         """Cancel all open connections."""
         return self.connections.cancel_open()
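The hunk above is the "snowflake technique" named in the commit title: instead of running the get_catalog macro once over a list of information schemas, get_catalog now submits one _get_one_catalog call per information schema to a ThreadPoolExecutor sized by the project's threads setting and merges the per-schema agate tables as their futures complete. _get_one_catalog also expands each schema name into its original, lowercased, and uppercased forms, since warehouses like Snowflake fold unquoted identifiers to upper case. Below is a minimal standalone sketch of the fan-out-and-merge pattern; fetch_one_catalog and fetch_all_catalogs are hypothetical stand-ins for dbt's _get_one_catalog and get_catalog.

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Set

import agate


def fetch_one_catalog(database: str, schemas: Set[str]) -> agate.Table:
    # Stand-in for _get_one_catalog: in dbt this opens a named connection
    # and runs the get_catalog macro against a single information_schema.
    rows = [(database, schema) for schema in sorted(schemas)]
    return agate.Table(rows, column_names=['table_database', 'table_schema'])


def fetch_all_catalogs(schema_map: Dict[str, Set[str]],
                       num_threads: int = 4) -> agate.Table:
    catalogs: agate.Table = agate.Table(rows=[])
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(fetch_one_catalog, database, schemas)
            for database, schemas in schema_map.items() if schemas
        ]
        # merge each per-database table as soon as its future finishes
        for future in as_completed(futures):
            catalogs = agate.Table.merge([catalogs, future.result()])
    return catalogs


fetch_all_catalogs({'analytics': {'prod', 'dev'}, 'raw': {'events'}}).print_table()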
8 changes: 4 additions & 4 deletions core/dbt/include/global_project/macros/adapters/common.sql
@@ -88,18 +88,18 @@
   {%- set sql_header = config.get('sql_header', none) -%}
 
   {{ sql_header if sql_header is not none }}
 
   create view {{ relation }} as (
     {{ sql }}
   );
 {% endmacro %}
 
 
-{% macro get_catalog(information_schemas) -%}
-  {{ return(adapter_macro('get_catalog', information_schemas)) }}
+{% macro get_catalog(information_schema, schemas) -%}
+  {{ return(adapter_macro('get_catalog', information_schema, schemas)) }}
 {%- endmacro %}
 
-{% macro default__get_catalog(information_schemas) -%}
+{% macro default__get_catalog(information_schema, schemas) -%}
 
   {% set typename = adapter.type() %}
   {% set msg -%}
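With this signature change, the dispatching get_catalog macro forwards a single information_schema plus a set of schemas instead of a list of information schemas, and adapter_macro picks the adapter-specific implementation (for example snowflake__get_catalog) before falling back to the default__ version. A rough Python analogue of that dispatch, with every name here hypothetical:

from typing import Callable, Dict


def adapter_macro_dispatch(macros: Dict[str, Callable], adapter_type: str,
                           name: str, *args):
    # prefer '<adapter>__<name>', then fall back to 'default__<name>'
    for candidate in ('{}__{}'.format(adapter_type, name),
                      'default__{}'.format(name)):
        if candidate in macros:
            return macros[candidate](*args)
    raise KeyError('no implementation of {} for {}'.format(name, adapter_type))


macros = {
    'default__get_catalog':
        lambda info_schema, schemas: ('default', info_schema, sorted(schemas)),
    'snowflake__get_catalog':
        lambda info_schema, schemas: ('snowflake', info_schema, sorted(schemas)),
}

print(adapter_macro_dispatch(macros, 'snowflake', 'get_catalog',
                             'analytics.information_schema', {'PROD'}))
print(adapter_macro_dispatch(macros, 'bigquery', 'get_catalog',
                             'analytics.information_schema', {'prod'}))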
22 changes: 11 additions & 11 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
@@ -1,4 +1,4 @@
-from typing import Dict, List, Optional, Any
+from typing import Dict, List, Optional, Any, Set
 
 import dbt.deprecations
 import dbt.exceptions
@@ -7,6 +7,7 @@
 import dbt.clients.agate_helper
 
 from dbt.adapters.base import BaseAdapter, available, RelationType
+from dbt.adapters.base.impl import SchemaSearchMap
 from dbt.adapters.bigquery.relation import (
     BigQueryRelation, BigQueryInformationSchema
 )
@@ -492,23 +493,22 @@ def _catalog_filter_table(
         })
         return super()._catalog_filter_table(table, manifest)
 
-    def _get_catalog_information_schemas(
-        self, manifest: Manifest
-    ) -> List[BigQueryInformationSchema]:
+    def _get_cache_schemas(
+        self, manifest: Manifest, exec_only: bool = False
+    ) -> SchemaSearchMap:
+        candidates = super()._get_cache_schemas(manifest, exec_only)
+        db_schemas: Dict[str, Set[str]] = {}
+        result = SchemaSearchMap()
 
-        candidates = super()._get_catalog_information_schemas(manifest)
-        information_schemas = []
-        db_schemas = {}
-        for candidate in candidates:
+        for candidate, schemas in candidates.items():
             database = candidate.database
             if database not in db_schemas:
                 db_schemas[database] = set(self.list_schemas(database))
             if candidate.schema in db_schemas[database]:
-                information_schemas.append(candidate)
+                result[candidate] = schemas
             else:
                 logger.debug(
                     'Skipping catalog for {}.{} - schema does not exist'
                     .format(database, candidate.schema)
                 )
 
-        return information_schemas
+        return result
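The BigQuery override now hooks _get_cache_schemas rather than the deleted _get_catalog_information_schemas: it calls list_schemas once per database, caches the result, and keeps only the candidates whose schema actually exists, so the threaded catalog queries never hit a missing dataset. A rough standalone sketch of that filtering, using plain dicts in place of SchemaSearchMap (all names hypothetical):

from typing import Callable, Dict, List, Set, Tuple


def filter_to_existing_schemas(
    candidates: Dict[Tuple[str, str], Set[str]],
    list_schemas: Callable[[str], List[str]],
) -> Dict[Tuple[str, str], Set[str]]:
    # (database, schema) tuples stand in for information_schema relations
    db_schemas: Dict[str, Set[str]] = {}  # one list_schemas call per database
    result: Dict[Tuple[str, str], Set[str]] = {}
    for (database, schema), schemas in candidates.items():
        if database not in db_schemas:
            db_schemas[database] = set(list_schemas(database))
        if schema in db_schemas[database]:
            result[(database, schema)] = schemas
        else:
            print('Skipping catalog for {}.{} - schema does not exist'
                  .format(database, schema))
    return result


existing = filter_to_existing_schemas(
    {('proj', 'analytics'): {'analytics'}, ('proj', 'missing'): {'missing'}},
    list_schemas=lambda database: ['analytics'],
)
print(sorted(existing))  # only ('proj', 'analytics') survives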
