Skip to content

Commit

Permalink
Merge pull request #89 from scale-vector/rfix/improves-pipeline-state…
Browse files Browse the repository at this point in the history
…-management

improves pipeline state management
  • Loading branch information
rudolfix authored Nov 20, 2022
2 parents 00383af + 63d5a80 commit b727b0d
Show file tree
Hide file tree
Showing 30 changed files with 719 additions and 249 deletions.
7 changes: 7 additions & 0 deletions dlt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,16 @@
from dlt.extract.source import with_table_name
from dlt.common.schema import Schema
from dlt.common.configuration.accessors import config, secrets
from dlt.common.typing import TSecretValue as _TSecretValue
from dlt.common.configuration.specs import CredentialsConfiguration as _CredentialsConfiguration

pipeline = _pipeline

TSecretValue = _TSecretValue
"When typing source/resource function arguments indicates that given argument is a secret and should be taken from dlt.secrets. The value itself is a string"

TCredentials = _CredentialsConfiguration
"When typing source/resource function arguments indicates that given argument represents credentials and should be taken from dlt.secrets. Credentials may be string, dictionaries or any other types."

from dlt.__version__ import __version__

Expand Down
112 changes: 103 additions & 9 deletions dlt/common/configuration/accessors.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,114 @@
from typing import ClassVar
import abc
import contextlib
import inspect
from typing import Any, ClassVar, Sequence, Type, TypeVar
from dlt.common import json
from dlt.common.configuration.container import Container

from dlt.common.typing import ConfigValue
from dlt.common.configuration.providers.provider import ConfigProvider
from dlt.common.configuration.resolve import deserialize_value
from dlt.common.configuration.specs import BaseConfiguration
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
from dlt.common.schema.utils import coerce_value
from dlt.common.typing import AnyType, ConfigValue


class _ConfigAccessor:
"""Configuration accessor class"""
TConfigAny = TypeVar("TConfigAny", bound=Any)

class _Accessor(abc.ABC):

def __getitem__(self, field: str) -> Any:
value = self._get_value(field)
if value is None:
raise KeyError(field)
if isinstance(value, str):
return self._auto_cast(value)
else:
return value


def get(self, field: str, expected_type: Type[TConfigAny] = None) -> TConfigAny:
value: TConfigAny = self._get_value(field, expected_type)
if value is None:
return None
# cast to required type
if expected_type:
if inspect.isclass(expected_type) and issubclass(expected_type, BaseConfiguration):
c = expected_type()
if isinstance(value, dict):
c.update(value)
else:
c.parse_native_representation(value)
return c # type: ignore
else:
return deserialize_value(field, value, expected_type) # type: ignore
else:
return value


@property
@abc.abstractmethod
def config_providers(self) -> Sequence[ConfigProvider]:
pass

def _get_providers_from_context(self) -> Sequence[ConfigProvider]:
return Container()[ConfigProvidersContext].providers

def _auto_cast(self, value: str) -> Any:
# try to cast to bool, int, float and complex (via JSON)
if value.lower() == "true":
return True
if value.lower() == "false":
return False
with contextlib.suppress(ValueError):
return coerce_value("bigint", "text", value)
with contextlib.suppress(ValueError):
return coerce_value("double", "text", value)
with contextlib.suppress(ValueError):
c_v = json.loads(value)
# only lists and dictionaries count
if isinstance(c_v, (list, dict)):
return c_v
return value

def _get_value(self, field: str, type_hint: Type[Any] = AnyType) -> Any:
# split field into namespaces and a key
namespaces = field.split(".")
key = namespaces.pop()
value = None
for provider in self.config_providers:
value, _ = provider.get_value(key, type_hint, *namespaces)
if value is not None:
break
return value


class _ConfigAccessor(_Accessor):
"""Provides direct access to configured values that are not secrets."""

@property
def config_providers(self) -> Sequence[ConfigProvider]:
"""Return a list of config providers, in lookup order"""
return [p for p in self._get_providers_from_context()]

value: ClassVar[None] = ConfigValue
"A placeholder value that represents any argument that should be injected from the available configuration"
"A placeholder that tells dlt to replace it with actual config value during the call to a source or resource decorated function."


class _SecretsAccessor(_Accessor):
"""Provides direct access to secrets."""

@property
def config_providers(self) -> Sequence[ConfigProvider]:
"""Return a list of config providers that can hold secrets, in lookup order"""
return [p for p in self._get_providers_from_context() if p.supports_secrets]

class _SecretsAccessor:
value: ClassVar[None] = ConfigValue
"A placeholder value that represents any secret argument that should be injected from the available secrets"
"A placeholder that tells dlt to replace it with actual secret during the call to a source or resource decorated function."


config = _ConfigAccessor()
"""Configuration with dictionary like access to available keys and groups"""
"""Dictionary-like access to all secrets known to dlt"""

secrets = _SecretsAccessor()
"""Secrets with dictionary like access to available keys and groups"""
"""Dictionary-like access to all config values known to dlt"""
2 changes: 1 addition & 1 deletion dlt/common/configuration/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class FinalConfigFieldException(ConfigurationException):
def __init__(self, spec_name: str, field: str) -> None:
super().__init__(f"Field {field} in spec {spec_name} is final but is being changed by a config provider")

class ConfigValueCannotBeCoercedException(ConfigurationException):
class ConfigValueCannotBeCoercedException(ConfigurationException, ValueError):
"""thrown when value returned by config provider cannot be coerced to hinted type"""

def __init__(self, field_name: str, field_value: Any, hint: type) -> None:
Expand Down
6 changes: 3 additions & 3 deletions dlt/common/configuration/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def decorator(f: TFun) -> TFun:

for p in sig.parameters.values():
# for all positional parameters that do not have default value, set default
if hasattr(SPEC, p.name) and p.default == Parameter.empty:
p._default = None # type: ignore
# if hasattr(SPEC, p.name) and p.default == Parameter.empty:
# p._default = None # type: ignore
if p.annotation is SPEC:
# if any argument has type SPEC then us it to take initial value
spec_arg = p
Expand Down Expand Up @@ -161,8 +161,8 @@ def _spec_from_signature(name: str, module: ModuleType, sig: Signature, kw_only:
# set annotations
annotations[p.name] = field_type
# set field with default value

fields[p.name] = field_default

# new type goes to the module where sig was declared
fields["__module__"] = module.__name__
# set annotations so they are present in __dict__
Expand Down
14 changes: 10 additions & 4 deletions dlt/common/configuration/providers/toml.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import tomlkit
from typing import Any, Optional, Tuple, Type
from tomlkit.items import Item as TOMLItem
from tomlkit.container import Container as TOMLContainer
from typing import Any, Optional, Tuple, Type, Union

from dlt.common.typing import StrAny

Expand Down Expand Up @@ -30,7 +32,7 @@ def get_key_name(key: str, *namespaces: str) -> str:
def get_value(self, key: str, hint: Type[Any], *namespaces: str) -> Tuple[Optional[Any], str]:
full_path = namespaces + (key,)
full_key = self.get_key_name(key, *namespaces)
node = self._toml
node: Union[TOMLContainer, TOMLItem] = self._toml
try:
for k in full_path:
if not isinstance(node, dict):
Expand All @@ -44,14 +46,18 @@ def get_value(self, key: str, hint: Type[Any], *namespaces: str) -> Tuple[Option
def supports_namespaces(self) -> bool:
return True

def _write_toml(self) -> None:
with open(self._toml_path, "w", encoding="utf-8") as f:
tomlkit.dump(self._toml, f)

@staticmethod
def _read_toml(toml_path: str) -> StrAny:
def _read_toml(toml_path: str) -> tomlkit.TOMLDocument:
if os.path.isfile(toml_path):
with open(toml_path, "r", encoding="utf-8") as f:
# use whitespace preserving parser
return tomlkit.load(f)
else:
return {}
return tomlkit.document()


class ConfigTomlProvider(TomlProvider):
Expand Down
Loading

0 comments on commit b727b0d

Please sign in to comment.