From b7556e06b67170f07479a09a592bddb0ce93d787 Mon Sep 17 00:00:00 2001
From: Adrian
Date: Tue, 13 Sep 2022 14:27:54 +0200
Subject: [PATCH 01/17] add region

---
 dlt/common/configuration/gcp_client_credentials.py | 1 +
 dlt/load/bigquery/client.py                        | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/dlt/common/configuration/gcp_client_credentials.py b/dlt/common/configuration/gcp_client_credentials.py
index 17ca3e3388..868987c960 100644
--- a/dlt/common/configuration/gcp_client_credentials.py
+++ b/dlt/common/configuration/gcp_client_credentials.py
@@ -7,6 +7,7 @@ class GcpClientCredentials(CredentialsConfiguration):
     __namespace__: str = "GCP"
 
     PROJECT_ID: str = None
+    LOCATION: str = None
     CRED_TYPE: str = "service_account"
     PRIVATE_KEY: TSecretValue = None
     TOKEN_URI: str = "https://oauth2.googleapis.com/token"
diff --git a/dlt/load/bigquery/client.py b/dlt/load/bigquery/client.py
index 014601fb42..64d03b6d00 100644
--- a/dlt/load/bigquery/client.py
+++ b/dlt/load/bigquery/client.py
@@ -66,7 +66,7 @@ def open_connection(self) -> None:
             credentials = None
         else:
             credentials = service_account.Credentials.from_service_account_info(self.C.as_credentials())
-        self._client = bigquery.Client(self.C.PROJECT_ID, credentials=credentials)
+        self._client = bigquery.Client(self.C.PROJECT_ID, credentials=credentials, location=self.C.LOCATION)

     def close_connection(self) -> None:
         if self._client:

From 95d0d9ee4cf5f94979f8dbc35f8a297735f9bed5 Mon Sep 17 00:00:00 2001
From: Adrian
Date: Tue, 13 Sep 2022 16:15:11 +0200
Subject: [PATCH 02/17] add location

---
 dlt/pipeline/typing.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/dlt/pipeline/typing.py b/dlt/pipeline/typing.py
index bdd9b44c03..319cdc22ad 100644
--- a/dlt/pipeline/typing.py
+++ b/dlt/pipeline/typing.py
@@ -34,6 +34,7 @@ class GCPPipelineCredentials(PipelineCredentials):
     DEFAULT_DATASET: str = None
     CLIENT_EMAIL: str = None
     PRIVATE_KEY: TSecretValue = None
+    LOCATION: str = None
     CRED_TYPE: str = "service_account"
     TOKEN_URI: str = "https://oauth2.googleapis.com/token"
     HTTP_TIMEOUT: float = 15.0
@@ -48,20 +49,19 @@ def default_dataset(self, new_value: str) -> None:
         self.DEFAULT_DATASET = new_value

     @classmethod
-    def from_services_dict(cls, services: StrAny, dataset_prefix: str) -> "GCPPipelineCredentials":
+    def from_services_dict(cls, services: StrAny, dataset_prefix: str, location: str) -> "GCPPipelineCredentials":
         assert dataset_prefix is not None
-
-        return cls("bigquery", services["project_id"], dataset_prefix, services["client_email"], services["private_key"])
+        return cls("bigquery", services["project_id"], dataset_prefix, services["client_email"], services["private_key"], location)

     @classmethod
-    def from_services_file(cls, services_path: str, dataset_prefix: str) -> "GCPPipelineCredentials":
+    def from_services_file(cls, services_path: str, dataset_prefix: str, location: str) -> "GCPPipelineCredentials":
         with open(services_path, "r", encoding="utf-8") as f:
             services = json.load(f)
-        return GCPPipelineCredentials.from_services_dict(services, dataset_prefix)
+        return GCPPipelineCredentials.from_services_dict(services, dataset_prefix, location)

     @classmethod
-    def default_credentials(cls, dataset_prefix: str, project_id: str = None) -> "GCPPipelineCredentials":
-        return cls("bigquery", project_id, dataset_prefix, None)
+    def default_credentials(cls, dataset_prefix: str, project_id: str = None, location: str = None) -> "GCPPipelineCredentials":
+        return cls("bigquery", project_id, dataset_prefix, None, None, location)


 @dataclass

From 8cfc67ceb915df818e075c89529fb9a91efb226b Mon Sep 17 00:00:00 2001
From: Adrian
Date: Tue, 13 Sep 2022 16:50:10 +0200
Subject: [PATCH 03/17] add location

---
 dlt/common/configuration/gcp_client_credentials.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/dlt/common/configuration/gcp_client_credentials.py b/dlt/common/configuration/gcp_client_credentials.py
index 868987c960..da7aba80b9 100644
--- a/dlt/common/configuration/gcp_client_credentials.py
+++ b/dlt/common/configuration/gcp_client_credentials.py
@@ -7,7 +7,6 @@ class GcpClientCredentials(CredentialsConfiguration):
     __namespace__: str = "GCP"

     PROJECT_ID: str = None
-    LOCATION: str = None
     CRED_TYPE: str = "service_account"
     PRIVATE_KEY: TSecretValue = None
     TOKEN_URI: str = "https://oauth2.googleapis.com/token"
@@ -30,4 +29,4 @@ def as_credentials(cls) -> StrAny:
             "private_key": cls.PRIVATE_KEY,
             "token_uri": cls.TOKEN_URI,
             "client_email": cls.CLIENT_EMAIL
-        }
+        }
\ No newline at end of file

From 5d2c8aef078b216b03a6593e021019f0869fd23a Mon Sep 17 00:00:00 2001
From: Adrian
Date: Wed, 14 Sep 2022 12:11:07 +0200
Subject: [PATCH 04/17] add location default

---
 dlt/common/configuration/gcp_client_credentials.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dlt/common/configuration/gcp_client_credentials.py b/dlt/common/configuration/gcp_client_credentials.py
index da7aba80b9..5388eb6e9f 100644
--- a/dlt/common/configuration/gcp_client_credentials.py
+++ b/dlt/common/configuration/gcp_client_credentials.py
@@ -9,6 +9,7 @@ class GcpClientCredentials(CredentialsConfiguration):
     PROJECT_ID: str = None
     CRED_TYPE: str = "service_account"
     PRIVATE_KEY: TSecretValue = None
+    LOCATION: str = "US"
     TOKEN_URI: str = "https://oauth2.googleapis.com/token"
     CLIENT_EMAIL: str = None


From bedbe785b4d4bde04634fcb7d7ef72cde4b561b7 Mon Sep 17 00:00:00 2001
From: Adrian
Date: Wed, 14 Sep 2022 14:43:32 +0200
Subject: [PATCH 05/17] add location default

---
 dlt/pipeline/typing.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dlt/pipeline/typing.py b/dlt/pipeline/typing.py
index 319cdc22ad..778de04d59 100644
--- a/dlt/pipeline/typing.py
+++ b/dlt/pipeline/typing.py
@@ -34,7 +34,7 @@ class GCPPipelineCredentials(PipelineCredentials):
     DEFAULT_DATASET: str = None
     CLIENT_EMAIL: str = None
     PRIVATE_KEY: TSecretValue = None
-    LOCATION: str = None
+    LOCATION: str = "US"
     CRED_TYPE: str = "service_account"
     TOKEN_URI: str = "https://oauth2.googleapis.com/token"
     HTTP_TIMEOUT: float = 15.0

From f7ea162fc187677e177b3a51a14a7f8729ecff54 Mon Sep 17 00:00:00 2001
From: Adrian
Date: Wed, 14 Sep 2022 14:52:59 +0200
Subject: [PATCH 06/17] add default location

---
 dlt/pipeline/typing.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dlt/pipeline/typing.py b/dlt/pipeline/typing.py
index 778de04d59..1cfc7cc68d 100644
--- a/dlt/pipeline/typing.py
+++ b/dlt/pipeline/typing.py
@@ -51,7 +51,7 @@ def default_dataset(self, new_value: str) -> None:
     @classmethod
     def from_services_dict(cls, services: StrAny, dataset_prefix: str, location: str) -> "GCPPipelineCredentials":
         assert dataset_prefix is not None
-        return cls("bigquery", services["project_id"], dataset_prefix, services["client_email"], services["private_key"], location)
+        return cls("bigquery", services["project_id"], dataset_prefix, services["client_email"], services["private_key"], location or cls.LOCATION)

     @classmethod
     def from_services_file(cls, services_path: str, dataset_prefix: str, location: str) -> "GCPPipelineCredentials":
@@ -61,7 +61,7 @@ def from_services_file(cls, services_path: str, dataset_prefix: str, location: s

     @classmethod
     def default_credentials(cls, dataset_prefix: str, project_id: str = None, location: str = None) -> "GCPPipelineCredentials":
-        return cls("bigquery", project_id, dataset_prefix, None, None, location)
+        return cls("bigquery", project_id, dataset_prefix, None, None, location or cls.LOCATION)


 @dataclass

From 3100357681ea1da6da39f7c31d9250aac81efb4f Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Wed, 14 Sep 2022 15:39:00 +0200
Subject: [PATCH 07/17] bigquery location test skeleton

---
 tests/load/bigquery/test_bigquery_client.py | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py
index 1777e857b2..9ffe36b05a 100644
--- a/tests/load/bigquery/test_bigquery_client.py
+++ b/tests/load/bigquery/test_bigquery_client.py
@@ -86,6 +86,25 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage)
     assert r_job.status() == "completed"


+@pytest.mark.skip()
+def test_bigquery_location(client: BigQueryClient, file_storage: FileStorage) -> None:
+    user_table_name = prepare_event_user_table(client)
+    load_json = {
+        "_dlt_id": uniq_id(),
+        "_dlt_root_id": uniq_id(),
+        "sender_id":'90238094809sajlkjxoiewjhduuiuehd',
+        "timestamp": str(pendulum.now())
+    }
+    job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name)
+
+    # start a job from the same file. it should fallback to retrieve job silently
+    r_job = client.start_file_load(client.schema.get_table(user_table_name), file_storage._make_path(job.file_name()))
+    assert r_job.status() == "completed"
+
+    # client.sql_client.default_dataset_name - take dataset name
+    # client.sql_client.native_connection()
+
+
 def test_loading_errors(client: BigQueryClient, file_storage: FileStorage) -> None:
     user_table_name = prepare_table(client)
     # insert into unknown column

From b21e115cd1952027d0d2327a9ada073ce0ce761c Mon Sep 17 00:00:00 2001
From: Adrian
Date: Wed, 14 Sep 2022 17:41:13 +0200
Subject: [PATCH 08/17] fix backwards compatibility.

---
 dlt/load/bigquery/client.py | 3 ++-
 dlt/pipeline/typing.py      | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/dlt/load/bigquery/client.py b/dlt/load/bigquery/client.py
index 64d03b6d00..668d144d2d 100644
--- a/dlt/load/bigquery/client.py
+++ b/dlt/load/bigquery/client.py
@@ -203,7 +203,8 @@ def restore_file_load(self, file_path: str) -> LoadJob:
             return BigQueryLoadJob(
                 JobClientBase.get_file_name_from_file_path(file_path),
                 self._retrieve_load_job(file_path),
-                self.CREDENTIALS
+                #self.CREDENTIALS
+                self.sql_client.native_connection()
             )
         except api_core_exceptions.GoogleAPICallError as gace:
             reason = self._get_reason_from_errors(gace)
diff --git a/dlt/pipeline/typing.py b/dlt/pipeline/typing.py
index 1cfc7cc68d..f269a0c17e 100644
--- a/dlt/pipeline/typing.py
+++ b/dlt/pipeline/typing.py
@@ -49,12 +49,12 @@ def default_dataset(self, new_value: str) -> None:
         self.DEFAULT_DATASET = new_value

     @classmethod
-    def from_services_dict(cls, services: StrAny, dataset_prefix: str, location: str) -> "GCPPipelineCredentials":
+    def from_services_dict(cls, services: StrAny, dataset_prefix: str, location: str = "US") -> "GCPPipelineCredentials":
         assert dataset_prefix is not None
         return cls("bigquery", services["project_id"], dataset_prefix, services["client_email"], services["private_key"], location or cls.LOCATION)

     @classmethod
-    def from_services_file(cls, services_path: str, dataset_prefix: str, location: str) -> "GCPPipelineCredentials":
+    def from_services_file(cls, services_path: str, dataset_prefix: str, location: str = "US") -> "GCPPipelineCredentials":
         with open(services_path, "r", encoding="utf-8") as f:
             services = json.load(f)
         return GCPPipelineCredentials.from_services_dict(services, dataset_prefix, location)

From 587132567524585ad4e5c9f31b5f98b9bcb2283a Mon Sep 17 00:00:00 2001
From: Adrian
Date: Fri, 16 Sep 2022 13:11:13 +0200
Subject: [PATCH 09/17] get table location

---
 tests/load/bigquery/test_bigquery_client.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py
index 9ffe36b05a..5e681df5cf 100644
--- a/tests/load/bigquery/test_bigquery_client.py
+++ b/tests/load/bigquery/test_bigquery_client.py
@@ -95,14 +95,14 @@ def test_bigquery_location(client: BigQueryClient, file_storage: FileStorage) ->
         "sender_id":'90238094809sajlkjxoiewjhduuiuehd',
         "timestamp": str(pendulum.now())
     }
-    job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name)
+    #job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name)

     # start a job from the same file. it should fallback to retrieve job silently
-    r_job = client.start_file_load(client.schema.get_table(user_table_name), file_storage._make_path(job.file_name()))
-    assert r_job.status() == "completed"
-
-    # client.sql_client.default_dataset_name - take dataset name
-    # client.sql_client.native_connection()
+    #r_job = client.start_file_load(client.schema.get_table(user_table_name), file_storage._make_path(job.file_name()))
+    #canonical_name = client.sql_client.make_qualified_table_name(user_table_name)
+    #native_client = client.sql_client.native_connection()
+    #t = native_client.get_table(canonical_name)
+    #assert t.location == 'US'


 def test_loading_errors(client: BigQueryClient, file_storage: FileStorage) -> None:

From f4a023ec0bd001ae1cfed68549a64f4137f1d35f Mon Sep 17 00:00:00 2001
From: Adrian
Date: Tue, 20 Sep 2022 12:50:55 +0200
Subject: [PATCH 10/17] get table location

---
 dlt/load/bigquery/client.py                 |  4 ++--
 tests/load/bigquery/test_bigquery_client.py | 15 +++++++--------
 2 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/dlt/load/bigquery/client.py b/dlt/load/bigquery/client.py
index 668d144d2d..989f2a2bcf 100644
--- a/dlt/load/bigquery/client.py
+++ b/dlt/load/bigquery/client.py
@@ -203,8 +203,8 @@ def restore_file_load(self, file_path: str) -> LoadJob:
             return BigQueryLoadJob(
                 JobClientBase.get_file_name_from_file_path(file_path),
                 self._retrieve_load_job(file_path),
-                #self.CREDENTIALS
-                self.sql_client.native_connection()
+                self.CREDENTIALS
+                #self.sql_client.native_connection()
             )
         except api_core_exceptions.GoogleAPICallError as gace:
             reason = self._get_reason_from_errors(gace)
diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py
index 5e681df5cf..b313de5ce9 100644
--- a/tests/load/bigquery/test_bigquery_client.py
+++ b/tests/load/bigquery/test_bigquery_client.py
@@ -86,23 +86,22 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage)
     assert r_job.status() == "completed"


-@pytest.mark.skip()
+#@pytest.mark.skip()
 def test_bigquery_location(client: BigQueryClient, file_storage: FileStorage) -> None:
     user_table_name = prepare_event_user_table(client)
     load_json = {
         "_dlt_id": uniq_id(),
         "_dlt_root_id": uniq_id(),
-        "sender_id":'90238094809sajlkjxoiewjhduuiuehd',
+        "sender_id": '90238094809sajlkjxoiewjhduuiuehd',
         "timestamp": str(pendulum.now())
     }
-    #job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name)
+    job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name)

     # start a job from the same file. it should fallback to retrieve job silently
-    #r_job = client.start_file_load(client.schema.get_table(user_table_name), file_storage._make_path(job.file_name()))
-    #canonical_name = client.sql_client.make_qualified_table_name(user_table_name)
-    #native_client = client.sql_client.native_connection()
-    #t = native_client.get_table(canonical_name)
-    #assert t.location == 'US'
+    client.start_file_load(client.schema.get_table(user_table_name), file_storage._make_path(job.file_name()))
+    canonical_name = client.sql_client.make_qualified_table_name(user_table_name)
+    t = client.sql_client.native_connection.get_table(canonical_name)
+    assert t.location == 'US'


 def test_loading_errors(client: BigQueryClient, file_storage: FileStorage) -> None:

From 6ef5b3de9fb6979f905cef0415bbdc89dd387905 Mon Sep 17 00:00:00 2001
From: Adrian
Date: Tue, 20 Sep 2022 13:13:09 +0200
Subject: [PATCH 11/17] get table location

---
 tests/load/bigquery/test_bigquery_client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py
index b313de5ce9..8b471807c0 100644
--- a/tests/load/bigquery/test_bigquery_client.py
+++ b/tests/load/bigquery/test_bigquery_client.py
@@ -86,7 +86,7 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage)
     assert r_job.status() == "completed"


-#@pytest.mark.skip()
+@pytest.mark.skip()
 def test_bigquery_location(client: BigQueryClient, file_storage: FileStorage) -> None:
     user_table_name = prepare_event_user_table(client)
     load_json = {

From bed93269f0ea9672974ed075956939b75e05d8fc Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Tue, 20 Sep 2022 15:26:00 +0200
Subject: [PATCH 12/17] allows to parametrize load clients in tests

---
 tests/load/bigquery/test_bigquery_client.py | 56 +++++++--------------
 tests/load/redshift/test_redshift_client.py | 21 --------
 tests/load/test_client.py                   | 11 +++-
 tests/load/utils.py                         | 17 +++++--
 4 files changed, 41 insertions(+), 64 deletions(-)

diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py
index 8b471807c0..cf740cd230 100644
--- a/tests/load/bigquery/test_bigquery_client.py
+++ b/tests/load/bigquery/test_bigquery_client.py
@@ -13,7 +13,7 @@
 from dlt.load.bigquery.client import BigQueryClient

 from tests.utils import TEST_STORAGE, delete_storage
-from tests.load.utils import expect_load_file, prepare_table, yield_client_with_storage
+from tests.load.utils import cm_yield_client_with_storage, expect_load_file, prepare_table, prepare_event_user_table, yield_client_with_storage


 @pytest.fixture(scope="module")
@@ -31,27 +31,6 @@ def auto_delete_storage() -> None:
     delete_storage()


-def test_default_schema_name_init_storage(client: BigQueryClient) -> None:
-    e_client: BigQueryClient = None
-    # pass the schema that is a default schema. that should create dataset with the name `DEFAULT_DATASET`
-    with Load.import_client_cls(
-        "bigquery",
-        initial_values={
-            "DEFAULT_DATASET": client.CONFIG.DEFAULT_DATASET,
-            "DEFAULT_SCHEMA_NAME": "default"
-        })(Schema("default")
-    ) as e_client:
-        e_client.initialize_storage()
-        try:
-            # schema was created with the name of just schema prefix
-            assert e_client.sql_client.default_dataset_name == client.CONFIG.DEFAULT_DATASET
-            # update schema
-            e_client.update_storage_schema()
-            assert e_client._get_schema_version_from_storage() == 1
-        finally:
-            e_client.sql_client.drop_dataset()
-
-
 def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage) -> None:
     # non existing job
     with pytest.raises(LoadJobNotExistsException):
@@ -86,22 +65,23 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage)
     assert r_job.status() == "completed"


-@pytest.mark.skip()
-def test_bigquery_location(client: BigQueryClient, file_storage: FileStorage) -> None:
-    user_table_name = prepare_event_user_table(client)
-    load_json = {
-        "_dlt_id": uniq_id(),
-        "_dlt_root_id": uniq_id(),
-        "sender_id": '90238094809sajlkjxoiewjhduuiuehd',
-        "timestamp": str(pendulum.now())
-    }
-    job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name)
-
-    # start a job from the same file. it should fallback to retrieve job silently
-    client.start_file_load(client.schema.get_table(user_table_name), file_storage._make_path(job.file_name()))
-    canonical_name = client.sql_client.make_qualified_table_name(user_table_name)
-    t = client.sql_client.native_connection.get_table(canonical_name)
-    assert t.location == 'US'
+@pytest.mark.parametrize('location', ["US"])
+def test_bigquery_location(location: str, file_storage: FileStorage) -> None:
+    with cm_yield_client_with_storage("bigquery", initial_values={"LOCATION": location}) as client:
+        user_table_name = prepare_event_user_table(client)
+        load_json = {
+            "_dlt_id": uniq_id(),
+            "_dlt_root_id": uniq_id(),
+            "sender_id": '90238094809sajlkjxoiewjhduuiuehd',
+            "timestamp": str(pendulum.now())
+        }
+        job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name)
+
+        # start a job from the same file. it should fallback to retrieve job silently
+        client.start_file_load(client.schema.get_table(user_table_name), file_storage._make_path(job.file_name()))
+        canonical_name = client.sql_client.make_qualified_table_name(user_table_name)
+        t = client.sql_client.native_connection.get_table(canonical_name)
+        assert t.location == location


 def test_loading_errors(client: BigQueryClient, file_storage: FileStorage) -> None:
diff --git a/tests/load/redshift/test_redshift_client.py b/tests/load/redshift/test_redshift_client.py
index ac4fa1a864..c799bc2669 100644
--- a/tests/load/redshift/test_redshift_client.py
+++ b/tests/load/redshift/test_redshift_client.py
@@ -30,27 +30,6 @@ def client() -> Iterator[RedshiftClient]:
     yield from yield_client_with_storage("redshift")


-def test_default_schema_name_init_storage(client: RedshiftClient) -> None:
-    e_client: RedshiftClient = None
-    # will reuse same configuration
-    with Load.import_client_cls(
-        "redshift",
-        initial_values={
-            "DEFAULT_DATASET": client.CONFIG.DEFAULT_DATASET,
-            "DEFAULT_SCHEMA_NAME": "default"
-        })(Schema("default")
-    ) as e_client:
-        e_client.initialize_storage()
-        try:
-            # schema was created with the name of just schema prefix
-            assert e_client.sql_client.default_dataset_name == client.CONFIG.DEFAULT_DATASET
-            # update schema
-            e_client.update_storage_schema()
-            assert e_client._get_schema_version_from_storage() == 1
-        finally:
-            e_client.sql_client.drop_dataset()
-
-
 def test_recover_tx_rollback(client: RedshiftClient) -> None:
     client.update_storage_schema()
     version_table = client.sql_client.make_qualified_table_name("_dlt_version")
diff --git a/tests/load/test_client.py b/tests/load/test_client.py
index e78b7ac99e..483b348749 100644
--- a/tests/load/test_client.py
+++ b/tests/load/test_client.py
@@ -14,10 +14,11 @@

 from tests.utils import TEST_STORAGE, delete_storage
 from tests.common.utils import load_json_case
-from tests.load.utils import TABLE_UPDATE, TABLE_ROW, expect_load_file, yield_client_with_storage, write_dataset, prepare_table
+from tests.load.utils import TABLE_UPDATE, TABLE_ROW, expect_load_file, yield_client_with_storage, cm_yield_client_with_storage, write_dataset, prepare_table

 ALL_CLIENTS = ['redshift_client', 'bigquery_client']
+ALL_CLIENT_TYPES = ["bigquery", "redshift"]


 @pytest.fixture
@@ -334,6 +335,14 @@ def test_retrieve_job(client: SqlJobClientBase, file_storage: FileStorage) -> No
     assert r_job.status() == "completed"


+@pytest.mark.parametrize('client_type', ALL_CLIENT_TYPES)
+def test_default_schema_name_init_storage(client_type: str) -> None:
+    with cm_yield_client_with_storage(client_type, initial_values={
+        "DEFAULT_SCHEMA_NAME": "event"  # pass the schema that is a default schema. that should create dataset with the name `DEFAULT_DATASET`
+    }) as client:
+        assert client.sql_client.default_dataset_name == client.CONFIG.DEFAULT_DATASET
+
+
 def prepare_schema(client: SqlJobClientBase, case: str) -> None:
     client.update_storage_schema()
     rows = load_json_case(case)
diff --git a/tests/load/utils.py b/tests/load/utils.py
index e1b98071ba..9deed5a806 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -1,5 +1,6 @@
+import contextlib
 import os
-from typing import Any, Iterable, Iterator, List, Sequence, cast, IO
+from typing import Any, ContextManager, Iterable, Iterator, List, Sequence, cast, IO

 from dlt.common import json, Decimal
 from dlt.common.configuration import make_configuration
@@ -103,11 +104,14 @@ def prepare_table(client: JobClientBase, case_name: str = "event_user", table_na
     return user_table_name


-def yield_client_with_storage(client_type: str) -> Iterator[SqlJobClientBase]:
+
+def yield_client_with_storage(client_type: str, initial_values: StrAny = None) -> Iterator[SqlJobClientBase]:
     os.environ.pop("DEFAULT_DATASET", None)
     # create dataset with random name
     default_dataset = "test_" + uniq_id()
-    initial_values = {"DEFAULT_DATASET": default_dataset}
+    client_initial_values = {"DEFAULT_DATASET": default_dataset}
+    if initial_values is not None:
+        client_initial_values.update(initial_values)
     # get event default schema
     C = make_configuration(SchemaVolumeConfiguration, SchemaVolumeConfiguration, initial_values={
         "SCHEMA_VOLUME_PATH": "tests/common/cases/schemas/rasa"
@@ -114,13 +118,18 @@ def yield_client_with_storage(client_type: str) -> Iterator[SqlJobClientBase]:
     })
     schema = schema_storage.load_schema("event")
     # create client and dataset
     client: SqlJobClientBase = None
-    with Load.import_client_cls(client_type, initial_values=initial_values)(schema) as client:
+    with Load.import_client_cls(client_type, initial_values=client_initial_values)(schema) as client:
         client.initialize_storage()
         yield client
         client.sql_client.drop_dataset()


+@contextlib.contextmanager
+def cm_yield_client_with_storage(client_type: str, initial_values: StrAny = None) -> ContextManager[SqlJobClientBase]:
+    return yield_client_with_storage(client_type, initial_values)
+
+
 def write_dataset(client: JobClientBase, f: IO[Any], rows: Sequence[StrAny], headers: Iterable[str]) -> None:
     if client.capabilities()["preferred_loader_file_format"] == "jsonl":
         write_jsonl(f, rows)

From 202f1d0f4f3f57e17ca20a7dceefbdda19a9f545 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Tue, 20 Sep 2022 15:29:08 +0200
Subject: [PATCH 13/17] re-submits bigquery job on backendError

---
 dlt/load/bigquery/client.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dlt/load/bigquery/client.py b/dlt/load/bigquery/client.py
index 989f2a2bcf..27a73efea4 100644
--- a/dlt/load/bigquery/client.py
+++ b/dlt/load/bigquery/client.py
@@ -157,12 +157,12 @@ def status(self) -> LoadJobStatus:
             if reason in BQ_TERMINAL_REASONS:
                 # the job permanently failed for the reason above
                 return "failed"
-            elif reason in ["backendError", "internalError"]:
+            elif reason in ["internalError"]:
                 logger.warning(f"Got reason {reason} for job {self.file_name}, job considered still running. ({self.bq_load_job.error_result})")
                 # status of the job could not be obtained, job still running
                 return "running"
             else:
-                # retry on all other reasons
+                # retry on all other reasons, including `backendError` which requires retry when the job is done
                 return "retry"
         else:
             return "running"

From 83deb25703934cf1d3d81a86adfdc8612a4c0481 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Tue, 20 Sep 2022 15:30:55 +0200
Subject: [PATCH 14/17] bumps version to 0.1.0rc14

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 71dd038b07..77de3167b2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "python-dlt"
-version = "0.1.0rc13"
+version = "0.1.0rc14"
 description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run."
 authors = ["ScaleVector "]
 maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ",]

From bd92de879a2a5401f34679348c7b2ca8bad99011 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Tue, 20 Sep 2022 15:57:14 +0200
Subject: [PATCH 15/17] fixes post rebase conflicts

---
 tests/load/bigquery/test_bigquery_client.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py
index cf740cd230..842f86dc10 100644
--- a/tests/load/bigquery/test_bigquery_client.py
+++ b/tests/load/bigquery/test_bigquery_client.py
@@ -13,7 +13,7 @@
 from dlt.load.bigquery.client import BigQueryClient

 from tests.utils import TEST_STORAGE, delete_storage
-from tests.load.utils import cm_yield_client_with_storage, expect_load_file, prepare_table, prepare_event_user_table, yield_client_with_storage
+from tests.load.utils import cm_yield_client_with_storage, expect_load_file, prepare_table, yield_client_with_storage


 @pytest.fixture(scope="module")
@@ -68,7 +68,7 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage)
 @pytest.mark.parametrize('location', ["US"])
 def test_bigquery_location(location: str, file_storage: FileStorage) -> None:
     with cm_yield_client_with_storage("bigquery", initial_values={"LOCATION": location}) as client:
-        user_table_name = prepare_event_user_table(client)
+        user_table_name = prepare_table(client)
         load_json = {
             "_dlt_id": uniq_id(),
             "_dlt_root_id": uniq_id(),

From 4145f2a3c6230b456e961bacf9a74d38db0a73db Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Tue, 20 Sep 2022 16:06:39 +0200
Subject: [PATCH 16/17] fixes test filter for github workflows

---
 .github/workflows/test_loader_bigquery.yml | 4 ++--
 .github/workflows/test_loader_redshift.yml | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/.github/workflows/test_loader_bigquery.yml b/.github/workflows/test_loader_bigquery.yml
index 8e51742bf1..0da50817cb 100644
--- a/.github/workflows/test_loader_bigquery.yml
+++ b/.github/workflows/test_loader_bigquery.yml
@@ -66,11 +66,11 @@ jobs:
 #      run: poetry install --no-interaction

       - run: |
-          LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -k '(not redshift_client)'
+          LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -k '(not redshift)'
         if: runner.os != 'Windows'
         name: Run tests Linux/MAC
       - run: |
-          poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -m "not forked" -k "(not redshift_client)"
+          poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -m "not forked" -k "(not redshift)"
         if: runner.os == 'Windows'
         name: Run tests Windows
         shell: cmd
diff --git a/.github/workflows/test_loader_redshift.yml b/.github/workflows/test_loader_redshift.yml
index 4a2e97cfc3..a928f2234b 100644
--- a/.github/workflows/test_loader_redshift.yml
+++ b/.github/workflows/test_loader_redshift.yml
@@ -66,11 +66,11 @@ jobs:
 #      run: poetry install --no-interaction

       - run: |
-          LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -k '(not bigquery_client)'
+          LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -k '(not bigquery)'
         if: runner.os != 'Windows'
         name: Run tests Linux/MAC
       - run: |
-          poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -m "not forked" -k "(not bigquery_client)"
+          poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -m "not forked" -k "(not bigquery)"
         if: runner.os == 'Windows'
         name: Run tests Windows
         shell: cmd

From 443c475311b7ae4e152c3381ede948b94d4ef3ce Mon Sep 17 00:00:00 2001
From: Adrian
Date: Tue, 20 Sep 2022 16:28:03 +0200
Subject: [PATCH 17/17] add "EU" location test

---
 tests/load/bigquery/test_bigquery_client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/load/bigquery/test_bigquery_client.py b/tests/load/bigquery/test_bigquery_client.py
index 842f86dc10..e22ea64107 100644
--- a/tests/load/bigquery/test_bigquery_client.py
+++ b/tests/load/bigquery/test_bigquery_client.py
@@ -65,7 +65,7 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage)
     assert r_job.status() == "completed"


-@pytest.mark.parametrize('location', ["US"])
+@pytest.mark.parametrize('location', ["US", "EU"])
 def test_bigquery_location(location: str, file_storage: FileStorage) -> None:
     with cm_yield_client_with_storage("bigquery", initial_values={"LOCATION": location}) as client:
         user_table_name = prepare_table(client)
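
Taken together, the series threads a BigQuery location through the configuration layer (a LOCATION field on GcpClientCredentials and GCPPipelineCredentials, defaulting to "US"), passes it to bigquery.Client(..., location=...) when the load client opens its connection, and verifies the resulting dataset location end to end with a test parametrized over "US" and "EU". A minimal usage sketch follows, assuming the GCPPipelineCredentials API exactly as patched above; the key-file path and dataset prefix are hypothetical placeholders:

    from dlt.pipeline.typing import GCPPipelineCredentials

    # "services.json" and "my_dataset" are hypothetical placeholders; the third
    # argument is the location parameter added by this series. When omitted it
    # defaults to "US" (patches 04/05/08), so existing callers keep working.
    credentials = GCPPipelineCredentials.from_services_file("services.json", "my_dataset", "EU")
    assert credentials.LOCATION == "EU"

With such credentials the loader creates its dataset and runs its load jobs in the requested region, which is what the parametrized test_bigquery_location asserts via native_connection.get_table(...).location.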