Add Delta table support for filesystem destination (#1382)
* add delta table support for filesystem destination

* Merge branch 'refs/heads/devel' into 978-filesystem-delta-table

* remove duplicate method definition

* make property robust

* exclude high-precision decimal columns

* make delta imports conditional

* include pyarrow in deltalake dependency

* install extra deltalake dependency

* disable high precision decimal arrow test columns by default

* include arrow max precision decimal column

* introduce directory job and refactor delta table code

* refactor delta table load

* revert import changes

* add delta table format child table handling

* make table_format key lookups robust

* write remote path to reference file

* add supported table formats and file format adapter to destination capabilities

* remove jsonl and parquet from table formats

* add object_store rust crate credentials handling

* add deltalake_storage_options to filesystem config

* move function to top level to prevent multiprocessing pickle error

* add new deltalake_storage_options filesystem config key to tests

* replace secrets with dummy values in test

* reorganize object_store rust crate credentials tests

* add delta table format docs

* move delta table logical delete logic to filesystem client

* rename pyarrow lib method names

* rename utils to delta_utils

* import pyarrow from dlt common libs

* move delta lake utilities to module in dlt common libs

* import delta lake utils early to assert dependencies availability

* handle file format adaptation at table level

* initialize file format variables

* split delta table format tests

* handle table schema is None case

* add test for dynamic dispatching of delta tables

* mark core delta table test as essential

* simplify item normalizer dict key

* make list copy to prevent in-place mutations

* add extra deltalake dependency

* only test deltalake lib on local filesystem

* properly evaluates lazy annotations

* uses base FilesystemConfiguration from common in libs

* solves union type reordering due to caching and clash with delta-rs DeltaTable method signature

* creates a table with just root name to cache item normalizers properly

---------

Co-authored-by: Jorrit Sandbrink <sandbj01@heiway.net>
Co-authored-by: Marcin Rudolf <rudolfix@rudolfix.org>
3 people authored Jun 6, 2024
1 parent e78a3c1 commit 1c1ce7e
Showing 41 changed files with 1,203 additions and 207 deletions.
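For orientation, a minimal usage sketch of what this commit enables (not part of the diff; the pipeline name, resource, and local path are illustrative, and the `table_format="delta"` hint is the one documented in this PR):

import dlt

@dlt.resource(table_format="delta")  # new table format added by this commit
def my_delta_resource():
    yield [{"id": 1, "value": "foo"}, {"id": 2, "value": "bar"}]

# requires the new extra: pip install "dlt[deltalake]"
pipeline = dlt.pipeline(
    "delta_demo",
    destination=dlt.destinations.filesystem("file:///tmp/delta_demo"),  # illustrative local bucket_url
)
pipeline.run(my_delta_resource())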
2 changes: 1 addition & 1 deletion .github/workflows/test_common.yml
@@ -123,7 +123,7 @@ jobs:
shell: cmd
- name: Install pipeline dependencies
run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E duckdb -E cli -E parquet --with sentry-sdk --with pipeline -E deltalake

- run: |
poetry run pytest tests/extract tests/pipeline tests/libs tests/cli/common tests/destinations
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -75,7 +75,7 @@ jobs:

- name: Install dependencies
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E redshift -E gs -E s3 -E az -E parquet -E duckdb -E cli --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
2 changes: 1 addition & 1 deletion .github/workflows/test_local_destinations.yml
@@ -90,7 +90,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline
run: poetry install --no-interaction -E postgres -E duckdb -E parquet -E filesystem -E cli -E weaviate --with sentry-sdk --with pipeline -E deltalake

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml
8 changes: 8 additions & 0 deletions dlt/common/configuration/specs/aws_credentials.py
@@ -45,6 +45,14 @@ def to_session_credentials(self) -> Dict[str, str]:
aws_session_token=self.aws_session_token,
)

def to_object_store_rs_credentials(self) -> Dict[str, str]:
# https://docs.rs/object_store/latest/object_store/aws
assert self.region_name is not None, "`object_store` Rust crate requires AWS region."
creds = self.to_session_credentials()
if creds["aws_session_token"] is None:
creds.pop("aws_session_token")
return {**creds, **{"region": self.region_name}}


@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
11 changes: 11 additions & 0 deletions dlt/common/configuration/specs/azure_credentials.py
@@ -27,6 +27,13 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
sas_token=self.azure_storage_sas_token,
)

def to_object_store_rs_credentials(self) -> Dict[str, str]:
# https://docs.rs/object_store/latest/object_store/azure
creds = self.to_adlfs_credentials()
if creds["sas_token"] is None:
creds.pop("sas_token")
return creds

def create_sas_token(self) -> None:
from azure.storage.blob import generate_account_sas, ResourceTypes

@@ -61,6 +68,10 @@ def to_adlfs_credentials(self) -> Dict[str, Any]:
client_secret=self.azure_client_secret,
)

def to_object_store_rs_credentials(self) -> Dict[str, str]:
# https://docs.rs/object_store/latest/object_store/azure
return self.to_adlfs_credentials()


@configspec
class AzureCredentials(AzureCredentialsWithoutDefaults, CredentialsWithDefault):
11 changes: 9 additions & 2 deletions dlt/common/configuration/specs/base_configuration.py
@@ -1,5 +1,4 @@
import copy
import inspect
import contextlib
import dataclasses
import warnings
@@ -221,6 +220,11 @@ def wrap(cls: Type[TAnyClass]) -> Type[TAnyClass]:
if att_name not in cls.__annotations__:
raise ConfigFieldMissingTypeHintException(att_name, cls)
hint = cls.__annotations__[att_name]
# resolve the annotation as per PEP 563
# NOTE: we do not use get_type_hints because at this moment cls is an unknown name
# (ie. used as decorator and module is being imported)
if isinstance(hint, str):
hint = eval(hint)

# context can have any type
if not is_valid_hint(hint) and not is_context:
@@ -321,7 +325,10 @@ def _get_resolvable_dataclass_fields(cls) -> Iterator[TDtcField]:
@classmethod
def get_resolvable_fields(cls) -> Dict[str, type]:
"""Returns a mapping of fields to their type hints. Dunders should not be resolved and are not returned"""
return {f.name: f.type for f in cls._get_resolvable_dataclass_fields()}
return {
f.name: eval(f.type) if isinstance(f.type, str) else f.type # type: ignore[arg-type]
for f in cls._get_resolvable_dataclass_fields()
}

def is_resolved(self) -> bool:
return self.__is_resolved__
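An aside on the PEP 563 handling above (illustrative, not part of the commit): when annotations are postponed they are stored as strings, so they must be evaluated before hints can be inspected.

from __future__ import annotations  # PEP 563: annotations are stored as strings

import dataclasses


@dataclasses.dataclass
class Example:
    count: int = 0


# the stored hint is the string "int", not the type itself
hint = Example.__annotations__["count"]
assert isinstance(hint, str)
assert eval(hint) is int  # mirrors what the decorator above does for unresolved hints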
13 changes: 12 additions & 1 deletion dlt/common/configuration/specs/gcp_credentials.py
@@ -1,6 +1,6 @@
import dataclasses
import sys
from typing import Any, ClassVar, Final, List, Tuple, Union, Dict
from typing import Any, ClassVar, Final, List, Tuple, Union, Dict, Optional

from dlt.common.json import json
from dlt.common.pendulum import pendulum
@@ -74,6 +74,7 @@ def to_gcs_credentials(self) -> Dict[str, Any]:
@configspec
class GcpServiceAccountCredentialsWithoutDefaults(GcpCredentials):
private_key: TSecretValue = None
private_key_id: Optional[str] = None
client_email: str = None
type: Final[str] = dataclasses.field( # noqa: A003
default="service_account", init=False, repr=False, compare=False
@@ -122,6 +123,10 @@ def to_native_credentials(self) -> Any:
else:
return ServiceAccountCredentials.from_service_account_info(self)

def to_object_store_rs_credentials(self) -> Dict[str, str]:
# https://docs.rs/object_store/latest/object_store/gcp
return {"service_account_key": json.dumps(dict(self))}

def __str__(self) -> str:
return f"{self.client_email}@{self.project_id}"

@@ -171,6 +176,12 @@ def parse_native_representation(self, native_value: Any) -> None:
def to_native_representation(self) -> str:
return json.dumps(self._info_dict())

def to_object_store_rs_credentials(self) -> Dict[str, str]:
raise NotImplementedError(
"`object_store` Rust crate does not support OAuth for GCP credentials. Reference:"
" https://docs.rs/object_store/latest/object_store/gcp."
)

def auth(self, scopes: Union[str, List[str]] = None, redirect_url: str = None) -> None:
if not self.refresh_token:
self.add_scopes(scopes)
34 changes: 32 additions & 2 deletions dlt/common/destination/capabilities.py
@@ -1,4 +1,15 @@
from typing import Any, Callable, ClassVar, List, Literal, Optional, Sequence, Tuple, Set, get_args
from typing import (
Any,
Callable,
ClassVar,
Literal,
Optional,
Sequence,
Tuple,
Set,
Protocol,
get_args,
)

from dlt.common.configuration.utils import serialize_value
from dlt.common.configuration import configspec
@@ -9,7 +20,6 @@
DestinationLoadingWithoutStagingNotSupported,
)
from dlt.common.utils import identity
from dlt.common.pendulum import pendulum

from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.wei import EVM_DECIMAL_PRECISION
@@ -23,12 +33,28 @@
ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat))


class LoaderFileFormatAdapter(Protocol):
"""Callback protocol for `loader_file_format_adapter` capability."""

def __call__(
self,
preferred_loader_file_format: TLoaderFileFormat,
supported_loader_file_formats: Sequence[TLoaderFileFormat],
/,
*,
table_schema: "TTableSchema", # type: ignore[name-defined] # noqa: F821
) -> Tuple[TLoaderFileFormat, Sequence[TLoaderFileFormat]]: ...


@configspec
class DestinationCapabilitiesContext(ContainerInjectableContext):
"""Injectable destination capabilities required for many Pipeline stages ie. normalize"""

preferred_loader_file_format: TLoaderFileFormat = None
supported_loader_file_formats: Sequence[TLoaderFileFormat] = None
loader_file_format_adapter: LoaderFileFormatAdapter = None
"""Callable that adapts `preferred_loader_file_format` and `supported_loader_file_formats` at runtime."""
supported_table_formats: Sequence["TTableFormat"] = None # type: ignore[name-defined] # noqa: F821
recommended_file_size: Optional[int] = None
"""Recommended file size in bytes when writing extract/load files"""
preferred_staging_file_format: Optional[TLoaderFileFormat] = None
@@ -65,14 +91,18 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
@staticmethod
def generic_capabilities(
preferred_loader_file_format: TLoaderFileFormat = None,
loader_file_format_adapter: LoaderFileFormatAdapter = None,
supported_table_formats: Sequence["TTableFormat"] = None, # type: ignore[name-defined] # noqa: F821
) -> "DestinationCapabilitiesContext":
from dlt.common.data_writers.escape import format_datetime_literal

caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = preferred_loader_file_format
caps.supported_loader_file_formats = ["jsonl", "insert_values", "parquet", "csv"]
caps.loader_file_format_adapter = loader_file_format_adapter
caps.preferred_staging_file_format = None
caps.supported_staging_file_formats = []
caps.supported_table_formats = supported_table_formats or []
caps.escape_identifier = identity
caps.escape_literal = serialize_value
caps.format_datetime_literal = format_datetime_literal
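A hypothetical callback satisfying the `LoaderFileFormatAdapter` protocol above (the function name and the parquet-forcing rule are illustrative; the filesystem destination registers its own adapter):

from typing import Sequence, Tuple


def force_parquet_for_delta(
    preferred_loader_file_format: str,
    supported_loader_file_formats: Sequence[str],
    /,
    *,
    table_schema: dict,
) -> Tuple[str, Sequence[str]]:
    # delta tables are written from parquet files, so narrow the formats for such tables
    if table_schema.get("table_format") == "delta":
        return ("parquet", ["parquet"])
    return (preferred_loader_file_format, supported_loader_file_formats)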
5 changes: 4 additions & 1 deletion dlt/common/destination/reference.py
@@ -48,6 +48,7 @@
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.storages.load_package import LoadJobInfo

TLoaderReplaceStrategy = Literal["truncate-and-insert", "insert-from-staging", "staging-optimized"]
TDestinationConfig = TypeVar("TDestinationConfig", bound="DestinationClientConfiguration")
@@ -312,7 +313,9 @@ def should_truncate_table_before_load(self, table: TTableSchema) -> bool:
return table["write_disposition"] == "replace"

def create_table_chain_completed_followup_jobs(
self, table_chain: Sequence[TTableSchema]
self,
table_chain: Sequence[TTableSchema],
table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
) -> List[NewLoadJob]:
"""Creates a list of followup jobs that should be executed after a table chain is completed"""
return []
90 changes: 90 additions & 0 deletions dlt/common/libs/deltalake.py
@@ -0,0 +1,90 @@
from typing import Optional, Dict, Union

from dlt import version
from dlt.common import logger
from dlt.common.libs.pyarrow import pyarrow as pa
from dlt.common.libs.pyarrow import dataset_to_table, cast_arrow_schema_types
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.exceptions import MissingDependencyException
from dlt.common.storages import FilesystemConfiguration

try:
from deltalake import write_deltalake
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt deltalake helpers",
[f"{version.DLT_PKG_NAME}[deltalake]"],
"Install `deltalake` so dlt can create Delta tables in the `filesystem` destination.",
)


def ensure_delta_compatible_arrow_table(table: pa.Table) -> pa.Table:
"""Returns Arrow table compatible with Delta table format.
Casts table schema to replace data types not supported by Delta.
"""
ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP = {
# maps type check function to type factory function
pa.types.is_null: pa.string(),
pa.types.is_time: pa.string(),
pa.types.is_decimal256: pa.string(), # pyarrow does not allow downcasting to decimal128
}
adjusted_schema = cast_arrow_schema_types(
table.schema, ARROW_TO_DELTA_COMPATIBLE_ARROW_TYPE_MAP
)
return table.cast(adjusted_schema)


def get_delta_write_mode(write_disposition: TWriteDisposition) -> str:
"""Translates dlt write disposition to Delta write mode."""
if write_disposition in ("append", "merge"): # `merge` disposition resolves to `append`
return "append"
elif write_disposition == "replace":
return "overwrite"
else:
raise ValueError(
"`write_disposition` must be `append`, `replace`, or `merge`,"
f" but `{write_disposition}` was provided."
)


def write_delta_table(
path: str,
data: Union[pa.Table, pa.dataset.Dataset],
write_disposition: TWriteDisposition,
storage_options: Optional[Dict[str, str]] = None,
) -> None:
"""Writes in-memory Arrow table to on-disk Delta table."""

table = dataset_to_table(data)

# throws warning for `s3` protocol: https://github.com/delta-io/delta-rs/issues/2460
# TODO: upgrade `deltalake` lib after https://github.com/delta-io/delta-rs/pull/2500
# is released
write_deltalake( # type: ignore[call-overload]
table_or_uri=path,
data=ensure_delta_compatible_arrow_table(table),
mode=get_delta_write_mode(write_disposition),
schema_mode="merge", # enable schema evolution (adding new columns)
storage_options=storage_options,
engine="rust", # `merge` schema mode requires `rust` engine
)


def _deltalake_storage_options(config: FilesystemConfiguration) -> Dict[str, str]:
"""Returns dict that can be passed as `storage_options` in `deltalake` library."""
creds = {}
extra_options = {}
if config.protocol in ("az", "gs", "s3"):
creds = config.credentials.to_object_store_rs_credentials()
if config.deltalake_storage_options is not None:
extra_options = config.deltalake_storage_options
shared_keys = creds.keys() & extra_options.keys()
if len(shared_keys) > 0:
logger.warning(
"The `deltalake_storage_options` configuration dictionary contains "
"keys also provided by dlt's credential system: "
+ ", ".join([f"`{key}`" for key in shared_keys])
+ ". dlt will use the values in `deltalake_storage_options`."
)
return {**creds, **extra_options}
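A short usage sketch for the helper above, writing a small in-memory Arrow table to a local Delta table (the path is illustrative; on object stores, `storage_options` would carry the credentials returned by `_deltalake_storage_options`):

import pyarrow as pa

from dlt.common.libs.deltalake import write_delta_table

arrow_table = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})
write_delta_table(
    path="/tmp/delta_demo/my_table",
    data=arrow_table,
    write_disposition="append",  # translated to the delta "append" write mode
)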
26 changes: 26 additions & 0 deletions dlt/common/libs/pyarrow.py
@@ -28,6 +28,7 @@
import pyarrow
import pyarrow.parquet
import pyarrow.compute
import pyarrow.dataset
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt pyarrow helpers",
@@ -37,6 +38,8 @@

TAnyArrowItem = Union[pyarrow.Table, pyarrow.RecordBatch]

ARROW_DECIMAL_MAX_PRECISION = 76


def get_py_arrow_datatype(
column: TColumnType,
@@ -411,6 +414,29 @@ def pq_stream_with_new_columns(
yield tbl


def dataset_to_table(data: Union[pyarrow.Table, pyarrow.dataset.Dataset]) -> pyarrow.Table:
return data.to_table() if isinstance(data, pyarrow.dataset.Dataset) else data


def cast_arrow_schema_types(
schema: pyarrow.Schema,
type_map: Dict[Callable[[pyarrow.DataType], bool], Callable[..., pyarrow.DataType]],
) -> pyarrow.Schema:
"""Returns type-casted Arrow schema.
Replaces data types for fields matching a type check in `type_map`.
Type check functions in `type_map` are assumed to be mutually exclusive, i.e.
a data type does not match more than one type check function.
"""
for i, e in enumerate(schema.types):
for type_check, cast_type in type_map.items():
if type_check(e):
adjusted_field = schema.field(i).with_type(cast_type)
schema = schema.set(i, adjusted_field)
break # if type matches type check, do not do other type checks
return schema


class NameNormalizationClash(ValueError):
def __init__(self, reason: str) -> None:
msg = f"Arrow column name clash after input data normalization. {reason}"
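A small illustration of `cast_arrow_schema_types` (field names are made up; the mapping mirrors the one the Delta helpers use for time columns):

import pyarrow as pa

from dlt.common.libs.pyarrow import cast_arrow_schema_types

schema = pa.schema([("id", pa.int64()), ("created_at", pa.time64("us"))])
# replace every time type with string, as ensure_delta_compatible_arrow_table does
casted = cast_arrow_schema_types(schema, {pa.types.is_time: pa.string()})
assert casted.field("created_at").type == pa.string()
assert casted.field("id").type == pa.int64()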
3 changes: 3 additions & 0 deletions dlt/common/libs/pydantic.py
@@ -106,6 +106,9 @@ def pydantic_to_table_schema_columns(

inner_type = extract_inner_type(annotation)
if is_union_type(inner_type):
# TODO: order those types deterministically before getting first one
# order of the types in union is in many cases not deterministic
# https://docs.python.org/3/library/typing.html#typing.get_args
first_argument_type = get_args(inner_type)[0]
inner_type = extract_inner_type(first_argument_type)

2 changes: 1 addition & 1 deletion dlt/common/schema/typing.py
@@ -64,7 +64,7 @@
"dedup_sort",
]
"""Known hints of a column used to declare hint regexes."""
TTableFormat = Literal["iceberg", "parquet", "jsonl"]
TTableFormat = Literal["iceberg", "delta"]
TTypeDetections = Literal[
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
]
1 change: 1 addition & 0 deletions dlt/common/storages/configuration.py
@@ -83,6 +83,7 @@ class FilesystemConfiguration(BaseConfiguration):
"""Indicates read only filesystem access. Will enable caching"""
kwargs: Optional[DictStrAny] = None
client_kwargs: Optional[DictStrAny] = None
deltalake_storage_options: Optional[DictStrAny] = None

@property
def protocol(self) -> str:
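A sketch of how the new `deltalake_storage_options` key might be supplied in code (the factory-kwarg pass-through and the specific option are assumptions based on typical dlt and `object_store` usage; the same values could also be set under the filesystem destination section of config.toml):

import dlt

fs = dlt.destinations.filesystem(
    bucket_url="s3://my-bucket/data",  # illustrative bucket
    deltalake_storage_options={"AWS_S3_ALLOW_UNSAFE_RENAME": "true"},
)
pipeline = dlt.pipeline("delta_options_demo", destination=fs)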