Commit

Merge branch 'development' into feature/model-aliasing
abelsonlive authored Feb 22, 2018
2 parents b11ebb0 + 4eb75ec commit 9fd89d7
Showing 53 changed files with 1,298 additions and 521 deletions.
211 changes: 143 additions & 68 deletions dbt/adapters/bigquery.py
@@ -17,7 +17,6 @@
import google.cloud.bigquery

import time
import uuid


class BigQueryAdapter(PostgresAdapter):
@@ -27,7 +26,8 @@ class BigQueryAdapter(PostgresAdapter):
"execute_model",
"drop",
"execute",
"quote_schema_and_table"
"quote_schema_and_table",
"make_date_partitioned_table"
]

SCOPE = ('https://www.googleapis.com/auth/bigquery',
@@ -40,7 +40,9 @@ class BigQueryAdapter(PostgresAdapter):
def handle_error(cls, error, message, sql):
logger.debug(message.format(sql=sql))
logger.debug(error)
error_msg = "\n".join([error['message'] for error in error.errors])
error_msg = "\n".join(
[item['message'] for item in error.errors])

raise dbt.exceptions.DatabaseException(error_msg)

@classmethod
@@ -148,27 +150,33 @@ def query_for_existing(cls, profile, schemas, model_name=None):
if not isinstance(schemas, (list, tuple)):
schemas = [schemas]

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

all_tables = []
for schema in schemas:
dataset = cls.get_dataset(profile, schema, model_name)
all_tables.extend(dataset.list_tables())
all_tables.extend(client.list_tables(dataset))

relation_type_lookup = {
relation_types = {
'TABLE': 'table',
'VIEW': 'view',
'EXTERNAL': 'external'
}

existing = [(table.name, relation_type_lookup.get(table.table_type))
existing = [(table.table_id, relation_types.get(table.table_type))
for table in all_tables]

return dict(existing)

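With the 0.29+ google-cloud-bigquery client, tables are listed through the client rather than through the Dataset object, and the listed items expose table_id and table_type instead of name. A minimal sketch of the same lookup outside the adapter, with a hypothetical project and dataset:

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client(project='my-project')   # hypothetical project
    dataset_ref = client.dataset('analytics')                     # hypothetical dataset

    relation_types = {'TABLE': 'table', 'VIEW': 'view', 'EXTERNAL': 'external'}

    # list_tables() returns lightweight items carrying table_id and table_type
    existing = {t.table_id: relation_types.get(t.table_type)
                for t in client.list_tables(dataset_ref)}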
@classmethod
def drop(cls, profile, schema, relation, relation_type, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
relation_object = dataset.table(relation)
relation_object.delete()
client.delete_table(relation_object)

@classmethod
def rename(cls, profile, schema, from_name, to_name, model_name=None):
@@ -181,19 +189,22 @@ def get_timeout(cls, conn):
return credentials.get('timeout_seconds', cls.QUERY_TIMEOUT)

@classmethod
def materialize_as_view(cls, profile, dataset, model_name, model_sql):
view = dataset.table(model_name)
def materialize_as_view(cls, profile, dataset, model):
model_name = model.get('name')
model_sql = model.get('injected_sql')

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

view_ref = dataset.table(model_name)
view = google.cloud.bigquery.Table(view_ref)
view.view_query = model_sql
view.view_use_legacy_sql = False

logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))

with cls.exception_handler(profile, model_sql, model_name, model_name):
view.create()

if view.created is None:
msg = "Error creating view {}".format(model_name)
raise dbt.exceptions.RuntimeException(msg)
client.create_table(view)

return "CREATE VIEW"

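Under the newer API a view is created by building a Table from a TableReference, setting view_query, and passing it to client.create_table(); the client raises on failure, so the old "view.created is None" check is no longer needed. A rough standalone sketch, with illustrative names:

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client(project='my-project')   # hypothetical
    dataset_ref = client.dataset('analytics')                     # hypothetical

    view_ref = dataset_ref.table('my_view')
    view = google.cloud.bigquery.Table(view_ref)
    view.view_query = 'select 1 as id'
    view.view_use_legacy_sql = False      # use standard SQL for the view definition

    client.create_table(view)             # raises a google.api_core exception on error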
@@ -213,29 +224,51 @@ def poll_until_job_completes(cls, job, timeout):
raise job.exception()

@classmethod
def materialize_as_table(cls, profile, dataset, model_name, model_sql):
def make_date_partitioned_table(cls, profile, dataset_name, identifier,
model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

table = dataset.table(model_name)
job_id = 'dbt-create-{}-{}'.format(model_name, uuid.uuid4())
job = client.run_async_query(job_id, model_sql)
job.use_legacy_sql = False
job.destination = table
job.write_disposition = 'WRITE_TRUNCATE'
job.begin()
dataset = cls.get_dataset(profile, dataset_name, identifier)
table_ref = dataset.table(identifier)
table = google.cloud.bigquery.Table(table_ref)
table.partitioning_type = 'DAY'

cls.release_connection(profile, model_name)
return client.create_table(table)

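make_date_partitioned_table uses the day-partitioning support in google-cloud-bigquery 0.29, where setting Table.partitioning_type to 'DAY' requests ingestion-time partitioning (newer releases express the same thing through a TimePartitioning object). A hedged sketch of the same call outside dbt, with made-up names:

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client(project='my-project')   # hypothetical
    table_ref = client.dataset('analytics').table('events')       # hypothetical names

    table = google.cloud.bigquery.Table(table_ref)
    table.partitioning_type = 'DAY'   # 0.29-era property; newer clients use table.time_partitioning

    client.create_table(table)        # returns the created Table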
logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))
@classmethod
def materialize_as_table(cls, profile, dataset, model, model_sql,
decorator=None):
model_name = model.get('name')

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

if decorator is None:
table_name = model_name
else:
table_name = "{}${}".format(model_name, decorator)

table_ref = dataset.table(table_name)
job_config = google.cloud.bigquery.QueryJobConfig()
job_config.destination = table_ref
job_config.write_disposition = 'WRITE_TRUNCATE'

logger.debug("Model SQL ({}):\n{}".format(table_name, model_sql))
query_job = client.query(model_sql, job_config=job_config)

# this waits for the job to complete
with cls.exception_handler(profile, model_sql, model_name, model_name):
cls.poll_until_job_completes(job, cls.get_timeout(conn))
query_job.result(timeout=cls.get_timeout(conn))

return "CREATE TABLE"

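The table materialization now runs a query job whose QueryJobConfig.destination points at the target table; when a decorator is supplied, the table$YYYYMMDD partition decorator makes the job overwrite only that day's partition. A simplified sketch under assumed project and dataset names:

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client(project='my-project')   # hypothetical
    dataset_ref = client.dataset('analytics')                     # hypothetical

    # write into a single day partition of a date-partitioned table
    table_ref = dataset_ref.table('events$20180222')

    job_config = google.cloud.bigquery.QueryJobConfig()
    job_config.destination = table_ref
    job_config.write_disposition = 'WRITE_TRUNCATE'   # replace the partition contents

    query_job = client.query('select current_timestamp() as loaded_at',
                             job_config=job_config)
    query_job.result(timeout=300)   # blocks until the job finishes or times out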
@classmethod
def execute_model(cls, profile, model, materialization, model_name=None):
def execute_model(cls, profile, model, materialization, sql_override=None,
decorator=None, model_name=None):

if sql_override is None:
sql_override = model.get('injected_sql')

if flags.STRICT_MODE:
connection = cls.get_connection(profile, model.get('name'))
@@ -244,55 +277,42 @@ def execute_model(cls, profile, model, materialization, model_name=None):

model_name = model.get('name')
model_schema = model.get('schema')
model_sql = model.get('injected_sql')

dataset = cls.get_dataset(profile, model_schema, model_name)

if materialization == 'view':
res = cls.materialize_as_view(profile, dataset, model_name,
model_sql)
res = cls.materialize_as_view(profile, dataset, model)
elif materialization == 'table':
res = cls.materialize_as_table(profile, dataset, model_name,
model_sql)
res = cls.materialize_as_table(profile, dataset, model,
sql_override, decorator)
else:
msg = "Invalid relation type: '{}'".format(materialization)
raise dbt.exceptions.RuntimeException(msg, model)

return res

@classmethod
def fetch_query_results(cls, query):
all_rows = []

rows = query.rows
token = query.page_token

while True:
all_rows.extend(rows)
if token is None:
break
rows, total_count, token = query.fetch_data(page_token=token)
return all_rows

@classmethod
def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

query = client.run_sync_query(sql)
query.timeout_ms = cls.get_timeout(conn) * 1000
query.use_legacy_sql = False

debug_message = "Fetching data for query {}:\n{}"
logger.debug(debug_message.format(model_name, sql))

query.run()
job_config = google.cloud.bigquery.QueryJobConfig()
job_config.use_legacy_sql = False
query_job = client.query(sql, job_config)

# this blocks until the query has completed
with cls.exception_handler(profile, 'create dataset', model_name):
iterator = query_job.result()

res = []
if fetch:
res = cls.fetch_query_results(query)
res = list(iterator)

status = 'ERROR' if query.errors else 'OK'
# If we get here, the query succeeded
status = 'OK'
return status, res

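For ad-hoc statements, client.query() returns a QueryJob and result() blocks until completion and yields a row iterator, which is why the old page-token loop in fetch_query_results can be dropped. A minimal sketch of fetching results this way, assuming a project of my own:

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client(project='my-project')   # hypothetical

    job_config = google.cloud.bigquery.QueryJobConfig()
    job_config.use_legacy_sql = False

    query_job = client.query('select 1 as id, "a" as letter', job_config=job_config)

    rows = list(query_job.result())        # result() waits for the job, then iterates rows
    print(rows[0].id, rows[0]['letter'])   # Row values support attribute and key access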
@classmethod
@@ -308,15 +328,20 @@ def add_begin_query(cls, profile, name):
def create_schema(cls, profile, schema, model_name=None):
logger.debug('Creating schema "%s".', schema)

dataset = cls.get_dataset(profile, schema, model_name)
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
with cls.exception_handler(profile, 'create dataset', model_name):
dataset.create()
client.create_dataset(dataset)

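Datasets (dbt schemas) follow the same pattern: build a Dataset from a DatasetReference and hand it to the client. A small sketch covering create, list, and delete, with a made-up schema name:

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client(project='my-project')   # hypothetical

    dataset_ref = client.dataset('dbt_dev')                       # hypothetical schema name
    dataset = google.cloud.bigquery.Dataset(dataset_ref)

    client.create_dataset(dataset)
    print([ds.dataset_id for ds in client.list_datasets()])       # listed items expose dataset_id
    client.delete_dataset(dataset)                                # fails if the dataset is not empty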
@classmethod
def drop_tables_in_schema(cls, dataset):
for table in dataset.list_tables():
table.delete()
def drop_tables_in_schema(cls, profile, dataset):
conn = cls.get_connection(profile)
client = conn.get('handle')

for table in client.list_tables(dataset):
client.delete_table(table.reference)

@classmethod
def drop_schema(cls, profile, schema, model_name=None):
@@ -325,21 +350,22 @@ def drop_schema(cls, profile, schema, model_name=None):
if not cls.check_schema_exists(profile, schema, model_name):
return

dataset = cls.get_dataset(profile, schema, model_name)
conn = cls.get_connection(profile)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
with cls.exception_handler(profile, 'drop dataset', model_name):
cls.drop_tables_in_schema(dataset)
dataset.delete()
cls.drop_tables_in_schema(profile, dataset)
client.delete_dataset(dataset)

@classmethod
def get_existing_schemas(cls, profile, model_name=None):
conn = cls.get_connection(profile, model_name)

client = conn.get('handle')

with cls.exception_handler(profile, 'list dataset', model_name):
all_datasets = client.list_datasets()
return [ds.name for ds in all_datasets]
return [ds.dataset_id for ds in all_datasets]

@classmethod
def get_columns_in_table(cls, profile, schema_name, table_name,
@@ -350,20 +376,19 @@ def get_columns_in_table(cls, profile, schema_name, table_name,
@classmethod
def check_schema_exists(cls, profile, schema, model_name=None):
conn = cls.get_connection(profile, model_name)

client = conn.get('handle')

with cls.exception_handler(profile, 'get dataset', model_name):
all_datasets = client.list_datasets()
return any([ds.name == schema for ds in all_datasets])
return any([ds.dataset_id == schema for ds in all_datasets])

@classmethod
def get_dataset(cls, profile, dataset_name, model_name=None):
conn = cls.get_connection(profile, model_name)

client = conn.get('handle')
dataset = client.dataset(dataset_name)
return dataset

dataset_ref = client.dataset(dataset_name)
return google.cloud.bigquery.Dataset(dataset_ref)

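client.dataset() returns only a DatasetReference; calls that need a full resource, such as create_dataset, expect a Dataset built from that reference, which is why get_dataset now wraps it. A brief sketch of the distinction (names are illustrative):

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client(project='my-project')   # hypothetical

    dataset_ref = client.dataset('analytics')              # lightweight DatasetReference
    dataset = google.cloud.bigquery.Dataset(dataset_ref)   # full Dataset resource
    dataset.location = 'US'   # resource properties can be set before create_dataset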
@classmethod
def warning_on_hooks(cls, hook_type):
@@ -372,7 +397,8 @@ def warning_on_hooks(cls, hook_type):
dbt.ui.printer.COLOR_FG_YELLOW)

@classmethod
def add_query(cls, profile, sql, model_name=None, auto_begin=True):
def add_query(cls, profile, sql, model_name=None, auto_begin=True,
bindings=None):
if model_name in ['on-run-start', 'on-run-end']:
cls.warning_on_hooks(model_name)
else:
@@ -395,3 +421,52 @@ def quote_schema_and_table(cls, profile, schema, table, model_name=None):
return '{}.{}.{}'.format(cls.quote(project),
cls.quote(schema),
cls.quote(table))

@classmethod
def convert_text_type(cls, agate_table, col_idx):
return "string"

@classmethod
def convert_number_type(cls, agate_table, col_idx):
import agate
decimals = agate_table.aggregate(agate.MaxPrecision(col_idx))
return "float64" if decimals else "int64"

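convert_number_type leans on agate's MaxPrecision aggregation: if any value in the column has digits to the right of the decimal point the column is typed float64, otherwise int64. A small hedged example of that decision outside the adapter:

    import agate

    table = agate.Table([['1'], ['2'], ['3.5']], ['amount'], [agate.Number()])

    # maximum number of digits after the decimal point across the column
    decimals = table.aggregate(agate.MaxPrecision('amount'))
    bq_type = 'float64' if decimals else 'int64'
    print(decimals, bq_type)   # prints: 1 float64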
@classmethod
def convert_boolean_type(cls, agate_table, col_idx):
return "bool"

@classmethod
def convert_datetime_type(cls, agate_table, col_idx):
return "datetime"

@classmethod
def create_csv_table(cls, profile, schema, table_name, agate_table):
pass

@classmethod
def reset_csv_table(cls, profile, schema, table_name, agate_table,
full_refresh=False):
cls.drop(profile, schema, table_name, "table")

@classmethod
def _agate_to_schema(cls, agate_table):
bq_schema = []
for idx, col_name in enumerate(agate_table.column_names):
type_ = cls.convert_agate_type(agate_table, idx)
bq_schema.append(
google.cloud.bigquery.SchemaField(col_name, type_))
return bq_schema

@classmethod
def load_csv_rows(cls, profile, schema, table_name, agate_table):
bq_schema = cls._agate_to_schema(agate_table)
dataset = cls.get_dataset(profile, schema, None)
table = dataset.table(table_name, schema=bq_schema)
conn = cls.get_connection(profile, None)
client = conn.get('handle')
with open(agate_table.original_abspath, "rb") as f:
job = table.upload_from_file(f, "CSV", rewind=True,
client=client, skip_leading_rows=1)
with cls.exception_handler(profile, "LOAD TABLE"):
cls.poll_until_job_completes(job, cls.get_timeout(conn))
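load_csv_rows above still goes through the pre-0.29 table.upload_from_file() path; in the 0.29+ client the rough equivalent is client.load_table_from_file() with a LoadJobConfig. A hedged sketch of that newer call, where the file name and schema are placeholders rather than part of this commit:

    import google.cloud.bigquery

    client = google.cloud.bigquery.Client(project='my-project')    # hypothetical
    table_ref = client.dataset('analytics').table('seed_data')     # hypothetical

    job_config = google.cloud.bigquery.LoadJobConfig()
    job_config.source_format = 'CSV'
    job_config.skip_leading_rows = 1
    job_config.schema = [
        google.cloud.bigquery.SchemaField('id', 'INTEGER'),
        google.cloud.bigquery.SchemaField('name', 'STRING'),
    ]

    with open('seed.csv', 'rb') as f:   # placeholder file
        load_job = client.load_table_from_file(f, table_ref, job_config=job_config)

    load_job.result()   # wait for the load to finish; raises on failure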