
Quote config #742

Merged: 20 commits, Apr 26, 2018
86 changes: 51 additions & 35 deletions dbt/adapters/bigquery/impl.py
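The change repeated throughout this file is mechanical: every adapter classmethod gains a `project_cfg` parameter (presumably the parsed project configuration carrying the new quoting settings) immediately after `profile`, and forwards it to helpers such as `get_dataset`. Below is a minimal sketch of how a call site would adapt; the import path and the `profile`/`project_cfg` values are hypothetical placeholders, not taken from this PR.

from dbt.adapters.bigquery.impl import BigQueryAdapter  # assumed import path

# Hypothetical placeholders; a real profile needs working BigQuery credentials.
profile = {'project': 'my-gcp-project', 'timeout_seconds': 300}
project_cfg = {'name': 'my_dbt_project',
               'quoting': {'schema': True, 'identifier': True}}  # assumed shape

# Before this PR the helper was called without the project config:
#     dataset = BigQueryAdapter.get_dataset(profile, 'analytics', model_name='my_model')
# After this PR, project_cfg is threaded through as the second argument:
dataset = BigQueryAdapter.get_dataset(profile, project_cfg, 'analytics',
                                      model_name='my_model')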
@@ -176,12 +176,13 @@ def close(cls, connection):
         return connection

     @classmethod
-    def list_relations(cls, profile, schema, model_name=None):
+    def list_relations(cls, profile, project_cfg, schema, model_name=None):
         connection = cls.get_connection(profile, model_name)
         credentials = connection.get('credentials', {})
         client = connection.get('handle')

-        bigquery_dataset = cls.get_dataset(profile, schema, model_name)
+        bigquery_dataset = cls.get_dataset(
+            profile, project_cfg, schema, model_name)
         all_tables = client.list_tables(bigquery_dataset)

         relation_types = {
@@ -194,25 +195,31 @@ def list_relations(cls, profile, schema, model_name=None):
             project=credentials.get('project'),
             schema=schema,
             identifier=table.table_id,
+            quote_policy={
+                'schema': True,
+                'identifier': True
+            },
             type=relation_types.get(table.table_type))
                 for table in all_tables]

     @classmethod
-    def drop_relation(cls, profile, relation, model_name=None):
+    def drop_relation(cls, profile, project_cfg, relation, model_name=None):
         conn = cls.get_connection(profile, model_name)
         client = conn.get('handle')

-        dataset = cls.get_dataset(profile, relation.schema, model_name)
+        dataset = cls.get_dataset(
+            profile, project_cfg, relation.schema, model_name)
         relation_object = dataset.table(relation.identifier)
         client.delete_table(relation_object)

     @classmethod
-    def rename(cls, profile, schema, from_name, to_name, model_name=None):
+    def rename(cls, profile, project_cfg, schema,
+               from_name, to_name, model_name=None):
         raise dbt.exceptions.NotImplementedException(
             '`rename` is not implemented for this adapter!')

     @classmethod
-    def rename_relation(cls, profile, from_relation, to_relation,
+    def rename_relation(cls, profile, project_cfg, from_relation, to_relation,
                         model_name=None):
         raise dbt.exceptions.NotImplementedException(
             '`rename_relation` is not implemented for this adapter!')
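The `quote_policy` passed to `cls.Relation.create` in the hunk above is the substantive part of this PR: it records which components of a relation name should be wrapped in BigQuery backticks when the relation is rendered. A standalone sketch of the idea follows (an illustration, not dbt's `BigQueryRelation` implementation), reusing the same backtick quoting that `quote()` applies later in this diff.

def render_with_policy(project, schema, identifier, quote_policy):
    # Illustration: backtick-quote only the name parts enabled in the policy.
    def maybe_quote(part, key):
        return '`{}`'.format(part) if quote_policy.get(key, False) else part

    return '{}.{}.{}'.format(maybe_quote(project, 'project'),
                             maybe_quote(schema, 'schema'),
                             maybe_quote(identifier, 'identifier'))

# Mirrors the policy set in list_relations above:
print(render_with_policy('my-gcp-project', 'analytics', 'events',
                         {'schema': True, 'identifier': True}))
# -> my-gcp-project.`analytics`.`events`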
@@ -223,11 +230,11 @@ def get_timeout(cls, conn):
         return credentials.get('timeout_seconds', cls.QUERY_TIMEOUT)

     @classmethod
-    def materialize_as_view(cls, profile, dataset, model):
+    def materialize_as_view(cls, profile, project_cfg, dataset, model):
         model_name = model.get('name')
         model_sql = model.get('injected_sql')

-        conn = cls.get_connection(profile, model_name)
+        conn = cls.get_connection(profile, project_cfg, model_name)
         client = conn.get('handle')

         view_ref = dataset.table(model_name)
@@ -258,21 +265,22 @@ def poll_until_job_completes(cls, job, timeout):
             raise job.exception()

     @classmethod
-    def make_date_partitioned_table(cls, profile, dataset_name,
+    def make_date_partitioned_table(cls, profile, project_cfg, dataset_name,
                                     identifier, model_name=None):
         conn = cls.get_connection(profile, model_name)
         client = conn.get('handle')

-        dataset = cls.get_dataset(profile, dataset_name, identifier)
+        dataset = cls.get_dataset(profile, project_cfg,
+                                  dataset_name, identifier)
         table_ref = dataset.table(identifier)
         table = google.cloud.bigquery.Table(table_ref)
         table.partitioning_type = 'DAY'

         return client.create_table(table)

     @classmethod
-    def materialize_as_table(cls, profile, dataset, model, model_sql,
-                             decorator=None):
+    def materialize_as_table(cls, profile, project_cfg, dataset,
+                             model, model_sql, decorator=None):
         model_name = model.get('name')

         conn = cls.get_connection(profile, model_name)
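make_date_partitioned_table above builds a day-partitioned table through the adapter. For reference, a minimal standalone sketch of the same operation against the BigQuery client directly; the project, dataset, and table names are hypothetical.

from google.cloud import bigquery  # assumes google-cloud-bigquery is installed

client = bigquery.Client(project='my-gcp-project')  # hypothetical project
table_ref = bigquery.DatasetReference('my-gcp-project', 'analytics').table('events')

table = bigquery.Table(table_ref)
table.partitioning_type = 'DAY'  # same attribute the adapter sets above;
                                 # newer client versions prefer table.time_partitioning
client.create_table(table)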
@@ -298,7 +306,8 @@ def materialize_as_table(cls, profile, dataset, model, model_sql,
         return "CREATE TABLE"

     @classmethod
-    def execute_model(cls, profile, model, materialization, sql_override=None,
+    def execute_model(cls, profile, project_cfg, model,
+                      materialization, sql_override=None,
                       decorator=None, model_name=None):

         if sql_override is None:
@@ -311,13 +320,15 @@ def execute_model(cls, profile, model, materialization, sql_override=None,
         model_name = model.get('name')
         model_schema = model.get('schema')

-        dataset = cls.get_dataset(profile, model_schema, model_name)
+        dataset = cls.get_dataset(profile, project_cfg,
+                                  model_schema, model_name)

         if materialization == 'view':
-            res = cls.materialize_as_view(profile, dataset, model)
+            res = cls.materialize_as_view(profile, project_cfg, dataset, model)
         elif materialization == 'table':
-            res = cls.materialize_as_table(profile, dataset, model,
-                                           sql_override, decorator)
+            res = cls.materialize_as_table(
+                profile, project_cfg, dataset, model,
+                sql_override, decorator)
         else:
             msg = "Invalid relation type: '{}'".format(materialization)
             raise dbt.exceptions.RuntimeException(msg, model)
@@ -365,41 +376,42 @@ def add_begin_query(cls, profile, name):
             '`add_begin_query` is not implemented for this adapter!')

     @classmethod
-    def create_schema(cls, profile, schema, model_name=None):
+    def create_schema(cls, profile, project_cfg, schema, model_name=None):
         logger.debug('Creating schema "%s".', schema)

         conn = cls.get_connection(profile, model_name)
         client = conn.get('handle')

-        dataset = cls.get_dataset(profile, schema, model_name)
+        dataset = cls.get_dataset(profile, project_cfg, schema, model_name)
         with cls.exception_handler(profile, 'create dataset', model_name):
             client.create_dataset(dataset)

     @classmethod
-    def drop_tables_in_schema(cls, profile, dataset):
+    def drop_tables_in_schema(cls, profile, project_cfg, dataset):
         conn = cls.get_connection(profile)
         client = conn.get('handle')

         for table in client.list_tables(dataset):
             client.delete_table(table.reference)

     @classmethod
-    def drop_schema(cls, profile, schema, model_name=None):
+    def drop_schema(cls, profile, project_cfg, schema, model_name=None):
         logger.debug('Dropping schema "%s".', schema)

-        if not cls.check_schema_exists(profile, schema, model_name):
+        if not cls.check_schema_exists(profile, project_cfg,
+                                       schema, model_name):
             return

         conn = cls.get_connection(profile)
         client = conn.get('handle')

-        dataset = cls.get_dataset(profile, schema, model_name)
+        dataset = cls.get_dataset(profile, project_cfg, schema, model_name)
         with cls.exception_handler(profile, 'drop dataset', model_name):
-            cls.drop_tables_in_schema(profile, dataset)
+            cls.drop_tables_in_schema(profile, project_cfg, dataset)
             client.delete_dataset(dataset)

     @classmethod
-    def get_existing_schemas(cls, profile, model_name=None):
+    def get_existing_schemas(cls, profile, project_cfg, model_name=None):
         conn = cls.get_connection(profile, model_name)
         client = conn.get('handle')

@@ -408,7 +420,8 @@ def get_existing_schemas(cls, profile, model_name=None):
         return [ds.dataset_id for ds in all_datasets]

     @classmethod
-    def get_columns_in_table(cls, profile, schema_name, table_name,
+    def get_columns_in_table(cls, profile, project_cfg,
+                             schema_name, table_name,
                              database=None, model_name=None):

         # BigQuery does not have databases -- the database parameter is here
@@ -435,7 +448,8 @@ def get_columns_in_table(cls, profile, schema_name, table_name,
         return columns

     @classmethod
-    def check_schema_exists(cls, profile, schema, model_name=None):
+    def check_schema_exists(cls, profile, project_cfg,
+                            schema, model_name=None):
         conn = cls.get_connection(profile, model_name)
         client = conn.get('handle')

@@ -444,7 +458,7 @@ def check_schema_exists(cls, profile, schema, model_name=None):
         return any([ds.dataset_id == schema for ds in all_datasets])

     @classmethod
-    def get_dataset(cls, profile, dataset_name, model_name=None):
+    def get_dataset(cls, profile, project_cfg, dataset_name, model_name=None):
         conn = cls.get_connection(profile, model_name)
         client = conn.get('handle')

@@ -475,13 +489,14 @@ def quote(cls, identifier):
         return '`{}`'.format(identifier)

     @classmethod
-    def quote_schema_and_table(cls, profile, schema, table, model_name=None):
-        return cls.render_relation(profile,
+    def quote_schema_and_table(cls, profile, project_cfg, schema,
+                               table, model_name=None):
+        return cls.render_relation(profile, project_cfg,
                                    cls.quote(schema),
                                    cls.quote(table))

     @classmethod
-    def render_relation(cls, profile, schema, table):
+    def render_relation(cls, profile, project_cfg, schema, table):
         connection = cls.get_connection(profile)
         credentials = connection.get('credentials', {})
         project = credentials.get('project')
@@ -515,10 +530,11 @@ def _agate_to_schema(cls, agate_table, column_override):
         return bq_schema

     @classmethod
-    def load_dataframe(cls, profile, schema, table_name, agate_table,
+    def load_dataframe(cls, profile, project_cfg, schema,
+                       table_name, agate_table,
                        column_override, model_name=None):
         bq_schema = cls._agate_to_schema(agate_table, column_override)
-        dataset = cls.get_dataset(profile, schema, None)
+        dataset = cls.get_dataset(profile, project_cfg, schema, None)
         table = dataset.table(table_name)
         conn = cls.get_connection(profile, None)
         client = conn.get('handle')
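load_dataframe above loads an agate table into BigQuery with a load job and then waits on it via poll_until_job_completes (visible in the next hunk). For reference, a minimal standalone sketch of the same flow with the BigQuery client directly; the file, project, dataset, and table names are hypothetical, and autodetect stands in for the explicit bq_schema the adapter builds.

from google.cloud import bigquery  # assumes google-cloud-bigquery is installed

client = bigquery.Client(project='my-gcp-project')  # hypothetical project
table_ref = bigquery.DatasetReference('my-gcp-project',
                                      'analytics').table('seed_data')

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
job_config.autodetect = True  # stand-in for the schema built by _agate_to_schema

with open('seed_data.csv', 'rb') as f:  # hypothetical local file
    job = client.load_table_from_file(f, table_ref, job_config=job_config)

job.result()  # block until the load job finishes, like poll_until_job_completes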
@@ -535,7 +551,7 @@ def load_dataframe(cls, profile, schema, table_name, agate_table,
         cls.poll_until_job_completes(job, cls.get_timeout(conn))

     @classmethod
-    def expand_target_column_types(cls, profile, temp_table, to_schema,
-                                   to_table, model_name=None):
+    def expand_target_column_types(cls, profile, project_cfg, temp_table,
+                                   to_schema, to_table, model_name=None):
         # This is a no-op on BigQuery
         pass