From f1bf30075704ed342dac933967aec6d60faab0e3 Mon Sep 17 00:00:00 2001
From: Jacob Beck
Date: Fri, 20 Dec 2019 07:30:03 -0700
Subject: [PATCH] Attempt to get snowflake to go faster and use less RAM

- Thread information schema queries, one per db
- Pass the schema list into catalog queries and filter on it in SQL
- Break the existing interface for get_catalog (sorry, not sorry)
---
 core/dbt/adapters/base/impl.py                |   4 +-
 .../snowflake/dbt/adapters/snowflake/impl.py  |  58 +++++++-
 .../dbt/include/snowflake/macros/catalog.sql  | 132 +++++++++---------
 3 files changed, 125 insertions(+), 69 deletions(-)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index cbc06311553..5f7a09383f8 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -3,7 +3,7 @@
 from datetime import datetime
 from typing import (
     Optional, Tuple, Callable, Container, FrozenSet, Type, Dict, Any, List,
-    Mapping, Iterator, Union
+    Mapping, Iterator, Union, Set
 )
 
 import agate
@@ -117,7 +117,7 @@ def _relation_name(rel: Optional[BaseRelation]) -> str:
         return str(rel)
 
 
-class SchemaSearchMap(dict):
+class SchemaSearchMap(Dict[InformationSchema, Set[str]]):
     """A utility class to keep track of what information_schema tables to
     search for what schemas
     """
diff --git a/plugins/snowflake/dbt/adapters/snowflake/impl.py b/plugins/snowflake/dbt/adapters/snowflake/impl.py
index a69a7d147b0..3e9bdfb0dfe 100644
--- a/plugins/snowflake/dbt/adapters/snowflake/impl.py
+++ b/plugins/snowflake/dbt/adapters/snowflake/impl.py
@@ -1,10 +1,18 @@
-from typing import Mapping, Any, Optional
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Mapping, Any, Optional, Set
+
+import agate
+
+from dbt.adapters.base.relation import InformationSchema
 from dbt.adapters.sql import SQLAdapter
 from dbt.adapters.snowflake import SnowflakeConnectionManager
 from dbt.adapters.snowflake import SnowflakeRelation
-from dbt.utils import filter_null_values
+from dbt.contracts.graph.manifest import Manifest
 from dbt.exceptions import RuntimeException
+from dbt.utils import filter_null_values
+
+
+GET_CATALOG_MACRO_NAME = 'snowflake_get_catalog'
 
 
 class SnowflakeAdapter(SQLAdapter):
@@ -69,6 +77,52 @@ def pre_model_hook(self, config: Mapping[str, Any]) -> Optional[str]:
             self._use_warehouse(warehouse)
         return previous
 
+    def _get_one_catalog(
+        self,
+        information_schema: InformationSchema,
+        schemas: Set[str],
+        manifest: Manifest,
+    ) -> agate.Table:
+
+        name = '.'.join([
+            str(information_schema.database),
+            'information_schema'
+        ])
+
+        # calculate the possible schemas for a given schema name
+        all_schema_names: Set[str] = set()
+        for schema in schemas:
+            all_schema_names.update({schema, schema.lower(), schema.upper()})
+
+        with self.connection_named(name):
+            kwargs = {
+                'information_schema': information_schema,
+                'schemas': all_schema_names
+            }
+            table = self.execute_macro(GET_CATALOG_MACRO_NAME,
+                                       kwargs=kwargs,
+                                       release=True)
+
+        results = self._catalog_filter_table(table, manifest)
+        return results
+
+    def get_catalog(self, manifest: Manifest) -> agate.Table:
+        # snowflake is super slow. split it out into the specified threads
+        num_threads = self.config.threads
+        schema_map = self._get_cache_schemas(manifest)
+        catalogs: agate.Table = agate.Table(rows=[])
+
+        with ThreadPoolExecutor(max_workers=num_threads) as executor:
+            futures = [
+                executor.submit(self._get_one_catalog, info, schemas, manifest)
+                for info, schemas in schema_map.items() if len(schemas) > 0
+            ]
+            for future in as_completed(futures):
+                catalog = future.result()
+                catalogs = agate.Table.merge([catalogs, catalog])
+
+        return catalogs
+
     def post_model_hook(
         self, config: Mapping[str, Any], context: Optional[str]
     ) -> None:
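The fan-out in get_catalog above is plain concurrent.futures: one future per information schema (one per database), with results merged as they complete. Here is a minimal, runnable sketch of the same pattern; fetch_one and the sample schema map are hypothetical stand-ins for _get_one_catalog and dbt's agate tables, not part of the patch:

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from typing import Dict, List, Set, Tuple

    def fetch_one(database: str, schemas: Set[str]) -> List[Tuple[str, str]]:
        # Stand-in for _get_one_catalog: fetch catalog rows for one
        # database, restricted to the given schemas.
        return [(database, schema) for schema in sorted(schemas)]

    def get_catalog(schema_map: Dict[str, Set[str]],
                    num_threads: int) -> List[Tuple[str, str]]:
        rows: List[Tuple[str, str]] = []
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            # One future per database; databases with no schemas to
            # inspect are skipped, as in the patch.
            futures = [
                executor.submit(fetch_one, db, schemas)
                for db, schemas in schema_map.items() if schemas
            ]
            # as_completed yields futures in finish order, so a slow
            # database does not delay merging the others' results.
            for future in as_completed(futures):
                rows.extend(future.result())
        return rows

    print(get_catalog({'analytics': {'dbt'}, 'raw': {'public'}}, num_threads=4))

Threads are a reasonable fit here despite the GIL: each worker spends nearly all of its time blocked on a Snowflake query, and agate.Table.merge in the real code concatenates the per-database tables as they arrive.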
diff --git a/plugins/snowflake/dbt/include/snowflake/macros/catalog.sql b/plugins/snowflake/dbt/include/snowflake/macros/catalog.sql
index fe68cd2e46e..c1a4b4c4827 100644
--- a/plugins/snowflake/dbt/include/snowflake/macros/catalog.sql
+++ b/plugins/snowflake/dbt/include/snowflake/macros/catalog.sql
@@ -1,68 +1,70 @@
-{% macro snowflake__get_catalog(information_schemas) -%}
-
-  {%- call statement('catalog', fetch_result=True) -%}
-    {% for information_schema in information_schemas %}
-
-    (
-      with tables as (
-
-          select
-              table_catalog as "table_database",
-              table_schema as "table_schema",
-              table_name as "table_name",
-              table_type as "table_type",
-
-              -- note: this is the _role_ that owns the table
-              table_owner as "table_owner",
-
-              'Clustering Key' as "stats:clustering_key:label",
-              clustering_key as "stats:clustering_key:value",
-              'The key used to cluster this table' as "stats:clustering_key:description",
-              (clustering_key is not null) as "stats:clustering_key:include",
-
-              'Row Count' as "stats:row_count:label",
-              row_count as "stats:row_count:value",
-              'An approximate count of rows in this table' as "stats:row_count:description",
-              (row_count is not null) as "stats:row_count:include",
-
-              'Approximate Size' as "stats:bytes:label",
-              bytes as "stats:bytes:value",
-              'Approximate size of the table as reported by Snowflake' as "stats:bytes:description",
-              (bytes is not null) as "stats:bytes:include"
-
-          from {{ information_schema }}.tables
-
-      ),
-
-      columns as (
-
-          select
-              table_catalog as "table_database",
-              table_schema as "table_schema",
-              table_name as "table_name",
-              null as "table_comment",
-
-              column_name as "column_name",
-              ordinal_position as "column_index",
-              data_type as "column_type",
-              null as "column_comment"
-
-          from {{ information_schema }}.columns
-
-      )
-
-      select *
-      from tables
-      join columns using ("table_database", "table_schema", "table_name")
-      where "table_schema" != 'INFORMATION_SCHEMA'
-      order by "column_index"
-    )
-    {% if not loop.last %} union all {% endif %}
-
-    {% endfor %}
-  {%- endcall -%}
-
-  {{ return(load_result('catalog').table) }}
+{% macro snowflake__get_catalog() %}
+{# snowflake has a different argspec, because it filters by schema inside the database #}
+{% do exceptions.raise_compiler_error('get_catalog is not valid on snowflake, use snowflake_get_catalog') %}
+{% endmacro %}
+
+
+{% macro snowflake_get_catalog(information_schema, schemas) -%}
+  {% set query %}
+    with tables as (
+
+        select
+            table_catalog as "table_database",
+            table_schema as "table_schema",
+            table_name as "table_name",
+            table_type as "table_type",
+
+            -- note: this is the _role_ that owns the table
+            table_owner as "table_owner",
+
+            'Clustering Key' as "stats:clustering_key:label",
+            clustering_key as "stats:clustering_key:value",
+            'The key used to cluster this table' as "stats:clustering_key:description",
+            (clustering_key is not null) as "stats:clustering_key:include",
+
+            'Row Count' as "stats:row_count:label",
+            row_count as "stats:row_count:value",
+            'An approximate count of rows in this table' as "stats:row_count:description",
+            (row_count is not null) as "stats:row_count:include",
+
+            'Approximate Size' as "stats:bytes:label",
+            bytes as "stats:bytes:value",
+            'Approximate size of the table as reported by Snowflake' as "stats:bytes:description",
+            (bytes is not null) as "stats:bytes:include"
+
+        from {{ information_schema }}.tables
+
+    ),
+
+    columns as (
+
+        select
+            table_catalog as "table_database",
+            table_schema as "table_schema",
+            table_name as "table_name",
+            null as "table_comment",
+
+            column_name as "column_name",
+            ordinal_position as "column_index",
+            data_type as "column_type",
+            null as "column_comment"
+
+        from {{ information_schema }}.columns
+    )
+
+    select *
+    from tables
+    join columns using ("table_database", "table_schema", "table_name")
+    where "table_schema" != 'INFORMATION_SCHEMA'
+    and (
+        {%- for schema in schemas -%}
+          "table_schema" = '{{ schema }}'{%- if not loop.last %} or {% endif -%}
+        {%- endfor -%}
+    )
+    order by "column_index"
+  {%- endset -%}

+  {{ return(run_query(query)) }}
 {%- endmacro %}
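Two details of the schema filter above are worth spelling out. _get_one_catalog expands every schema name into its original, lowercase, and uppercase forms because Snowflake stores unquoted identifiers uppercased while quoted identifiers keep their casing; the Jinja for-loop then renders the candidate set as a chain of OR'd equality checks. A rough Python sketch of that rendering, with a hypothetical function name and sample input:

    from typing import Set

    def render_schema_filter(schemas: Set[str]) -> str:
        # Mirror _get_one_catalog's casing expansion: check the name as
        # given plus its lowercase and uppercase variants.
        candidates: Set[str] = set()
        for schema in schemas:
            candidates.update({schema, schema.lower(), schema.upper()})
        # Mirror the Jinja loop: one equality per candidate, OR'd together.
        return '(' + ' or '.join(
            '"table_schema" = \'{}\''.format(name) for name in sorted(candidates)
        ) + ')'

    print(render_schema_filter({'Analytics'}))
    # ("table_schema" = 'ANALYTICS' or "table_schema" = 'Analytics'
    #  or "table_schema" = 'analytics')

Pushing this filter into the SQL, rather than pulling every table in the database and discarding unwanted schemas in Python via _catalog_filter_table, is where the memory saving should come from.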