From 1a46e8ef63e276e87159fabb6df41db663385b6a Mon Sep 17 00:00:00 2001
From: Torkjel Hongve
Date: Tue, 8 Nov 2022 12:38:14 +0100
Subject: [PATCH 01/13] Support configuring GCP Dataproc serverless jobs
This adds a `dataproc_batch` key for specifying the Dataproc Batch
configuration. At runtime this is used to populate the
google.cloud.dataproc_v1.types.Batch object before it is submitted to
the Dataproc service.
To avoid having to add explicit support for every option offered by the
service, and to avoid chasing a moving target as Google's API evolves,
this key accepts arbitrary YAML, which is mapped onto the Batch object on
a best-effort basis.
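For illustration, the adapter roughly ends up seeing this key as a plain nested
dict whose field names follow google.cloud.dataproc_v1.types.Batch; a minimal,
hypothetical example (values are placeholders, not defaults) is:

    dataproc_batch = {
        "environment_config": {
            "execution_config": {
                "service_account": "dbt@my-project.iam.gserviceaccount.com",
                "subnetwork_uri": "dataproc",
            }
        },
        "labels": {"dbt": "rocks"},
        "runtime_config": {"properties": {"spark.executor.instances": "2"}},
    }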
Signed-off-by: Torkjel Hongve |
---
dbt/adapters/bigquery/connections.py | 17 +++++++++--
dbt/adapters/bigquery/python_submissions.py | 33 +++++++++++++++++++++
2 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index e4643f660..20439746c 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -1,7 +1,9 @@
import json
import re
from contextlib import contextmanager
-from dataclasses import dataclass
+from dataclasses import dataclass, field
+from mashumaro.helper import pass_through
+
from functools import lru_cache
import agate
from requests.exceptions import ConnectionError
@@ -35,7 +37,7 @@
from dbt.events.types import SQLQuery
from dbt.version import __version__ as dbt_version
-from dbt.dataclass_schema import StrEnum
+from dbt.dataclass_schema import ExtensibleDbtClassMixin, StrEnum
logger = AdapterLogger("BigQuery")
@@ -91,6 +93,10 @@ class BigQueryAdapterResponse(AdapterResponse):
job_id: Optional[str] = None
slot_ms: Optional[int] = None
+@dataclass
+class DataprocBatchConfig(ExtensibleDbtClassMixin):
+ def __init__(self, batch_config):
+ self.batch_config = batch_config
@dataclass
class BigQueryCredentials(Credentials):
@@ -124,6 +130,13 @@ class BigQueryCredentials(Credentials):
dataproc_cluster_name: Optional[str] = None
gcs_bucket: Optional[str] = None
+ dataproc_batch: DataprocBatchConfig = field(
+ metadata={
+ "serialization_strategy": pass_through,
+ },
+ default = None
+ )
+
scopes: Optional[Tuple[str, ...]] = (
"https://www.googleapis.com/auth/bigquery",
"https://www.googleapis.com/auth/cloud-platform",
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 43b8201d1..fa629dc72 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -119,6 +119,31 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
client_options=self.client_options, credentials=self.GoogleCredentials
)
+ def _configure_batch(self, configDict, target):
+ for key, value in configDict.items():
+ if hasattr(target, key):
+ attr = getattr(target, key)
+
+ # Basic types we just set as-is.
+ if type(value) in [str, int, float]:
+ setattr(target, key, type(attr)(value))
+
+ # For lists, we assume target to be a protobuf repeated field.
+ # The types must match.
+ elif isinstance(value, list):
+ for v in value:
+ attr.append(v)
+
+ elif isinstance(value, dict):
+ # The target is a protobuf map. Cast to expected type and set.
+ if "ScalarMapContainer" in type(attr).__name__:
+ for k, v in value.items():
+ attr[k] = type(attr[k])(v)
+
+ # Target is another configuration object. Recurse.
+ else:
+ self._configure_batch(value, attr)
+
def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
# create the Dataproc Serverless job config
batch = dataproc_v1.Batch()
@@ -137,6 +162,14 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
"spark.executor.instances": "2",
}
parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"
+
+ if self.credential.dataproc_batch:
+ try:
+ self._configure_batch(self.credential.dataproc_batch, batch)
+ except Exception as e:
+ docurl = "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.Batch"
+ raise ValueError(f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}") from e
+
request = dataproc_v1.CreateBatchRequest(
parent=parent,
batch=batch,
From 7eff828f994de23b6d739c520a2492264460682e Mon Sep 17 00:00:00 2001
From: Torkjel Hongve |
Date: Wed, 9 Nov 2022 09:45:44 +0100
Subject: [PATCH 02/13] Fixes and tests
- Make dataproc_batch key optional.
- Unit tests
- Move configuration of the `google.cloud.dataproc_v1.Batch` object
to a separate function.
Signed-off-by: Torkjel Hongve |
---
dbt/adapters/bigquery/connections.py | 2 +-
dbt/adapters/bigquery/python_submissions.py | 103 +++++++++++---------
tests/unit/test_bigquery_adapter.py | 54 ++++++++++
tests/unit/test_configure_dataproc_batch.py | 49 ++++++++++
4 files changed, 159 insertions(+), 49 deletions(-)
create mode 100644 tests/unit/test_configure_dataproc_batch.py
diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index 20439746c..d6164095f 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -130,7 +130,7 @@ class BigQueryCredentials(Credentials):
dataproc_cluster_name: Optional[str] = None
gcs_bucket: Optional[str] = None
- dataproc_batch: DataprocBatchConfig = field(
+ dataproc_batch: Optional[DataprocBatchConfig] = field(
metadata={
"serialization_strategy": pass_through,
},
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index fa629dc72..be3c526ea 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -119,57 +119,10 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
client_options=self.client_options, credentials=self.GoogleCredentials
)
- def _configure_batch(self, configDict, target):
- for key, value in configDict.items():
- if hasattr(target, key):
- attr = getattr(target, key)
-
- # Basic types we just set as-is.
- if type(value) in [str, int, float]:
- setattr(target, key, type(attr)(value))
-
- # For lists, we assume target to be a protobuf repeated field.
- # The types must match.
- elif isinstance(value, list):
- for v in value:
- attr.append(v)
-
- elif isinstance(value, dict):
- # The target is a protobuf map. Cast to expected type and set.
- if "ScalarMapContainer" in type(attr).__name__:
- for k, v in value.items():
- attr[k] = type(attr[k])(v)
-
- # Target is another configuration object. Recurse.
- else:
- self._configure_batch(value, attr)
-
def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
- # create the Dataproc Serverless job config
- batch = dataproc_v1.Batch()
- batch.pyspark_batch.main_python_file_uri = self.gcs_location
- # how to keep this up to date?
- # we should probably also open this up to be configurable
- jar_file_uri = self.parsed_model["config"].get(
- "jar_file_uri",
- "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar",
- )
- batch.pyspark_batch.jar_file_uris = [jar_file_uri]
- # should we make all of these spark/dataproc properties configurable?
- # https://cloud.google.com/dataproc-serverless/docs/concepts/properties
- # https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#runtimeconfig
- batch.runtime_config.properties = {
- "spark.executor.instances": "2",
- }
+ batch = self._configure_batch()
parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"
- if self.credential.dataproc_batch:
- try:
- self._configure_batch(self.credential.dataproc_batch, batch)
- except Exception as e:
- docurl = "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.Batch"
- raise ValueError(f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}") from e
-
request = dataproc_v1.CreateBatchRequest(
parent=parent,
batch=batch,
@@ -189,3 +142,57 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
# .blob(f"{matches.group(2)}.000000000")
# .download_as_string()
# )
+
+ def _configure_batch(self):
+ # create the Dataproc Serverless job config
+ batch = dataproc_v1.Batch()
+
+ # Apply defaults
+ batch.pyspark_batch.main_python_file_uri = self.gcs_location
+ jar_file_uri = self.parsed_model["config"].get(
+ "jar_file_uri",
+ "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar",
+ )
+ batch.pyspark_batch.jar_file_uris = [jar_file_uri]
+
+ # https://cloud.google.com/dataproc-serverless/docs/concepts/properties
+ # https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#runtimeconfig
+ batch.runtime_config.properties = {
+ "spark.executor.instances": "2",
+ }
+
+ # Apply configuration from dataproc_batch key, possibly overriding defaults.
+ if self.credential.dataproc_batch:
+ try:
+ self._configure_batch_from_config(self.credential.dataproc_batch, batch)
+ except Exception as e:
+ docurl = "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.Batch"
+ raise ValueError(f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}") from e
+
+ return batch
+
+ @classmethod
+ def _configure_batch_from_config(cls, configDict, target):
+ for key, value in configDict.items():
+ if hasattr(target, key):
+ attr = getattr(target, key)
+
+ # Basic types we just set as-is.
+ if type(value) in [str, int, float]:
+ setattr(target, key, type(attr)(value))
+
+ # For lists, we assume target to be a protobuf repeated field.
+ # The types must match.
+ elif isinstance(value, list):
+ for v in value:
+ attr.append(v)
+
+ elif isinstance(value, dict):
+ # The target is a protobuf map. Cast to expected type and set.
+ if "ScalarMapContainer" in type(attr).__name__:
+ for k, v in value.items():
+ attr[k] = type(attr[k])(v)
+
+ # Target is another configuration object. Recurse.
+ else:
+ cls._configure_batch_from_config(value, attr)
diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py
index 83a2eb3a6..f447fb2dd 100644
--- a/tests/unit/test_bigquery_adapter.py
+++ b/tests/unit/test_bigquery_adapter.py
@@ -107,6 +107,43 @@ def setUp(self):
'threads': 1,
'location': 'Solar Station',
},
+ 'dataproc-serverless-configured' : {
+ 'type': 'bigquery',
+ 'method': 'oauth',
+ 'schema': 'dummy_schema',
+ 'threads': 1,
+ 'gcs_bucket': 'dummy-bucket',
+ 'dataproc_region': 'europe-west1',
+ 'submission_method': 'serverless',
+ 'dataproc_batch': {
+ 'environment_config' : {
+ 'execution_config' : {
+ 'service_account': 'dbt@dummy-project.iam.gserviceaccount.com',
+ 'subnetwork_uri': 'dataproc',
+ 'network_tags': [ "foo", "bar" ]
+ }
+ },
+ 'labels': {
+ 'dbt': 'rocks',
+ 'number': 1
+ },
+ 'runtime_config': {
+ 'properties': {
+ 'spark.executor.instances': 4,
+ 'spark.driver.memory': '1g'
+ }
+ }
+ }
+ },
+ 'dataproc-serverless-default' : {
+ 'type': 'bigquery',
+ 'method': 'oauth',
+ 'schema': 'dummy_schema',
+ 'threads': 1,
+ 'gcs_bucket': 'dummy-bucket',
+ 'dataproc_region': 'europe-west1',
+ 'submission_method': 'serverless'
+ }
},
'target': 'oauth',
}
@@ -183,6 +220,23 @@ def test_acquire_connection_oauth_validations(self, mock_open_connection):
connection.handle
mock_open_connection.assert_called_once()
+ @patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn())
+ def test_acquire_connection_dataproc_serverless(self, mock_open_connection):
+ adapter = self.get_adapter('dataproc-serverless-configured')
+ try:
+ connection = adapter.acquire_connection('dummy')
+ self.assertEqual(connection.type, 'bigquery')
+
+ except dbt.exceptions.ValidationException as e:
+ self.fail('got ValidationException: {}'.format(str(e)))
+
+ except BaseException as e:
+ raise
+
+ mock_open_connection.assert_not_called()
+ connection.handle
+ mock_open_connection.assert_called_once()
+
@patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn())
def test_acquire_connection_service_account_validations(self, mock_open_connection):
adapter = self.get_adapter('service_account')
diff --git a/tests/unit/test_configure_dataproc_batch.py b/tests/unit/test_configure_dataproc_batch.py
new file mode 100644
index 000000000..449b2ae92
--- /dev/null
+++ b/tests/unit/test_configure_dataproc_batch.py
@@ -0,0 +1,49 @@
+from dbt.adapters.bigquery.python_submissions import ServerlessDataProcHelper
+from google.cloud import dataproc_v1
+
+from .test_bigquery_adapter import BaseTestBigQueryAdapter
+
+# Test application of dataproc_batch configuration to a
+# google.cloud.dataproc_v1.Batch object.
+# This reuses the machinery from BaseTestBigQueryAdapter to get hold of the
+# parsed credentials
+class TestConfigureDataprocBatch(BaseTestBigQueryAdapter):
+
+ def test_configure_dataproc_serverless_batch(self):
+ adapter = self.get_adapter('dataproc-serverless-configured')
+ credentials = adapter.acquire_connection('dummy').credentials
+ self.assertIsNotNone(credentials)
+
+ batchConfig = credentials.dataproc_batch
+ self.assertIsNotNone(batchConfig)
+
+ raw_batch_config = self.raw_profile['outputs']['dataproc-serverless-configured']['dataproc_batch']
+ raw_environment_config = raw_batch_config['environment_config']
+ raw_execution_config = raw_environment_config['execution_config']
+ raw_labels: dict[str, any] = raw_batch_config['labels']
+ raw_rt_config = raw_batch_config['runtime_config']
+
+ raw_batch_config = self.raw_profile['outputs']['dataproc-serverless-configured']['dataproc_batch']
+
+ batch = dataproc_v1.Batch()
+
+ ServerlessDataProcHelper._configure_batch_from_config(batchConfig, batch)
+
+ # google's protobuf types expose maps as dict[str, str]
+ to_str_values = lambda d: dict([(k, str(v)) for (k, v) in d.items()])
+
+ self.assertEqual(batch.environment_config.execution_config.service_account, raw_execution_config['service_account'])
+ self.assertFalse(batch.environment_config.execution_config.network_uri)
+ self.assertEqual(batch.environment_config.execution_config.subnetwork_uri, raw_execution_config['subnetwork_uri'])
+ self.assertEqual(batch.environment_config.execution_config.network_tags, raw_execution_config['network_tags'])
+ self.assertEqual(batch.labels, to_str_values(raw_labels))
+ self.assertEquals(batch.runtime_config.properties, to_str_values(raw_rt_config['properties']))
+
+
+ def test_default_dataproc_serverless_batch(self):
+ adapter = self.get_adapter('dataproc-serverless-default')
+ credentials = adapter.acquire_connection('dummy').credentials
+ self.assertIsNotNone(credentials)
+
+ batchConfig = credentials.dataproc_batch
+ self.assertIsNone(batchConfig)
From c0ed8623361015aa2736acb14ea0bea3caa7ef7b Mon Sep 17 00:00:00 2001
From: Torkjel Hongve |
Date: Mon, 14 Nov 2022 08:18:21 +0100
Subject: [PATCH 03/13] Do not reinvent protobuf parsing.
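In rough terms, the hand-rolled recursive mapping is replaced by protobuf's own
ParseDict, which already knows how to map a nested dict onto a message. A
minimal sketch of the idea (assuming google-cloud-dataproc and protobuf are
installed) is:

    from google.cloud import dataproc_v1
    from google.protobuf.json_format import ParseDict

    batch = dataproc_v1.Batch()
    # ParseDict fills in the underlying protobuf message (batch._pb) in place
    # and raises if a key does not correspond to a field on Batch.
    ParseDict(
        {"runtime_config": {"properties": {"spark.executor.instances": "2"}}},
        batch._pb,
    )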
---
dbt/adapters/bigquery/python_submissions.py | 27 +++------------------
dev-requirements.txt | 1 +
tests/unit/test_bigquery_adapter.py | 4 +--
3 files changed, 6 insertions(+), 26 deletions(-)
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index be3c526ea..599444ab3 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -6,6 +6,7 @@
from google.api_core import retry
from google.api_core.client_options import ClientOptions
from google.cloud import storage, dataproc_v1 # type: ignore
+from google.protobuf.json_format import ParseDict
OPERATION_RETRY_TIME = 10
@@ -172,27 +173,5 @@ def _configure_batch(self):
return batch
@classmethod
- def _configure_batch_from_config(cls, configDict, target):
- for key, value in configDict.items():
- if hasattr(target, key):
- attr = getattr(target, key)
-
- # Basic types we just set as-is.
- if type(value) in [str, int, float]:
- setattr(target, key, type(attr)(value))
-
- # For lists, we assume target to be a protobuf repeated field.
- # The types must match.
- elif isinstance(value, list):
- for v in value:
- attr.append(v)
-
- elif isinstance(value, dict):
- # The target is a protobuf map. Cast to expected type and set.
- if "ScalarMapContainer" in type(attr).__name__:
- for k, v in value.items():
- attr[k] = type(attr[k])(v)
-
- # Target is another configuration object. Recurse.
- else:
- cls._configure_batch_from_config(value, attr)
+ def _configure_batch_from_config(cls, config_dict, target):
+ ParseDict(config_dict, target._pb)
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 676703d3e..1cceff9f6 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -26,3 +26,4 @@ wheel
# For dataproc running
google-cloud-storage[dataproc]>=2.4.0
google-cloud-dataproc[dataproc]>=4.0.3
+protobuf[dataproc]
diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py
index f447fb2dd..2546fb00b 100644
--- a/tests/unit/test_bigquery_adapter.py
+++ b/tests/unit/test_bigquery_adapter.py
@@ -125,11 +125,11 @@ def setUp(self):
},
'labels': {
'dbt': 'rocks',
- 'number': 1
+ 'number': '1'
},
'runtime_config': {
'properties': {
- 'spark.executor.instances': 4,
+ 'spark.executor.instances': '4',
'spark.driver.memory': '1g'
}
}
From 131b2a367327fe215cb819fbe1ff1830358f40bc Mon Sep 17 00:00:00 2001
From: Torkjel Hongve |
Date: Tue, 15 Nov 2022 07:52:24 +0100
Subject: [PATCH 04/13] ws
---
dbt/adapters/bigquery/connections.py | 4 +++-
dbt/adapters/bigquery/python_submissions.py | 4 +++-
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index d6164095f..9a003124c 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -93,11 +93,13 @@ class BigQueryAdapterResponse(AdapterResponse):
job_id: Optional[str] = None
slot_ms: Optional[int] = None
+
@dataclass
class DataprocBatchConfig(ExtensibleDbtClassMixin):
def __init__(self, batch_config):
self.batch_config = batch_config
+
@dataclass
class BigQueryCredentials(Credentials):
method: BigQueryConnectionMethod
@@ -134,7 +136,7 @@ class BigQueryCredentials(Credentials):
metadata={
"serialization_strategy": pass_through,
},
- default = None
+ default=None
)
scopes: Optional[Tuple[str, ...]] = (
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 599444ab3..50faf8e27 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -168,7 +168,9 @@ def _configure_batch(self):
self._configure_batch_from_config(self.credential.dataproc_batch, batch)
except Exception as e:
docurl = "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.Batch"
- raise ValueError(f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}") from e
+ raise ValueError(
+ f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}"
+ ) from e
return batch
From 83d338fd0b33d55fa62bb1026c433fdb0e9faeb8 Mon Sep 17 00:00:00 2001
From: Torkjel Hongve |
Date: Tue, 15 Nov 2022 08:59:14 +0100
Subject: [PATCH 05/13] Fix unit tests to run without gcloud credentials.
---
tests/unit/test_bigquery_adapter.py | 4 +++-
tests/unit/test_configure_dataproc_batch.py | 14 +++++++++++---
2 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py
index 2546fb00b..9e76a6f3c 100644
--- a/tests/unit/test_bigquery_adapter.py
+++ b/tests/unit/test_bigquery_adapter.py
@@ -220,9 +220,11 @@ def test_acquire_connection_oauth_validations(self, mock_open_connection):
connection.handle
mock_open_connection.assert_called_once()
+ @patch('dbt.adapters.bigquery.connections.get_bigquery_defaults', return_value=('credentials', 'project_id'))
@patch('dbt.adapters.bigquery.BigQueryConnectionManager.open', return_value=_bq_conn())
- def test_acquire_connection_dataproc_serverless(self, mock_open_connection):
+ def test_acquire_connection_dataproc_serverless(self, mock_open_connection, mock_get_bigquery_defaults):
adapter = self.get_adapter('dataproc-serverless-configured')
+ mock_get_bigquery_defaults.assert_called_once()
try:
connection = adapter.acquire_connection('dummy')
self.assertEqual(connection.type, 'bigquery')
diff --git a/tests/unit/test_configure_dataproc_batch.py b/tests/unit/test_configure_dataproc_batch.py
index 449b2ae92..a863e226e 100644
--- a/tests/unit/test_configure_dataproc_batch.py
+++ b/tests/unit/test_configure_dataproc_batch.py
@@ -1,3 +1,5 @@
+from unittest.mock import patch
+
from dbt.adapters.bigquery.python_submissions import ServerlessDataProcHelper
from google.cloud import dataproc_v1
@@ -9,8 +11,11 @@
# parsed credentials
class TestConfigureDataprocBatch(BaseTestBigQueryAdapter):
- def test_configure_dataproc_serverless_batch(self):
+ @patch('dbt.adapters.bigquery.connections.get_bigquery_defaults', return_value=('credentials', 'project_id'))
+ def test_configure_dataproc_serverless_batch(self, mock_get_bigquery_defaults):
adapter = self.get_adapter('dataproc-serverless-configured')
+ mock_get_bigquery_defaults.assert_called_once()
+
credentials = adapter.acquire_connection('dummy').credentials
self.assertIsNotNone(credentials)
@@ -37,11 +42,14 @@ def test_configure_dataproc_serverless_batch(self):
self.assertEqual(batch.environment_config.execution_config.subnetwork_uri, raw_execution_config['subnetwork_uri'])
self.assertEqual(batch.environment_config.execution_config.network_tags, raw_execution_config['network_tags'])
self.assertEqual(batch.labels, to_str_values(raw_labels))
- self.assertEquals(batch.runtime_config.properties, to_str_values(raw_rt_config['properties']))
+ self.assertEqual(batch.runtime_config.properties, to_str_values(raw_rt_config['properties']))
- def test_default_dataproc_serverless_batch(self):
+ @patch('dbt.adapters.bigquery.connections.get_bigquery_defaults', return_value=('credentials', 'project_id'))
+ def test_default_dataproc_serverless_batch(self, mock_get_bigquery_defaults):
adapter = self.get_adapter('dataproc-serverless-default')
+ mock_get_bigquery_defaults.assert_called_once()
+
credentials = adapter.acquire_connection('dummy').credentials
self.assertIsNotNone(credentials)
From c26604037810ca4fdd1cce84ada4fa26afb9457f Mon Sep 17 00:00:00 2001
From: Torkjel Hongve |
Date: Mon, 21 Nov 2022 06:23:38 +0100
Subject: [PATCH 06/13] formatting
---
dbt/adapters/bigquery/connections.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index 9a003124c..28b72f085 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -136,7 +136,7 @@ class BigQueryCredentials(Credentials):
metadata={
"serialization_strategy": pass_through,
},
- default=None
+ default=None,
)
scopes: Optional[Tuple[str, ...]] = (
From 2ef6f1c6e74a4245312e0d67d25cab05780f76c4 Mon Sep 17 00:00:00 2001
From: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com>
Date: Wed, 21 Dec 2022 12:09:33 -0800
Subject: [PATCH 07/13] Update dev-requirements.txt
---
dev-requirements.txt | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 822da51a1..de146ae27 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -21,4 +21,4 @@ pytz~=2022.6.0
tox~=3.13
types-requests~=2.28.11
twine~=4.0.2
-wheel~=0.37.1
\ No newline at end of file
+wheel~=0.37.1
From 74bffe0b704cbeb4028aa6319d99e06dd537911f Mon Sep 17 00:00:00 2001
From: Colin
Date: Fri, 3 Mar 2023 13:14:27 -0800
Subject: [PATCH 08/13] fix quote policy for py models, add python-test to
tox.ini and cleanup python_submissions.py
---
dbt/adapters/bigquery/python_submissions.py | 46 +++++++++++--------
.../bigquery/macros/python_model/python.sql | 3 ++
dev-requirements.txt | 4 +-
tests/unit/test_configure_dataproc_batch.py | 4 +-
tox.ini | 15 +++++-
5 files changed, 49 insertions(+), 23 deletions(-)
create mode 100644 dbt/include/bigquery/macros/python_model/python.sql
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 5a08a51af..e5fbf037e 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -1,8 +1,8 @@
from typing import Dict, Union
-import time
from dbt.adapters.base import PythonJobHelper
from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
+from dbt.adapters.bigquery.connections import DataprocBatchConfig
from google.api_core import retry
from google.api_core.client_options import ClientOptions
from google.cloud import storage, dataproc_v1 # type: ignore
@@ -139,7 +139,18 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
def _configure_batch(self):
# create the Dataproc Serverless job config
# need to pin dataproc version to 1.1 as it now defaults to 2.0
- batch = dataproc_v1.Batch({"runtime_config": dataproc_v1.RuntimeConfig(version="1.1")})
+ # https://cloud.google.com/dataproc-serverless/docs/concepts/properties
+ # https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#runtimeconfig
+ batch = dataproc_v1.Batch(
+ {
+ "runtime_config": dataproc_v1.RuntimeConfig(
+ version="1.1",
+ properties={
+ "spark.executor.instances": "2",
+ },
+ )
+ }
+ )
# Apply defaults
batch.pyspark_batch.main_python_file_uri = self.gcs_location
jar_file_uri = self.parsed_model["config"].get(
@@ -148,24 +159,23 @@ def _configure_batch(self):
)
batch.pyspark_batch.jar_file_uris = [jar_file_uri]
- # https://cloud.google.com/dataproc-serverless/docs/concepts/properties
- # https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#runtimeconfig
- batch.runtime_config.properties = {
- "spark.executor.instances": "2",
- }
-
# Apply configuration from dataproc_batch key, possibly overriding defaults.
if self.credential.dataproc_batch:
- try:
- self._configure_batch_from_config(self.credential.dataproc_batch, batch)
- except Exception as e:
- docurl = "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.Batch"
- raise ValueError(
- f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}"
- ) from e
-
+ self._update_batch_from_config(self.credential.dataproc_batch, batch)
return batch
@classmethod
- def _configure_batch_from_config(cls, config_dict, target):
- ParseDict(config_dict, target._pb)
+ def _update_batch_from_config(
+ cls, config_dict: Union[Dict, DataprocBatchConfig], target: dataproc_v1.Batch
+ ):
+ try:
+ # updates in place
+ ParseDict(config_dict, target._pb)
+ except Exception as e:
+ docurl = (
+ "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1"
+ "#google.cloud.dataproc.v1.Batch"
+ )
+ raise ValueError(
+ f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}"
+ ) from e
diff --git a/dbt/include/bigquery/macros/python_model/python.sql b/dbt/include/bigquery/macros/python_model/python.sql
new file mode 100644
index 000000000..adbab752e
--- /dev/null
+++ b/dbt/include/bigquery/macros/python_model/python.sql
@@ -0,0 +1,3 @@
+{% macro bigquery__resolve_model_name(input_model_name) -%}
+ {{ input_model_name | string | replace('`', '') | replace('"', '\"') }}
+{%- endmacro -%}
diff --git a/dev-requirements.txt b/dev-requirements.txt
index e7f7bd2fa..eee207633 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,7 +1,7 @@
# install latest changes in dbt-core
# TODO: how to automate switching from develop to version branches?
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
-git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter
+git+https://github.com/dbt-labs/dbt-core.git@addPyRelationNameMacro#egg=dbt-core&subdirectory=core
+git+https://github.com/dbt-labs/dbt-core.git@addPyRelationNameMacro#egg=dbt-tests-adapter&subdirectory=tests/adapter
# if version 1.x or greater -> pin to major version
# if version 0.x -> pin to minor
diff --git a/tests/unit/test_configure_dataproc_batch.py b/tests/unit/test_configure_dataproc_batch.py
index a863e226e..8222645f4 100644
--- a/tests/unit/test_configure_dataproc_batch.py
+++ b/tests/unit/test_configure_dataproc_batch.py
@@ -12,7 +12,7 @@
class TestConfigureDataprocBatch(BaseTestBigQueryAdapter):
@patch('dbt.adapters.bigquery.connections.get_bigquery_defaults', return_value=('credentials', 'project_id'))
- def test_configure_dataproc_serverless_batch(self, mock_get_bigquery_defaults):
+ def test_update_dataproc_serverless_batch(self, mock_get_bigquery_defaults):
adapter = self.get_adapter('dataproc-serverless-configured')
mock_get_bigquery_defaults.assert_called_once()
@@ -32,7 +32,7 @@ def test_configure_dataproc_serverless_batch(self, mock_get_bigquery_defaults):
batch = dataproc_v1.Batch()
- ServerlessDataProcHelper._configure_batch_from_config(batchConfig, batch)
+ ServerlessDataProcHelper._update_batch_from_config(raw_batch_config, batch)
# google's protobuf types expose maps as dict[str, str]
to_str_values = lambda d: dict([(k, str(v)) for (k, v) in d.items()])
diff --git a/tox.ini b/tox.ini
index 1721428ee..d08321901 100644
--- a/tox.ini
+++ b/tox.ini
@@ -16,6 +16,19 @@ deps =
[testenv:{integration,py37,py38,py39,py310,py311,py}-{bigquery}]
description = adapter plugin integration testing
skip_install = true
+passenv =
+ DBT_*
+ BIGQUERY_TEST_*
+ PYTEST_ADDOPTS
+commands =
+ bigquery: {envpython} -m pytest {posargs} -vv tests/functional -k "not TestPython" --profile service_account
+deps =
+ -rdev-requirements.txt
+ -e.
+
+[testenv:{python-tests,py37,py38,py39,py310,py311,py}]
+description = python integration testing
+skip_install = true
passenv =
DBT_*
BIGQUERY_TEST_*
@@ -23,7 +36,7 @@ passenv =
DATAPROC_*
GCS_BUCKET
commands =
- bigquery: {envpython} -m pytest {posargs} -vv tests/functional -k "not TestPython" --profile service_account
+ {envpython} -m pytest {posargs} -vv tests/functional -k "TestPython" --profile service_account
deps =
-rdev-requirements.txt
-e.
From c1695bcc2469bf24976e58c5c21961e5fa556eb8 Mon Sep 17 00:00:00 2001
From: Colin
Date: Fri, 3 Mar 2023 13:22:53 -0800
Subject: [PATCH 09/13] have mypy install types
---
.github/workflows/main.yml | 1 +
1 file changed, 1 insertion(+)
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 140557beb..1d04b0252 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -59,6 +59,7 @@ jobs:
python -m pip --version
pre-commit --version
mypy --version
+ mypy --install-types
dbt --version
- name: Run pre-comit hooks
run: pre-commit run --all-files --show-diff-on-failure
From 7f8b52890794f75bb1cf496144c2ea7fae8a04a9 Mon Sep 17 00:00:00 2001
From: Colin
Date: Fri, 3 Mar 2023 13:25:23 -0800
Subject: [PATCH 10/13] add changie
---
.changes/unreleased/Features-20230303-132509.yaml | 6 ++++++
1 file changed, 6 insertions(+)
create mode 100644 .changes/unreleased/Features-20230303-132509.yaml
diff --git a/.changes/unreleased/Features-20230303-132509.yaml b/.changes/unreleased/Features-20230303-132509.yaml
new file mode 100644
index 000000000..3a0ba8403
--- /dev/null
+++ b/.changes/unreleased/Features-20230303-132509.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: add dataproc serverless config to profile
+time: 2023-03-03T13:25:09.02695-08:00
+custom:
+ Author: colin-rogers-dbt torkjel
+ Issue: "530"
From da52414b407e6384f89bd5ce7b75c1d16ab02112 Mon Sep 17 00:00:00 2001
From: Colin
Date: Fri, 3 Mar 2023 13:28:13 -0800
Subject: [PATCH 11/13] add types-protobuf to dev-requirements.txt
---
dev-requirements.txt | 1 +
1 file changed, 1 insertion(+)
diff --git a/dev-requirements.txt b/dev-requirements.txt
index eee207633..fcf17a5d6 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -28,5 +28,6 @@ tox~=3.0;python_version=="3.7"
tox~=4.4;python_version>="3.8"
types-pytz~=2022.7
types-requests~=2.28
+types-protobuf~=4.0
twine~=4.0
wheel~=0.38
From 309b8338121ad90be96dd9b5801941938d347143 Mon Sep 17 00:00:00 2001
From: Colin
Date: Fri, 3 Mar 2023 13:28:39 -0800
Subject: [PATCH 12/13] remove mypy install
---
.github/workflows/main.yml | 1 -
1 file changed, 1 deletion(-)
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 1d04b0252..140557beb 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -59,7 +59,6 @@ jobs:
python -m pip --version
pre-commit --version
mypy --version
- mypy --install-types
dbt --version
- name: Run pre-comit hooks
run: pre-commit run --all-files --show-diff-on-failure
From 7436f55c537dc51b5cc462d6912e1978f0bc8457 Mon Sep 17 00:00:00 2001
From: Colin
Date: Wed, 8 Mar 2023 15:35:36 -0800
Subject: [PATCH 13/13] remove branch update from dev-requirements.txt
---
dev-requirements.txt | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/dev-requirements.txt b/dev-requirements.txt
index fcf17a5d6..c38fae3f1 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,7 +1,7 @@
# install latest changes in dbt-core
# TODO: how to automate switching from develop to version branches?
-git+https://github.com/dbt-labs/dbt-core.git@addPyRelationNameMacro#egg=dbt-core&subdirectory=core
-git+https://github.com/dbt-labs/dbt-core.git@addPyRelationNameMacro#egg=dbt-tests-adapter&subdirectory=tests/adapter
+git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
+git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter
# if version 1.x or greater -> pin to major version
# if version 0.x -> pin to minor