Skip to content

Commit

Permalink
Bq date partitioning (dbt-labs#641)
Browse files Browse the repository at this point in the history
* first cut of date partitioning

* cleanup, implement partitioning in materialization

* update requirements.txt

* wip for date partitioning with range

* log data

* arg handling, logging, cleanup + view compat for new bq version

* add partitioning tests, compatibility with bq 0.29.0 release

* pep8

* fix for strange error in appveyor

* debug appveyor...

* dumb

* debugging weird bq adapter use in pg test

* do not use read_project in bq tests

* cleanup connections, initialize bq tests

* remove debug lines

* fix integration tests (actually)

* warning for view creation which clobbers tables

* add query timeout example for bq

* no need to release connections in the adapter

* partition_date interface change (wip)

* list of dates for bq dp tables

* tiny fixes for crufty dbt_project.yml files

* rm debug line

* fix tests
  • Loading branch information
drewbanin authored Feb 12, 2018
1 parent 0372fef commit 4eb75ec
Show file tree
Hide file tree
Showing 21 changed files with 363 additions and 94 deletions.
156 changes: 89 additions & 67 deletions dbt/adapters/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import google.cloud.bigquery

import time
import uuid


class BigQueryAdapter(PostgresAdapter):
Expand All @@ -27,7 +26,8 @@ class BigQueryAdapter(PostgresAdapter):
"execute_model",
"drop",
"execute",
"quote_schema_and_table"
"quote_schema_and_table",
"make_date_partitioned_table"
]

SCOPE = ('https://www.googleapis.com/auth/bigquery',
Expand Down Expand Up @@ -150,27 +150,33 @@ def query_for_existing(cls, profile, schemas, model_name=None):
if not isinstance(schemas, (list, tuple)):
schemas = [schemas]

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

all_tables = []
for schema in schemas:
dataset = cls.get_dataset(profile, schema, model_name)
all_tables.extend(dataset.list_tables())
all_tables.extend(client.list_tables(dataset))

relation_type_lookup = {
relation_types = {
'TABLE': 'table',
'VIEW': 'view',
'EXTERNAL': 'external'
}

existing = [(table.name, relation_type_lookup.get(table.table_type))
existing = [(table.table_id, relation_types.get(table.table_type))
for table in all_tables]

return dict(existing)

@classmethod
def drop(cls, profile, schema, relation, relation_type, model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
relation_object = dataset.table(relation)
relation_object.delete()
client.delete_table(relation_object)

@classmethod
def rename(cls, profile, schema, from_name, to_name, model_name=None):
Expand All @@ -183,19 +189,22 @@ def get_timeout(cls, conn):
return credentials.get('timeout_seconds', cls.QUERY_TIMEOUT)

@classmethod
def materialize_as_view(cls, profile, dataset, model_name, model_sql):
view = dataset.table(model_name)
def materialize_as_view(cls, profile, dataset, model):
model_name = model.get('name')
model_sql = model.get('injected_sql')

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

view_ref = dataset.table(model_name)
view = google.cloud.bigquery.Table(view_ref)
view.view_query = model_sql
view.view_use_legacy_sql = False

logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))

with cls.exception_handler(profile, model_sql, model_name, model_name):
view.create()

if view.created is None:
msg = "Error creating view {}".format(model_name)
raise dbt.exceptions.RuntimeException(msg)
client.create_table(view)

return "CREATE VIEW"

Expand All @@ -215,86 +224,94 @@ def poll_until_job_completes(cls, job, timeout):
raise job.exception()

@classmethod
def materialize_as_table(cls, profile, dataset, model_name, model_sql):
def make_date_partitioned_table(cls, profile, dataset_name, identifier,
model_name=None):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

table = dataset.table(model_name)
job_id = 'dbt-create-{}-{}'.format(model_name, uuid.uuid4())
job = client.run_async_query(job_id, model_sql)
job.use_legacy_sql = False
job.destination = table
job.write_disposition = 'WRITE_TRUNCATE'
job.begin()
dataset = cls.get_dataset(profile, dataset_name, identifier)
table_ref = dataset.table(identifier)
table = google.cloud.bigquery.Table(table_ref)
table.partitioning_type = 'DAY'

cls.release_connection(profile, model_name)
return client.create_table(table)

logger.debug("Model SQL ({}):\n{}".format(model_name, model_sql))
@classmethod
def materialize_as_table(cls, profile, dataset, model, model_sql,
decorator=None):
model_name = model.get('name')

conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

if decorator is None:
table_name = model_name
else:
table_name = "{}${}".format(model_name, decorator)

table_ref = dataset.table(table_name)
job_config = google.cloud.bigquery.QueryJobConfig()
job_config.destination = table_ref
job_config.write_disposition = 'WRITE_TRUNCATE'

logger.debug("Model SQL ({}):\n{}".format(table_name, model_sql))
query_job = client.query(model_sql, job_config=job_config)

# this waits for the job to complete
with cls.exception_handler(profile, model_sql, model_name, model_name):
cls.poll_until_job_completes(job, cls.get_timeout(conn))
query_job.result(timeout=cls.get_timeout(conn))

return "CREATE TABLE"

@classmethod
def execute_model(cls, profile, model, materialization, model_name=None):
def execute_model(cls, profile, model, materialization, sql_override=None,
decorator=None, model_name=None):

if sql_override is None:
sql_override = model.get('injected_sql')

if flags.STRICT_MODE:
connection = cls.get_connection(profile, model.get('name'))
validate_connection(connection)
cls.release_connection(profile, model.get('name'))

model_name = model.get('name')
model_schema = model.get('schema')
model_sql = model.get('injected_sql')

dataset = cls.get_dataset(profile, model_schema, model_name)

if materialization == 'view':
res = cls.materialize_as_view(profile, dataset, model_name,
model_sql)
res = cls.materialize_as_view(profile, dataset, model)
elif materialization == 'table':
res = cls.materialize_as_table(profile, dataset, model_name,
model_sql)
res = cls.materialize_as_table(profile, dataset, model,
sql_override, decorator)
else:
msg = "Invalid relation type: '{}'".format(materialization)
raise dbt.exceptions.RuntimeException(msg, model)

return res

@classmethod
def fetch_query_results(cls, query):
all_rows = []

rows = query.rows
token = query.page_token

while True:
all_rows.extend(rows)
if token is None:
break
rows, total_count, token = query.fetch_data(page_token=token)
return all_rows

@classmethod
def execute(cls, profile, sql, model_name=None, fetch=False, **kwargs):
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

query = client.run_sync_query(sql)
query.timeout_ms = cls.get_timeout(conn) * 1000
query.use_legacy_sql = False

debug_message = "Fetching data for query {}:\n{}"
logger.debug(debug_message.format(model_name, sql))

query.run()
job_config = google.cloud.bigquery.QueryJobConfig()
job_config.use_legacy_sql = False
query_job = client.query(sql, job_config)

# this blocks until the query has completed
with cls.exception_handler(profile, 'create dataset', model_name):
iterator = query_job.result()

res = []
if fetch:
res = cls.fetch_query_results(query)
res = list(iterator)

status = 'ERROR' if query.errors else 'OK'
# If we get here, the query succeeded
status = 'OK'
return status, res

@classmethod
Expand All @@ -310,15 +327,20 @@ def add_begin_query(cls, profile, name):
def create_schema(cls, profile, schema, model_name=None):
logger.debug('Creating schema "%s".', schema)

dataset = cls.get_dataset(profile, schema, model_name)
conn = cls.get_connection(profile, model_name)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
with cls.exception_handler(profile, 'create dataset', model_name):
dataset.create()
client.create_dataset(dataset)

@classmethod
def drop_tables_in_schema(cls, dataset):
for table in dataset.list_tables():
table.delete()
def drop_tables_in_schema(cls, profile, dataset):
conn = cls.get_connection(profile)
client = conn.get('handle')

for table in client.list_tables(dataset):
client.delete_table(table.reference)

@classmethod
def drop_schema(cls, profile, schema, model_name=None):
Expand All @@ -327,21 +349,22 @@ def drop_schema(cls, profile, schema, model_name=None):
if not cls.check_schema_exists(profile, schema, model_name):
return

dataset = cls.get_dataset(profile, schema, model_name)
conn = cls.get_connection(profile)
client = conn.get('handle')

dataset = cls.get_dataset(profile, schema, model_name)
with cls.exception_handler(profile, 'drop dataset', model_name):
cls.drop_tables_in_schema(dataset)
dataset.delete()
cls.drop_tables_in_schema(profile, dataset)
client.delete_dataset(dataset)

@classmethod
def get_existing_schemas(cls, profile, model_name=None):
conn = cls.get_connection(profile, model_name)

client = conn.get('handle')

with cls.exception_handler(profile, 'list dataset', model_name):
all_datasets = client.list_datasets()
return [ds.name for ds in all_datasets]
return [ds.dataset_id for ds in all_datasets]

@classmethod
def get_columns_in_table(cls, profile, schema_name, table_name,
Expand All @@ -352,20 +375,19 @@ def get_columns_in_table(cls, profile, schema_name, table_name,
@classmethod
def check_schema_exists(cls, profile, schema, model_name=None):
conn = cls.get_connection(profile, model_name)

client = conn.get('handle')

with cls.exception_handler(profile, 'get dataset', model_name):
all_datasets = client.list_datasets()
return any([ds.name == schema for ds in all_datasets])
return any([ds.dataset_id == schema for ds in all_datasets])

@classmethod
def get_dataset(cls, profile, dataset_name, model_name=None):
conn = cls.get_connection(profile, model_name)

client = conn.get('handle')
dataset = client.dataset(dataset_name)
return dataset

dataset_ref = client.dataset(dataset_name)
return google.cloud.bigquery.Dataset(dataset_ref)

@classmethod
def warning_on_hooks(cls, hook_type):
Expand Down
20 changes: 18 additions & 2 deletions dbt/context/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import os
import pytz
import voluptuous

from dbt.adapters.factory import get_adapter
Expand All @@ -17,6 +16,12 @@
from dbt.logger import GLOBAL_LOGGER as logger # noqa


# These modules are added to the context. Consider alternative
# approaches which will extend well to potentially many modules
import pytz
import datetime


class DatabaseWrapper(object):
"""
Wrapper for runtime database interaction. Should only call adapter
Expand Down Expand Up @@ -248,6 +253,15 @@ def tojson(value, default=None):
return default


def try_or_compiler_error(model):
def impl(message_if_exception, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
dbt.exceptions.raise_compiler_error(message_if_exception, model)
return impl


def _return(value):
raise dbt.exceptions.MacroReturn(value)

Expand Down Expand Up @@ -291,6 +305,7 @@ def generate(model, project, flat_graph, provider=None):
"model": model,
"modules": {
"pytz": pytz,
"datetime": datetime
},
"post_hooks": post_hooks,
"pre_hooks": pre_hooks,
Expand All @@ -302,7 +317,8 @@ def generate(model, project, flat_graph, provider=None):
"fromjson": fromjson,
"tojson": tojson,
"target": target,
"this": dbt.utils.Relation(profile, adapter, model, use_temp=True)
"this": dbt.utils.Relation(profile, adapter, model, use_temp=True),
"try_or_compiler_error": try_or_compiler_error(model)
})

context = _add_tracking(context)
Expand Down
8 changes: 6 additions & 2 deletions dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,13 @@ def __str__(self, prefix="! "):
if self.node is not None:
node_string = " in {}".format(self.node_to_string(self.node))

if hasattr(self.msg, 'split'):
split_msg = self.msg.split("\n")
else:
split_msg = basestring(self.msg).split("\n")

lines = ["{}{}".format(self.type + ' Error',
node_string)] + \
self.msg.split("\n")
node_string)] + split_msg

lines += self.process_stack()

Expand Down
4 changes: 4 additions & 0 deletions dbt/include/global_project/macros/etc/bigquery.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

{% macro date_sharded_table(base_name) %}
{{ return(base_name ~ "[DBT__PARTITION_DATE]") }}
{% endmacro %}
Loading

0 comments on commit 4eb75ec

Please sign in to comment.