Bigquery localisation #62

Merged · 17 commits · Sep 20, 2022
4 changes: 2 additions & 2 deletions .github/workflows/test_loader_bigquery.yml
@@ -66,11 +66,11 @@ jobs:
# run: poetry install --no-interaction

- run: |
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -k '(not redshift_client)'
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -k '(not redshift)'
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -m "not forked" -k "(not redshift_client)"
poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/redshift --ignore=tests/dbt_runner -m "not forked" -k "(not redshift)"
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
4 changes: 2 additions & 2 deletions .github/workflows/test_loader_redshift.yml
@@ -66,11 +66,11 @@ jobs:
# run: poetry install --no-interaction

- run: |
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -k '(not bigquery_client)'
LOG_LEVEL=ERROR poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -k '(not bigquery)'
if: runner.os != 'Windows'
name: Run tests Linux/MAC
- run: |
poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -m "not forked" -k "(not bigquery_client)"
poetry run pytest tests --ignore=tests/common --ignore=tests/normalize --ignore=tests/load/bigquery --ignore=tests/dbt_runner -m "not forked" -k "(not bigquery)"
if: runner.os == 'Windows'
name: Run tests Windows
shell: cmd
3 changes: 2 additions & 1 deletion dlt/common/configuration/gcp_client_credentials.py
@@ -9,6 +9,7 @@ class GcpClientCredentials(CredentialsConfiguration):
PROJECT_ID: str = None
CRED_TYPE: str = "service_account"
PRIVATE_KEY: TSecretValue = None
LOCATION: str = "US"
TOKEN_URI: str = "https://oauth2.googleapis.com/token"
CLIENT_EMAIL: str = None

@@ -29,4 +30,4 @@ def as_credentials(cls) -> StrAny:
"private_key": cls.PRIVATE_KEY,
"token_uri": cls.TOKEN_URI,
"client_email": cls.CLIENT_EMAIL
}
}
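The new field is a client-level setting rather than part of the service-account payload: a minimal sketch of that distinction, assuming the import path implied by the file name above.

```python
# Sketch only: LOCATION defaults to "US" and is consumed by the BigQuery client code
# (see dlt/load/bigquery/client.py below); it is not included in the dict that
# as_credentials() builds for google-auth.
from dlt.common.configuration.gcp_client_credentials import GcpClientCredentials

GcpClientCredentials.LOCATION = "EU"
assert "location" not in GcpClientCredentials.as_credentials()
```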
7 changes: 4 additions & 3 deletions dlt/load/bigquery/client.py
@@ -66,7 +66,7 @@ def open_connection(self) -> None:
credentials = None
else:
credentials = service_account.Credentials.from_service_account_info(self.C.as_credentials())
self._client = bigquery.Client(self.C.PROJECT_ID, credentials=credentials)
self._client = bigquery.Client(self.C.PROJECT_ID, credentials=credentials, location=self.C.LOCATION)
Collaborator:
Can you do an experiment? At line 204 we have:

return BigQueryLoadJob(
    JobClientBase.get_file_name_from_file_path(file_path),
    self._create_load_job(table["name"], table["write_disposition"], file_path),
    self.CREDENTIALS
)

Replace self.CREDENTIALS with self.sql_client.native_connection()

and see if everything still works. Then we are sure that the same client context used everywhere else is passed to the load job.

Contributor Author:
There are two places, start_file_load and restore_file_load. Should I just do the restore, or both?

Collaborator:
OK, I retract this :) All the jobs already use the client when they are created, so they will be in the right location.


def close_connection(self) -> None:
if self._client:
@@ -157,12 +157,12 @@ def status(self) -> LoadJobStatus:
if reason in BQ_TERMINAL_REASONS:
# the job permanently failed for the reason above
return "failed"
elif reason in ["backendError", "internalError"]:
elif reason in ["internalError"]:
logger.warning(f"Got reason {reason} for job {self.file_name}, job considered still running. ({self.bq_load_job.error_result})")
# status of the job could not be obtained, job still running
return "running"
else:
# retry on all other reasons
# retry on all other reasons, including `backendError` which requires retry when the job is done
return "retry"
else:
return "running"
@@ -204,6 +204,7 @@ def restore_file_load(self, file_path: str) -> LoadJob:
JobClientBase.get_file_name_from_file_path(file_path),
self._retrieve_load_job(file_path),
self.CREDENTIALS
#self.sql_client.native_connection()
)
except api_core_exceptions.GoogleAPICallError as gace:
reason = self._get_reason_from_errors(gace)
14 changes: 7 additions & 7 deletions dlt/pipeline/typing.py
@@ -34,6 +34,7 @@ class GCPPipelineCredentials(PipelineCredentials):
DEFAULT_DATASET: str = None
CLIENT_EMAIL: str = None
PRIVATE_KEY: TSecretValue = None
LOCATION: str = "US"
CRED_TYPE: str = "service_account"
TOKEN_URI: str = "https://oauth2.googleapis.com/token"
HTTP_TIMEOUT: float = 15.0
@@ -48,20 +49,19 @@ def default_dataset(self, new_value: str) -> None:
self.DEFAULT_DATASET = new_value

@classmethod
def from_services_dict(cls, services: StrAny, dataset_prefix: str) -> "GCPPipelineCredentials":
def from_services_dict(cls, services: StrAny, dataset_prefix: str, location: str = "US") -> "GCPPipelineCredentials":
assert dataset_prefix is not None

return cls("bigquery", services["project_id"], dataset_prefix, services["client_email"], services["private_key"])
return cls("bigquery", services["project_id"], dataset_prefix, services["client_email"], services["private_key"], location or cls.LOCATION)

@classmethod
def from_services_file(cls, services_path: str, dataset_prefix: str) -> "GCPPipelineCredentials":
def from_services_file(cls, services_path: str, dataset_prefix: str, location: str = "US") -> "GCPPipelineCredentials":
with open(services_path, "r", encoding="utf-8") as f:
services = json.load(f)
return GCPPipelineCredentials.from_services_dict(services, dataset_prefix)
return GCPPipelineCredentials.from_services_dict(services, dataset_prefix, location)

@classmethod
def default_credentials(cls, dataset_prefix: str, project_id: str = None) -> "GCPPipelineCredentials":
return cls("bigquery", project_id, dataset_prefix, None)
def default_credentials(cls, dataset_prefix: str, project_id: str = None, location: str = None) -> "GCPPipelineCredentials":
return cls("bigquery", project_id, dataset_prefix, None, None, location or cls.LOCATION)
adrianbr marked this conversation as resolved.


@dataclass
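A hedged usage sketch of the extended constructors; the key-file path and dataset prefix are placeholders, and the import path is assumed from the file name above.

```python
# Sketch only: pin BigQuery pipeline credentials to the EU region.
# "services.json" and "my_prefix" are placeholder values.
from dlt.pipeline.typing import GCPPipelineCredentials

creds = GCPPipelineCredentials.from_services_file("services.json", "my_prefix", location="EU")

# or, without a key file, fall back to default GCP credentials
default_creds = GCPPipelineCredentials.default_credentials("my_prefix", location="EU")
```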
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "python-dlt"
version = "0.1.0rc13"
version = "0.1.0rc14"
description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run."
authors = ["ScaleVector <services@scalevector.ai>"]
maintainers = [ "Marcin Rudolf <marcin@scalevector.ai>", "Adrian Brudaru <adrian@scalevector.ai>",]
42 changes: 20 additions & 22 deletions tests/load/bigquery/test_bigquery_client.py
@@ -13,7 +13,7 @@
from dlt.load.bigquery.client import BigQueryClient

from tests.utils import TEST_STORAGE, delete_storage
from tests.load.utils import expect_load_file, prepare_table, yield_client_with_storage
from tests.load.utils import cm_yield_client_with_storage, expect_load_file, prepare_table, yield_client_with_storage


@pytest.fixture(scope="module")
@@ -31,27 +31,6 @@ def auto_delete_storage() -> None:
delete_storage()


def test_default_schema_name_init_storage(client: BigQueryClient) -> None:
e_client: BigQueryClient = None
# pass the schema that is a default schema. that should create dataset with the name `DEFAULT_DATASET`
with Load.import_client_cls(
"bigquery",
initial_values={
"DEFAULT_DATASET": client.CONFIG.DEFAULT_DATASET,
"DEFAULT_SCHEMA_NAME": "default"
})(Schema("default")
) as e_client:
e_client.initialize_storage()
try:
# schema was created with the name of just schema prefix
assert e_client.sql_client.default_dataset_name == client.CONFIG.DEFAULT_DATASET
# update schema
e_client.update_storage_schema()
assert e_client._get_schema_version_from_storage() == 1
finally:
e_client.sql_client.drop_dataset()


def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage) -> None:
# non existing job
with pytest.raises(LoadJobNotExistsException):
@@ -86,6 +65,25 @@ def test_bigquery_job_errors(client: BigQueryClient, file_storage: FileStorage)
assert r_job.status() == "completed"


@pytest.mark.parametrize('location', ["US", "EU"])
def test_bigquery_location(location: str, file_storage: FileStorage) -> None:
with cm_yield_client_with_storage("bigquery", initial_values={"LOCATION": location}) as client:
user_table_name = prepare_table(client)
load_json = {
"_dlt_id": uniq_id(),
"_dlt_root_id": uniq_id(),
"sender_id": '90238094809sajlkjxoiewjhduuiuehd',
"timestamp": str(pendulum.now())
}
job = expect_load_file(client, file_storage, json.dumps(load_json), user_table_name)

# start a job from the same file. it should fallback to retrieve job silently
client.start_file_load(client.schema.get_table(user_table_name), file_storage._make_path(job.file_name()))
canonical_name = client.sql_client.make_qualified_table_name(user_table_name)
t = client.sql_client.native_connection.get_table(canonical_name)
assert t.location == location


def test_loading_errors(client: BigQueryClient, file_storage: FileStorage) -> None:
user_table_name = prepare_table(client)
# insert into unknown column
21 changes: 0 additions & 21 deletions tests/load/redshift/test_redshift_client.py
@@ -30,27 +30,6 @@ def client() -> Iterator[RedshiftClient]:
yield from yield_client_with_storage("redshift")


def test_default_schema_name_init_storage(client: RedshiftClient) -> None:
e_client: RedshiftClient = None
# will reuse same configuration
with Load.import_client_cls(
"redshift",
initial_values={
"DEFAULT_DATASET": client.CONFIG.DEFAULT_DATASET,
"DEFAULT_SCHEMA_NAME": "default"
})(Schema("default")
) as e_client:
e_client.initialize_storage()
try:
# schema was created with the name of just schema prefix
assert e_client.sql_client.default_dataset_name == client.CONFIG.DEFAULT_DATASET
# update schema
e_client.update_storage_schema()
assert e_client._get_schema_version_from_storage() == 1
finally:
e_client.sql_client.drop_dataset()


def test_recover_tx_rollback(client: RedshiftClient) -> None:
client.update_storage_schema()
version_table = client.sql_client.make_qualified_table_name("_dlt_version")
11 changes: 10 additions & 1 deletion tests/load/test_client.py
@@ -14,10 +14,11 @@

from tests.utils import TEST_STORAGE, delete_storage
from tests.common.utils import load_json_case
from tests.load.utils import TABLE_UPDATE, TABLE_ROW, expect_load_file, yield_client_with_storage, write_dataset, prepare_table
from tests.load.utils import TABLE_UPDATE, TABLE_ROW, expect_load_file, yield_client_with_storage, cm_yield_client_with_storage, write_dataset, prepare_table


ALL_CLIENTS = ['redshift_client', 'bigquery_client']
ALL_CLIENT_TYPES = ["bigquery", "redshift"]


@pytest.fixture
@@ -334,6 +335,14 @@ def test_retrieve_job(client: SqlJobClientBase, file_storage: FileStorage) -> None:
assert r_job.status() == "completed"


@pytest.mark.parametrize('client_type', ALL_CLIENT_TYPES)
def test_default_schema_name_init_storage(client_type: str) -> None:
with cm_yield_client_with_storage(client_type, initial_values={
"DEFAULT_SCHEMA_NAME": "event" # pass the schema that is a default schema. that should create dataset with the name `DEFAULT_DATASET`
}) as client:
assert client.sql_client.default_dataset_name == client.CONFIG.DEFAULT_DATASET


def prepare_schema(client: SqlJobClientBase, case: str) -> None:
client.update_storage_schema()
rows = load_json_case(case)
17 changes: 13 additions & 4 deletions tests/load/utils.py
@@ -1,5 +1,6 @@
import contextlib
import os
from typing import Any, Iterable, Iterator, List, Sequence, cast, IO
from typing import Any, ContextManager, Iterable, Iterator, List, Sequence, cast, IO

from dlt.common import json, Decimal
from dlt.common.configuration import make_configuration
@@ -103,11 +104,14 @@ def prepare_table(client: JobClientBase, case_name: str = "event_user", table_na
return user_table_name


def yield_client_with_storage(client_type: str) -> Iterator[SqlJobClientBase]:

def yield_client_with_storage(client_type: str, initial_values: StrAny = None) -> Iterator[SqlJobClientBase]:
os.environ.pop("DEFAULT_DATASET", None)
# create dataset with random name
default_dataset = "test_" + uniq_id()
initial_values = {"DEFAULT_DATASET": default_dataset}
client_initial_values = {"DEFAULT_DATASET": default_dataset}
if initial_values is not None:
client_initial_values.update(initial_values)
# get event default schema
C = make_configuration(SchemaVolumeConfiguration, SchemaVolumeConfiguration, initial_values={
"SCHEMA_VOLUME_PATH": "tests/common/cases/schemas/rasa"
@@ -116,12 +120,17 @@ def yield_client_with_storage(client_type: str) -> Iterator[SqlJobClientBase]:
schema = schema_storage.load_schema("event")
# create client and dataset
client: SqlJobClientBase = None
with Load.import_client_cls(client_type, initial_values=initial_values)(schema) as client:
with Load.import_client_cls(client_type, initial_values=client_initial_values)(schema) as client:
client.initialize_storage()
yield client
client.sql_client.drop_dataset()


@contextlib.contextmanager
def cm_yield_client_with_storage(client_type: str, initial_values: StrAny = None) -> ContextManager[SqlJobClientBase]:
return yield_client_with_storage(client_type, initial_values)


def write_dataset(client: JobClientBase, f: IO[Any], rows: Sequence[StrAny], headers: Iterable[str]) -> None:
if client.capabilities()["preferred_loader_file_format"] == "jsonl":
write_jsonl(f, rows)
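Finally, a hedged usage sketch of the cm_yield_client_with_storage helper added above; it assumes working BigQuery credentials are configured, mirroring the new location test.

```python
# Sketch only: open a BigQuery job client bound to a freshly created random test
# dataset in the EU. Requires configured GCP credentials; the dataset is dropped
# automatically on exit because yield_client_with_storage drops it after the yield.
from tests.load.utils import cm_yield_client_with_storage

with cm_yield_client_with_storage("bigquery", initial_values={"LOCATION": "EU"}) as client:
    print(client.sql_client.default_dataset_name)
```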