Attempt getting Snowflake to go faster and use less RAM
- Thread information schema queries, one per db
- Pass schema list into catalog queries and filter on that in SQL
- Break the existing interface for get_catalog (sorry, not sorry)
Jacob Beck committed Jan 9, 2020
1 parent a348c3f · commit f1bf300
Showing 3 changed files with 125 additions and 69 deletions.
4 changes: 2 additions & 2 deletions core/dbt/adapters/base/impl.py
@@ -3,7 +3,7 @@
 from datetime import datetime
 from typing import (
     Optional, Tuple, Callable, Container, FrozenSet, Type, Dict, Any, List,
-    Mapping, Iterator, Union
+    Mapping, Iterator, Union, Set
 )

 import agate
@@ -117,7 +117,7 @@ def _relation_name(rel: Optional[BaseRelation]) -> str:
     return str(rel)


-class SchemaSearchMap(dict):
+class SchemaSearchMap(Dict[InformationSchema, Set[str]]):
     """A utility class to keep track of what information_schema tables to
     search for what schemas
     """
58 changes: 56 additions & 2 deletions plugins/snowflake/dbt/adapters/snowflake/impl.py
@@ -1,10 +1,18 @@
-from typing import Mapping, Any, Optional
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Mapping, Any, Optional, Set

 import agate

+from dbt.adapters.base.relation import InformationSchema
 from dbt.adapters.sql import SQLAdapter
 from dbt.adapters.snowflake import SnowflakeConnectionManager
 from dbt.adapters.snowflake import SnowflakeRelation
-from dbt.utils import filter_null_values
+from dbt.contracts.graph.manifest import Manifest
+from dbt.exceptions import RuntimeException
+from dbt.utils import filter_null_values


+GET_CATALOG_MACRO_NAME = 'snowflake_get_catalog'
+
+
 class SnowflakeAdapter(SQLAdapter):
@@ -69,6 +77,52 @@ def pre_model_hook(self, config: Mapping[str, Any]) -> Optional[str]:
         self._use_warehouse(warehouse)
         return previous

+    def _get_one_catalog(
+        self,
+        information_schema: InformationSchema,
+        schemas: Set[str],
+        manifest: Manifest,
+    ) -> agate.Table:
+
+        name = '.'.join([
+            str(information_schema.database),
+            'information_schema'
+        ])
+
+        # calculate the possible schemas for a given schema name
+        all_schema_names: Set[str] = set()
+        for schema in schemas:
+            all_schema_names.update({schema, schema.lower(), schema.upper()})
+
+        with self.connection_named(name):
+            kwargs = {
+                'information_schema': information_schema,
+                'schemas': all_schema_names
+            }
+            table = self.execute_macro(GET_CATALOG_MACRO_NAME,
+                                       kwargs=kwargs,
+                                       release=True)
+
+        results = self._catalog_filter_table(table, manifest)
+        return results

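The casing expansion guards against Snowflake's identifier rules: unquoted identifiers are stored uppercased, while quoted identifiers keep their exact case, so the catalog filter has to match any variant the information_schema might report. A quick standalone illustration of the expansion above (plain Python, no dbt dependencies):

    schemas = {'Analytics'}

    all_schema_names = set()
    for schema in schemas:
        # match the as-written, lowercased, and uppercased spellings
        all_schema_names.update({schema, schema.lower(), schema.upper()})

    print(all_schema_names)  # {'Analytics', 'analytics', 'ANALYTICS'}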
+    def get_catalog(self, manifest: Manifest) -> agate.Table:
+        # snowflake is super slow. split it out into the specified threads
+        num_threads = self.config.threads
+        schema_map = self._get_cache_schemas(manifest)
+        catalogs: agate.Table = agate.Table(rows=[])
+
+        with ThreadPoolExecutor(max_workers=num_threads) as executor:
+            futures = [
+                executor.submit(self._get_one_catalog, info, schemas, manifest)
+                for info, schemas in schema_map.items() if len(schemas) > 0
+            ]
+            for future in as_completed(futures):
+                catalog = future.result()
+                catalogs = agate.Table.merge([catalogs, catalog])
+
+        return catalogs
+
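Each future returns one per-database catalog table, and `agate.Table.merge` concatenates their rows into the single table callers expect. A standalone sketch of that merge behavior (the column names here are illustrative):

    import agate

    column_names = ['table_database', 'table_schema', 'table_name']
    column_types = [agate.Text(), agate.Text(), agate.Text()]

    a = agate.Table([('db1', 'public', 'users')], column_names, column_types)
    b = agate.Table([('db2', 'public', 'orders')], column_names, column_types)

    merged = agate.Table.merge([a, b])
    print(len(merged.rows))  # 2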
     def post_model_hook(
         self, config: Mapping[str, Any], context: Optional[str]
     ) -> None:
132 changes: 67 additions & 65 deletions plugins/snowflake/dbt/include/snowflake/macros/catalog.sql
@@ -1,68 +1,70 @@

-{% macro snowflake__get_catalog(information_schemas) -%}
-
-  {%- call statement('catalog', fetch_result=True) -%}
-    {% for information_schema in information_schemas %}
-
-    (
-      with tables as (
-
-          select
-              table_catalog as "table_database",
-              table_schema as "table_schema",
-              table_name as "table_name",
-              table_type as "table_type",
-
-              -- note: this is the _role_ that owns the table
-              table_owner as "table_owner",
-
-              'Clustering Key' as "stats:clustering_key:label",
-              clustering_key as "stats:clustering_key:value",
-              'The key used to cluster this table' as "stats:clustering_key:description",
-              (clustering_key is not null) as "stats:clustering_key:include",
-
-              'Row Count' as "stats:row_count:label",
-              row_count as "stats:row_count:value",
-              'An approximate count of rows in this table' as "stats:row_count:description",
-              (row_count is not null) as "stats:row_count:include",
-
-              'Approximate Size' as "stats:bytes:label",
-              bytes as "stats:bytes:value",
-              'Approximate size of the table as reported by Snowflake' as "stats:bytes:description",
-              (bytes is not null) as "stats:bytes:include"
-
-          from {{ information_schema }}.tables
-
-      ),
-
-      columns as (
-
-          select
-              table_catalog as "table_database",
-              table_schema as "table_schema",
-              table_name as "table_name",
-              null as "table_comment",
-
-              column_name as "column_name",
-              ordinal_position as "column_index",
-              data_type as "column_type",
-              null as "column_comment"
-
-          from {{ information_schema }}.columns
-
-      )
-
-      select *
-      from tables
-      join columns using ("table_database", "table_schema", "table_name")
-      where "table_schema" != 'INFORMATION_SCHEMA'
-      order by "column_index"
-    )
-    {% if not loop.last %} union all {% endif %}
-
-    {% endfor %}
-  {%- endcall -%}
-
-  {{ return(load_result('catalog').table) }}
+{% macro snowflake__get_catalog() %}
+  {# snowflake has a different argspec, because it filters by schema inside the database #}
+  {% do exceptions.raise_compiler_error('get_catalog is not valid on snowflake, use snowflake_get_catalog') %}
+{% endmacro %}
+
+
+{% macro snowflake_get_catalog(information_schema, schemas) -%}
+  {% set query %}
+    with tables as (
+
+        select
+            table_catalog as "table_database",
+            table_schema as "table_schema",
+            table_name as "table_name",
+            table_type as "table_type",
+
+            -- note: this is the _role_ that owns the table
+            table_owner as "table_owner",
+
+            'Clustering Key' as "stats:clustering_key:label",
+            clustering_key as "stats:clustering_key:value",
+            'The key used to cluster this table' as "stats:clustering_key:description",
+            (clustering_key is not null) as "stats:clustering_key:include",
+
+            'Row Count' as "stats:row_count:label",
+            row_count as "stats:row_count:value",
+            'An approximate count of rows in this table' as "stats:row_count:description",
+            (row_count is not null) as "stats:row_count:include",
+
+            'Approximate Size' as "stats:bytes:label",
+            bytes as "stats:bytes:value",
+            'Approximate size of the table as reported by Snowflake' as "stats:bytes:description",
+            (bytes is not null) as "stats:bytes:include"
+
+        from {{ information_schema }}.tables
+
+    ),
+
+    columns as (
+
+        select
+            table_catalog as "table_database",
+            table_schema as "table_schema",
+            table_name as "table_name",
+            null as "table_comment",
+
+            column_name as "column_name",
+            ordinal_position as "column_index",
+            data_type as "column_type",
+            null as "column_comment"
+
+        from {{ information_schema }}.columns
+    )
+
+    select *
+    from tables
+    join columns using ("table_database", "table_schema", "table_name")
+    where "table_schema" != 'INFORMATION_SCHEMA'
+      and (
+        {%- for schema in schemas -%}
+          "table_schema" = '{{ schema }}'{%- if not loop.last %} or {% endif -%}
+        {%- endfor -%}
+      )
+    order by "column_index"
+  {%- endset -%}
+
+  {{ return(run_query(query)) }}
+
 {%- endmacro %}
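For reference, the schema loop in the new macro renders one quoted equality per schema name, joined with `or`. A plain-Python sketch of the equivalent string construction (illustrative only, not part of the commit):

    schemas = {'Analytics', 'analytics', 'ANALYTICS'}

    predicate = ' or '.join(
        '"table_schema" = \'{}\''.format(schema) for schema in sorted(schemas)
    )
    print(predicate)
    # "table_schema" = 'ANALYTICS' or "table_schema" = 'Analytics' or "table_schema" = 'analytics'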
