diff --git a/dlt/__init__.py b/dlt/__init__.py index 9b4c540657..bd4614a561 100644 --- a/dlt/__init__.py +++ b/dlt/__init__.py @@ -28,9 +28,16 @@ from dlt.extract.source import with_table_name from dlt.common.schema import Schema from dlt.common.configuration.accessors import config, secrets +from dlt.common.typing import TSecretValue as _TSecretValue +from dlt.common.configuration.specs import CredentialsConfiguration as _CredentialsConfiguration pipeline = _pipeline +TSecretValue = _TSecretValue +"When typing source/resource function arguments indicates that given argument is a secret and should be taken from dlt.secrets. The value itself is a string" + +TCredentials = _CredentialsConfiguration +"When typing source/resource function arguments indicates that given argument represents credentials and should be taken from dlt.secrets. Credentials may be string, dictionaries or any other types." from dlt.__version__ import __version__ diff --git a/dlt/common/configuration/accessors.py b/dlt/common/configuration/accessors.py index 5eadb0149a..d34b7425b8 100644 --- a/dlt/common/configuration/accessors.py +++ b/dlt/common/configuration/accessors.py @@ -1,20 +1,114 @@ -from typing import ClassVar +import abc +import contextlib +import inspect +from typing import Any, ClassVar, Sequence, Type, TypeVar +from dlt.common import json +from dlt.common.configuration.container import Container -from dlt.common.typing import ConfigValue +from dlt.common.configuration.providers.provider import ConfigProvider +from dlt.common.configuration.resolve import deserialize_value +from dlt.common.configuration.specs import BaseConfiguration +from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext +from dlt.common.schema.utils import coerce_value +from dlt.common.typing import AnyType, ConfigValue -class _ConfigAccessor: - """Configuration accessor class""" +TConfigAny = TypeVar("TConfigAny", bound=Any) + +class _Accessor(abc.ABC): + + def 
__getitem__(self, field: str) -> Any: + value = self._get_value(field) + if value is None: + raise KeyError(field) + if isinstance(value, str): + return self._auto_cast(value) + else: + return value + + + def get(self, field: str, expected_type: Type[TConfigAny] = None) -> TConfigAny: + value: TConfigAny = self._get_value(field, expected_type) + if value is None: + return None + # cast to required type + if expected_type: + if inspect.isclass(expected_type) and issubclass(expected_type, BaseConfiguration): + c = expected_type() + if isinstance(value, dict): + c.update(value) + else: + c.parse_native_representation(value) + return c # type: ignore + else: + return deserialize_value(field, value, expected_type) # type: ignore + else: + return value + + + @property + @abc.abstractmethod + def config_providers(self) -> Sequence[ConfigProvider]: + pass + + def _get_providers_from_context(self) -> Sequence[ConfigProvider]: + return Container()[ConfigProvidersContext].providers + + def _auto_cast(self, value: str) -> Any: + # try to cast to bool, int, float and complex (via JSON) + if value.lower() == "true": + return True + if value.lower() == "false": + return False + with contextlib.suppress(ValueError): + return coerce_value("bigint", "text", value) + with contextlib.suppress(ValueError): + return coerce_value("double", "text", value) + with contextlib.suppress(ValueError): + c_v = json.loads(value) + # only lists and dictionaries count + if isinstance(c_v, (list, dict)): + return c_v + return value + + def _get_value(self, field: str, type_hint: Type[Any] = AnyType) -> Any: + # split field into namespaces and a key + namespaces = field.split(".") + key = namespaces.pop() + value = None + for provider in self.config_providers: + value, _ = provider.get_value(key, type_hint, *namespaces) + if value is not None: + break + return value + + +class _ConfigAccessor(_Accessor): + """Provides direct access to configured values that are not secrets.""" + + @property + def 
config_providers(self) -> Sequence[ConfigProvider]: + """Return a list of config providers, in lookup order""" + return [p for p in self._get_providers_from_context()] value: ClassVar[None] = ConfigValue - "A placeholder value that represents any argument that should be injected from the available configuration" + "A placeholder that tells dlt to replace it with actual config value during the call to a source or resource decorated function." + + +class _SecretsAccessor(_Accessor): + """Provides direct access to secrets.""" + + @property + def config_providers(self) -> Sequence[ConfigProvider]: + """Return a list of config providers that can hold secrets, in lookup order""" + return [p for p in self._get_providers_from_context() if p.supports_secrets] -class _SecretsAccessor: value: ClassVar[None] = ConfigValue - "A placeholder value that represents any secret argument that should be injected from the available secrets" + "A placeholder that tells dlt to replace it with actual secret during the call to a source or resource decorated function." 
+ config = _ConfigAccessor() -"""Configuration with dictionary like access to available keys and groups""" +"""Dictionary-like access to all config values known to dlt""" secrets = _SecretsAccessor() -"""Secrets with dictionary like access to available keys and groups""" +"""Dictionary-like access to all secrets known to dlt""" diff --git a/dlt/common/configuration/exceptions.py b/dlt/common/configuration/exceptions.py index 3537b41e50..0b26612b6b 100644 --- a/dlt/common/configuration/exceptions.py +++ b/dlt/common/configuration/exceptions.py @@ -50,7 +50,7 @@ class FinalConfigFieldException(ConfigurationException): def __init__(self, spec_name: str, field: str) -> None: super().__init__(f"Field {field} in spec {spec_name} is final but is being changed by a config provider") -class ConfigValueCannotBeCoercedException(ConfigurationException): +class ConfigValueCannotBeCoercedException(ConfigurationException, ValueError): """thrown when value returned by config provider cannot be coerced to hinted type""" def __init__(self, field_name: str, field_value: Any, hint: type) -> None: diff --git a/dlt/common/configuration/inject.py b/dlt/common/configuration/inject.py index 7e293d3c49..241aa7f58d 100644 --- a/dlt/common/configuration/inject.py +++ b/dlt/common/configuration/inject.py @@ -56,8 +56,8 @@ def decorator(f: TFun) -> TFun: for p in sig.parameters.values(): # for all positional parameters that do not have default value, set default - if hasattr(SPEC, p.name) and p.default == Parameter.empty: - p._default = None # type: ignore + # if hasattr(SPEC, p.name) and p.default == Parameter.empty: + # p._default = None # type: ignore if p.annotation is SPEC: # if any argument has type SPEC then us it to take initial value spec_arg = p @@ -161,8 +161,8 @@ def _spec_from_signature(name: str, module: ModuleType, sig: Signature, kw_only: # set annotations annotations[p.name] = field_type # set field with default value - fields[p.name] = field_default + # new type goes to the 
module where sig was declared fields["__module__"] = module.__name__ # set annotations so they are present in __dict__ diff --git a/dlt/common/configuration/providers/toml.py b/dlt/common/configuration/providers/toml.py index 9864d5399e..0288e03e88 100644 --- a/dlt/common/configuration/providers/toml.py +++ b/dlt/common/configuration/providers/toml.py @@ -1,6 +1,8 @@ import os import tomlkit -from typing import Any, Optional, Tuple, Type +from tomlkit.items import Item as TOMLItem +from tomlkit.container import Container as TOMLContainer +from typing import Any, Optional, Tuple, Type, Union from dlt.common.typing import StrAny @@ -30,7 +32,7 @@ def get_key_name(key: str, *namespaces: str) -> str: def get_value(self, key: str, hint: Type[Any], *namespaces: str) -> Tuple[Optional[Any], str]: full_path = namespaces + (key,) full_key = self.get_key_name(key, *namespaces) - node = self._toml + node: Union[TOMLContainer, TOMLItem] = self._toml try: for k in full_path: if not isinstance(node, dict): @@ -44,14 +46,18 @@ def get_value(self, key: str, hint: Type[Any], *namespaces: str) -> Tuple[Option def supports_namespaces(self) -> bool: return True + def _write_toml(self) -> None: + with open(self._toml_path, "w", encoding="utf-8") as f: + tomlkit.dump(self._toml, f) + @staticmethod - def _read_toml(toml_path: str) -> StrAny: + def _read_toml(toml_path: str) -> tomlkit.TOMLDocument: if os.path.isfile(toml_path): with open(toml_path, "r", encoding="utf-8") as f: # use whitespace preserving parser return tomlkit.load(f) else: - return {} + return tomlkit.document() class ConfigTomlProvider(TomlProvider): diff --git a/dlt/common/configuration/resolve.py b/dlt/common/configuration/resolve.py index abd084e750..b890395e23 100644 --- a/dlt/common/configuration/resolve.py +++ b/dlt/common/configuration/resolve.py @@ -4,8 +4,9 @@ from typing import Any, Dict, ContextManager, List, Optional, Sequence, Tuple, Type, TypeVar, get_origin from dlt.common import json, logger +from 
dlt.common.configuration.providers.provider import ConfigProvider from dlt.common.typing import AnyType, StrAny, TSecretValue, is_final_type, is_optional_type, extract_inner_type -from dlt.common.schema.utils import coerce_type, py_type_to_sc_type +from dlt.common.schema.utils import coerce_value, py_type_to_sc_type from dlt.common.configuration.specs.base_configuration import BaseConfiguration, CredentialsConfiguration, ContainerInjectableContext, get_config_if_union from dlt.common.configuration.specs.config_namespace_context import ConfigNamespacesContext @@ -48,7 +49,7 @@ def deserialize_value(key: str, value: Any, hint: Type[Any]) -> Any: else: # for types that are not complex, reuse schema coercion rules if value_dt != hint_dt: - value = coerce_type(hint_dt, value_dt, value) + value = coerce_value(hint_dt, value_dt, value) return value except ConfigValueCannotBeCoercedException: raise @@ -64,7 +65,28 @@ def serialize_value(value: Any) -> Any: return str(value) # coerce type to text which will use json for mapping and sequences value_dt = py_type_to_sc_type(type(value)) - return coerce_type("text", value_dt, value) + return coerce_value("text", value_dt, value) + + +def is_secret_hint(hint: Type[Any]) -> bool: + return hint is TSecretValue or (inspect.isclass(hint) and issubclass(hint, CredentialsConfiguration)) + + +def is_base_configuration_hint(hint: Type[Any]) -> bool: + return inspect.isclass(hint) and issubclass(hint, BaseConfiguration) + + +def is_context_hint(hint: Type[Any]) -> bool: + return inspect.isclass(hint) and issubclass(hint, ContainerInjectableContext) + + +def extract_inner_hint(hint: Type[Any]) -> Type[Any]: + # extract hint from Optional / Literal / NewType hints + inner_hint = extract_inner_type(hint) + # get base configuration from union type + inner_hint = get_config_if_union(inner_hint) or inner_hint + # extract origin from generic types (ie List[str] -> List) + return get_origin(inner_hint) or inner_hint def 
inject_namespace(namespace_context: ConfigNamespacesContext, merge_existing: bool = True) -> ContextManager[ConfigNamespacesContext]: @@ -184,12 +206,8 @@ def _resolve_config_field( embedded_namespaces: Tuple[str, ...], accept_partial: bool ) -> Tuple[Any, List[LookupTrace]]: - # extract hint from Optional / Literal / NewType hints - inner_hint = extract_inner_type(hint) - # get base configuration from union type - inner_hint = get_config_if_union(inner_hint) or inner_hint - # extract origin from generic types (ie List[str] -> List) - inner_hint = get_origin(inner_hint) or inner_hint + + inner_hint = extract_inner_hint(hint) if explicit_value is not None: value = explicit_value @@ -199,10 +217,10 @@ def _resolve_config_field( value, traces = _resolve_single_value(key, hint, inner_hint, config_namespace, explicit_namespaces, embedded_namespaces) _log_traces(config, key, hint, value, traces) # contexts must be resolved as a whole - if inspect.isclass(inner_hint) and issubclass(inner_hint, ContainerInjectableContext): + if is_context_hint(inner_hint): pass # if inner_hint is BaseConfiguration then resolve it recursively - elif inspect.isclass(inner_hint) and issubclass(inner_hint, BaseConfiguration): + elif is_base_configuration_hint(inner_hint): if isinstance(value, BaseConfiguration): # if resolved value is instance of configuration (typically returned by context provider) embedded_config = value @@ -282,11 +300,11 @@ def _resolve_single_value( # get providers from container providers_context = container[ConfigProvidersContext] # we may be resolving context - if inspect.isclass(inner_hint) and issubclass(inner_hint, ContainerInjectableContext): + if is_context_hint(inner_hint): # resolve context with context provider and do not look further value, _ = providers_context.context_provider.get_value(key, inner_hint) return value, traces - if inspect.isclass(inner_hint) and issubclass(inner_hint, BaseConfiguration): + if is_base_configuration_hint(inner_hint): # cannot 
resolve configurations directly return value, traces @@ -296,57 +314,25 @@ def _resolve_single_value( # get additional namespaces to look in from container namespaces_context = container[ConfigNamespacesContext] - - # start looking from the top provider with most specific set of namespaces first - def look_namespaces(pipeline_name: str = None) -> Any: + # start looking from the top provider with most specific set of namespaces first for provider in providers: - if provider.supports_namespaces: + value, provider_traces = resolve_single_provider_value( + provider, + key, + hint, + pipeline_name, + config_namespace, # if explicit namespaces are provided, ignore the injected context - if explicit_namespaces: - ns = list(explicit_namespaces) - else: - ns = list(namespaces_context.namespaces) - # always extend with embedded namespaces - ns.extend(embedded_namespaces) - else: - # if provider does not support namespaces and pipeline name is set then ignore it - if pipeline_name: - continue - else: - # pass empty namespaces - ns = [] + explicit_namespaces or namespaces_context.namespaces, + embedded_namespaces + ) + traces.extend(provider_traces) + if value is not None: + # value found, ignore other providers + break - value = None - while True: - if (pipeline_name or config_namespace) and provider.supports_namespaces: - full_ns = ns.copy() - # pipeline, when provided, is the most outer and always present - if pipeline_name: - full_ns.insert(0, pipeline_name) - # config namespace, is always present and innermost - if config_namespace: - full_ns.append(config_namespace) - else: - full_ns = ns - value, ns_key = provider.get_value(key, hint, *full_ns) - # if secret is obtained from non secret provider, we must fail - cant_hold_it: bool = not provider.supports_secrets and _is_secret_hint(hint) - if value is not None and cant_hold_it: - raise ValueNotSecretException(provider.name, ns_key) - - # create trace, ignore providers that cant_hold_it - if not cant_hold_it: - 
traces.append(LookupTrace(provider.name, full_ns, ns_key, value)) - - if value is not None: - # value found, ignore other providers - return value - if len(ns) == 0: - # check next provider - break - # pop optional namespaces for less precise lookup - ns.pop() + return value # first try with pipeline name as namespace, if present if namespaces_context.pipeline_name: @@ -358,6 +344,64 @@ def look_namespaces(pipeline_name: str = None) -> Any: return value, traces +def resolve_single_provider_value( + provider: ConfigProvider, + key: str, + hint: Type[Any], + pipeline_name: str = None, + config_namespace: str = None, + explicit_namespaces: Tuple[str, ...] = (), + embedded_namespaces: Tuple[str, ...] = (), + # context_namespaces: Tuple[str, ...] = (), + ) -> Tuple[Optional[Any], List[LookupTrace]]: + traces: List[LookupTrace] = [] + + if provider.supports_namespaces: + ns = list(explicit_namespaces) + # always extend with embedded namespaces + ns.extend(embedded_namespaces) + else: + # if provider does not support namespaces and pipeline name is set then ignore it + if pipeline_name: + return None, traces + else: + # pass empty namespaces + ns = [] + + value = None + while True: + if (pipeline_name or config_namespace) and provider.supports_namespaces: + full_ns = ns.copy() + # pipeline, when provided, is the most outer and always present + if pipeline_name: + full_ns.insert(0, pipeline_name) + # config namespace, is always present and innermost + if config_namespace: + full_ns.append(config_namespace) + else: + full_ns = ns + value, ns_key = provider.get_value(key, hint, *full_ns) + # if secret is obtained from non secret provider, we must fail + cant_hold_it: bool = not provider.supports_secrets and is_secret_hint(hint) + if value is not None and cant_hold_it: + raise ValueNotSecretException(provider.name, ns_key) + + # create trace, ignore providers that cant_hold_it + if not cant_hold_it: + traces.append(LookupTrace(provider.name, full_ns, ns_key, value)) + + if 
value is not None: + # value found, ignore further namespaces + break + if len(ns) == 0: + # namespaces exhausted + break + # pop optional namespaces for less precise lookup + ns.pop() + + return value, traces + + def _apply_embedded_namespaces_to_config_namespace(config_namespace: str, embedded_namespaces: Tuple[str, ...]) -> Tuple[str, Tuple[str, ...]]: # for the configurations that have __namespace__ (config_namespace) defined and are embedded in other configurations, # the innermost embedded namespace replaces config_namespace @@ -369,7 +413,3 @@ def _apply_embedded_namespaces_to_config_namespace(config_namespace: str, embedd # remove all embedded ns starting with _ return config_namespace, tuple(ns for ns in embedded_namespaces if not ns.startswith("_")) - - -def _is_secret_hint(hint: Type[Any]) -> bool: - return hint is TSecretValue or (inspect.isclass(hint) and issubclass(hint, CredentialsConfiguration)) diff --git a/dlt/common/configuration/specs/gcp_client_credentials.py b/dlt/common/configuration/specs/gcp_client_credentials.py index 90534f3c21..25dfb6b4c3 100644 --- a/dlt/common/configuration/specs/gcp_client_credentials.py +++ b/dlt/common/configuration/specs/gcp_client_credentials.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Final from dlt.common import json from dlt.common.configuration.specs.exceptions import InvalidServicesJson @@ -10,10 +10,10 @@ class GcpClientCredentials(CredentialsConfiguration): project_id: str = None - type: str = "service_account" # noqa: A003 + type: Final[str] = "service_account" # noqa: A003 private_key: TSecretValue = None location: str = "US" - token_uri: str = "https://oauth2.googleapis.com/token" + token_uri: Final[str] = "https://oauth2.googleapis.com/token" client_email: str = None http_timeout: float = 15.0 diff --git a/dlt/common/json.py b/dlt/common/json.py index 6f2d3cf5d7..e5889cad28 100644 --- a/dlt/common/json.py +++ b/dlt/common/json.py @@ -97,11 +97,11 @@ def custom_pua_decode(obj: 
Any) -> Any: simplejson.loads = partial(simplejson.loads, use_decimal=False) simplejson.load = partial(simplejson.load, use_decimal=False) # prevent default decimal serializer (use_decimal=False) and binary serializer (encoding=None) -simplejson.dumps = partial(simplejson.dumps, use_decimal=False, default=custom_encode, encoding=None, ensure_ascii=False) -simplejson.dump = partial(simplejson.dump, use_decimal=False, default=custom_encode, encoding=None, ensure_ascii=False) +simplejson.dumps = partial(simplejson.dumps, use_decimal=False, default=custom_encode, encoding=None, ensure_ascii=False, separators=(',', ':')) +simplejson.dump = partial(simplejson.dump, use_decimal=False, default=custom_encode, encoding=None, ensure_ascii=False, separators=(',', ':')) # provide drop-in replacement json = simplejson # helpers for typed dump -json_typed_dumps: Callable[..., str] = partial(simplejson.dumps, use_decimal=False, default=custom_pua_encode, encoding=None, ensure_ascii=False) -json_typed_dump: Callable[..., None] = partial(simplejson.dump, use_decimal=False, default=custom_pua_encode, encoding=None, ensure_ascii=False) +json_typed_dumps: Callable[..., str] = partial(simplejson.dumps, use_decimal=False, default=custom_pua_encode, encoding=None, ensure_ascii=False, separators=(',', ':')) +json_typed_dump: Callable[..., None] = partial(simplejson.dump, use_decimal=False, default=custom_pua_encode, encoding=None, ensure_ascii=False, separators=(',', ':')) diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index b7216456e7..5850923378 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -1,12 +1,14 @@ import os import tempfile -from typing import Any, Callable, ClassVar, Dict, List, NamedTuple, Protocol, Sequence, Tuple +import datetime # noqa: 251 +from typing import Any, Callable, ClassVar, Dict, List, NamedTuple, Optional, Protocol, Sequence, Tuple, TypedDict from dlt.common.configuration.container import ContainerInjectableContext from 
dlt.common.configuration import configspec from dlt.common.destination import TDestinationReferenceArg from dlt.common.schema import Schema from dlt.common.schema.typing import TColumnSchema, TWriteDisposition +from dlt.common.typing import DictStrAny class LoadInfo(NamedTuple): @@ -33,9 +35,30 @@ def __str__(self) -> str: msg += f"\t{job_id}: {failed_message}\n" return msg + +class TPipelineState(TypedDict, total=False): + """Schema for a pipeline state that is stored within the pipeline working directory""" + pipeline_name: str + dataset_name: str + default_schema_name: Optional[str] + """Name of the first schema added to the pipeline to which all the resources without schemas will be added""" + schema_names: Optional[List[str]] + """All the schemas present within the pipeline working directory""" + destination: Optional[str] + + # properties starting with _ are not automatically applied to pipeline object when state is restored + _state_version: int + _state_engine_version: int + _last_extracted_at: datetime.datetime + """Timestamp indicating when the state was synced with the destination. Lack of timestamp means not synced state.""" + + class SupportsPipeline(Protocol): """A protocol with core pipeline operations that lets high level abstractions ie. sources to access pipeline methods and properties""" pipeline_name: str + @property + def state(self) -> TPipelineState: + ... 
def run( self, diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index ca36f31a20..e08f8850ac 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -360,7 +360,7 @@ def _coerce_non_null_value(self, table_columns: TTableSchemaColumns, table_name: py_type = utils.py_type_to_sc_type(type(v)) # and coerce type if inference changed the python type try: - coerced_v = utils.coerce_type(col_type, py_type, v) + coerced_v = utils.coerce_value(col_type, py_type, v) # print(f"co: {py_type} -> {col_type} {v}") except (ValueError, SyntaxError): if final: @@ -399,7 +399,7 @@ def _infer_column_type(self, v: Any, col_name: str) -> TDataType: if preferred_type: # try to coerce to destination type try: - utils.coerce_type(preferred_type, mapped_type, v) + utils.coerce_value(preferred_type, mapped_type, v) # coercion possible so preferred type may be used mapped_type = preferred_type except ValueError: diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 3bf34a4b23..cd236f1447 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -314,7 +314,7 @@ def py_type_to_sc_type(t: Type[Any]) -> TDataType: raise TypeError(t) -def coerce_type(to_type: TDataType, from_type: TDataType, value: Any) -> Any: +def coerce_value(to_type: TDataType, from_type: TDataType, value: Any) -> Any: if to_type == from_type: if to_type == "complex": # complex types will be always represented as strings diff --git a/dlt/common/validation.py b/dlt/common/validation.py index c13e54dd8b..8029806699 100644 --- a/dlt/common/validation.py +++ b/dlt/common/validation.py @@ -4,11 +4,11 @@ from dlt.common.typing import StrAny, extract_optional_type, is_literal_type, is_optional_type, is_typeddict, is_list_generic_type, is_dict_generic_type, _TypedDict -TFilterFuc = Callable[[str], bool] +TFilterFunc = Callable[[str], bool] TCustomValidator = Callable[[str, str, Any, Any], bool] -def validate_dict(spec: Type[_TypedDict], doc: StrAny, 
path: str, filter_f: TFilterFuc = None, validator_f: TCustomValidator = None) -> None: +def validate_dict(spec: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFunc = None, validator_f: TCustomValidator = None) -> None: # pass through filter filter_f = filter_f or (lambda _: True) # cannot validate anything diff --git a/dlt/pipeline/exceptions.py b/dlt/pipeline/exceptions.py index dbe6eaeace..48b4fefd08 100644 --- a/dlt/pipeline/exceptions.py +++ b/dlt/pipeline/exceptions.py @@ -46,11 +46,13 @@ def __init__(self, pipeline_name: str, step: TPipelineStep, exception: BaseExcep class PipelineStateNotAvailable(PipelineException): - def __init__(self, is_pipeline_active: bool) -> None: - if is_pipeline_active: - msg = "There is no active pipeline. The resource that requests the access to state requires that dlt.pipeline() was called before it was used" + def __init__(self, source_name: str) -> None: + if source_name: + msg = f"The source {source_name} requested the access to pipeline state but no pipeline is active right now." else: - msg = "Pipeline state is not available. The state is available only within the resource function body ie. decorated with @dlt.source. This problem most often happen if state is accessed in the source function body ie. decorated with @dlt.source" + msg = "The resource you called requested the access to pipeline state but no pipeline is active right now." + msg += " Call dlt.pipeline(...) before you call the @dlt.source or @dlt.resource decorated function." 
+ self.source_name = source_name super().__init__(None, msg) diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index e84143cbd6..5740ac5a13 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -23,12 +23,12 @@ from dlt.common.configuration import inject_namespace from dlt.common.configuration.specs import RunConfiguration, NormalizeVolumeConfiguration, SchemaVolumeConfiguration, LoadVolumeConfiguration, PoolRunnerConfiguration from dlt.common.destination import DestinationCapabilitiesContext, DestinationReference, JobClientBase, DestinationClientConfiguration, DestinationClientDwhConfiguration, TDestinationReferenceArg -from dlt.common.pipeline import LoadInfo +from dlt.common.pipeline import LoadInfo, TPipelineState from dlt.common.schema import Schema from dlt.common.storages.file_storage import FileStorage from dlt.common.utils import is_interactive -from dlt.extract.exceptions import SourceExhausted +from dlt.extract.exceptions import SourceExhausted from dlt.extract.extract import ExtractorStorage, extract from dlt.extract.source import DltResource, DltSource from dlt.normalize import Normalize @@ -40,7 +40,7 @@ from dlt.pipeline.exceptions import CannotRestorePipelineException, InvalidPipelineName, PipelineConfigMissing, PipelineStepFailed, SqlClientNotAvailable from dlt.pipeline.typing import TPipelineStep -from dlt.pipeline.state import STATE_ENGINE_VERSION, TPipelineState, load_state_from_destination, merge_state_if_changed, state_resource, StateInjectableContext +from dlt.pipeline.state import STATE_ENGINE_VERSION, load_state_from_destination, merge_state_if_changed, state_resource, StateInjectableContext def with_state_sync(extract_state: bool = False) -> Callable[[TFun], TFun]: @@ -397,6 +397,10 @@ def schemas(self) -> Mapping[str, Schema]: def default_schema(self) -> Schema: return self.schemas[self.default_schema_name] + @property + def state(self) -> TPipelineState: + return self._get_state() + @property def 
last_run_exception(self) -> BaseException: return runner.LAST_RUN_EXCEPTION @@ -533,10 +537,11 @@ def _iterate_source(self, source: DltSource, pipeline_schema: Schema, max_parall # TODO: create source context # iterate over all items in the pipeline and update the schema if dynamic table hints were present storage = ExtractorStorage(self._normalize_storage_config) - extractor = extract(source, storage, max_parallel_items=max_parallel_items, workers=workers) - # source iterates - source.exhausted = True + # inject the config namespace with the current source name with inject_namespace(ConfigNamespacesContext(namespaces=("sources", source.name))): + extractor = extract(source, storage, max_parallel_items=max_parallel_items, workers=workers) + # source iterates + source.exhausted = True for _, partials in extractor.items(): for partial in partials: pipeline_schema.update_schema(pipeline_schema.normalize_table_identifiers(partial)) diff --git a/dlt/pipeline/state.py b/dlt/pipeline/state.py index 8bcd185dad..6768852d8b 100644 --- a/dlt/pipeline/state.py +++ b/dlt/pipeline/state.py @@ -1,16 +1,17 @@ -import datetime # noqa: 251 -from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, TypedDict, Optional, cast +import contextlib +from typing import TYPE_CHECKING, Any, ClassVar, Dict, Optional, cast import dlt -from dlt.common import json, pendulum +from dlt.common import json from dlt.common.configuration.container import Container from dlt.common.configuration.exceptions import ContextDefaultCannotBeCreated from dlt.common.configuration.specs import ContainerInjectableContext from dlt.common.configuration.specs.base_configuration import configspec -from dlt.common.pipeline import PipelineContext +from dlt.common.configuration.specs.config_namespace_context import ConfigNamespacesContext +from dlt.common.pipeline import PipelineContext, TPipelineState from dlt.common.typing import DictStrAny -from dlt.common.schema.typing import TTableSchemaColumns +from 
dlt.common.schema.typing import LOADS_TABLE_NAME, TTableSchemaColumns from dlt.destinations.exceptions import DatabaseUndefinedRelation from dlt.destinations.sql_client import SqlClientBase @@ -52,17 +53,6 @@ } } -class TPipelineState(TypedDict, total=False): - pipeline_name: str - dataset_name: str - default_schema_name: Optional[str] - schema_names: Optional[List[str]] - destination: Optional[str] - # properties starting with _ are not automatically applied to pipeline object when state is restored - _state_version: int - _state_engine_version: int - _last_extracted_at: datetime.datetime - class TSourceState(TPipelineState): sources: Dict[str, Dict[str, Any]] @@ -103,8 +93,7 @@ def state_resource(state: TPipelineState) -> DltResource: def load_state_from_destination(pipeline_name: str, sql_client: SqlClientBase[Any]) -> TPipelineState: try: - # TODO: improve the query to load only from completed load ids - query = f"SELECT state FROM {STATE_TABLE_NAME} WHERE pipeline_name = %s ORDER BY created_at DESC" + query = f"SELECT state FROM {STATE_TABLE_NAME} AS s JOIN {LOADS_TABLE_NAME} AS l ON l.load_id = s._dlt_load_id WHERE pipeline_name = %s AND l.status = 0 ORDER BY created_at DESC" with sql_client.execute_query(query, pipeline_name) as cur: row = cur.fetchone() if not row: @@ -120,12 +109,36 @@ def load_state_from_destination(pipeline_name: str, sql_client: SqlClientBase[An def state() -> DictStrAny: + """Returns a dictionary with the current source state. Any JSON-serializable values can be written and then read from the state. + The state is persisted after the data is successfully read from the source. 
+ """ + global _last_full_state + container = Container() + # get the source name from the namespace context + source_name: str = None + with contextlib.suppress(ContextDefaultCannotBeCreated): + namespaces = container[ConfigNamespacesContext].namespaces + if namespaces and len(namespaces) > 1 and namespaces[0] == "sources": + source_name = namespaces[1] try: + # get managed state that is read/write state: TSourceState = container[StateInjectableContext].state # type: ignore - # TODO: take source context and get dict key by source name - return state.setdefault("sources", {}) except ContextDefaultCannotBeCreated: # check if there's pipeline context proxy = container[PipelineContext] - raise PipelineStateNotAvailable(proxy.is_active()) + if not proxy.is_active(): + raise PipelineStateNotAvailable(source_name) + else: + # get unmanaged state that is read only + state = proxy.pipeline().state # type: ignore + + source_state = state.setdefault("sources", {}) + if source_name: + source_state = source_state.setdefault(source_name, {}) + + # allow inspection of last returned full state + _last_full_state = state + return source_state + +_last_full_state: TPipelineState = None diff --git a/tests/common/cases/schemas/eth/ethereum_schema_v5.yml b/tests/common/cases/schemas/eth/ethereum_schema_v5.yml index ac30e2ee74..f70ebf70f7 100644 --- a/tests/common/cases/schemas/eth/ethereum_schema_v5.yml +++ b/tests/common/cases/schemas/eth/ethereum_schema_v5.yml @@ -1,5 +1,5 @@ -version: 9 -version_hash: VCdpY/nGien9Yz1FA2fge/iu8alntmFRVVPoPsib80I= +version: 10 +version_hash: yUiERp2eM2SEYWyVf8NjRiglWVwb39OPY+VwStAq7uE= engine_version: 5 name: ethereum tables: diff --git a/tests/common/cases/schemas/ev1/event.schema.json b/tests/common/cases/schemas/ev1/event.schema.json index 4dca771818..34f1f4080c 100644 --- a/tests/common/cases/schemas/ev1/event.schema.json +++ b/tests/common/cases/schemas/ev1/event.schema.json @@ -3221,7 +3221,7 @@ } }, "name": "event", - "description": "Lorem 
ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur? At vero eos et accusamus et iusto odio dignissimos ducimus qui blanditiis praesentium voluptatum deleniti atque corrupti quos dolores et quas molestias excepturi sint occaecati cupiditate non provident, similique sunt in culpa qui officia deserunt mollitia animi, id est laborum et dolorum fuga. Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus. Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet ut et voluptates repudiandae sint et molestiae non recusandae. 
Itaque earum rerum hic tenetur a sapiente delectus, ut aut reiciendis voluptatibus maiores alias consequatur aut perferendis doloribus asperiores repellat", + "description": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? 
Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur? At vero eos et accusamus et iusto odio dignissimos ducimus qui blanditiis praesentium voluptatum deleniti atque corrupti quos dolores et quas molestias excepturi sint occaecati cupiditate non provident, similique sunt in culpa qui officia deserunt mollitia animi, id est laborum et dolorum fuga. Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus. Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet ut et voluptates repudiandae sint et molestiae non recusandae. Itaque earum rerum hic tenetur a sapiente delectus, ut aut reiciendis voluptatibus maiores alias consequatur aut perferendis doloribus asperiores repellatLorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. 
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur? At vero eos et accusamus et iusto odio dignissimos ducimus qui blanditiis praesentium voluptatum deleniti atque corrupti quos dolores et quas molestias excepturi sint occaecati cupiditate non provident, similique sunt in culpa qui officia deserunt mollitia animi, id est laborum et dolorum fuga. Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus. Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet ut et voluptates repudiandae sint et molestiae non recusandae. Itaque earum rerum hic tenetur a sapiente delectus, ut aut reiciendis voluptatibus maiores alias consequatur aut perferendis doloribus asperiores repellatLorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. 
Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur? 
At vero eos et accusamus et iusto odio dignissimos ducimus qui blanditiis praesentium voluptatum deleniti atque corrupti quos dolores et quas molestias excepturi sint occaecati cupiditate non provident, similique sunt in culpa qui officia deserunt mollitia animi, id est laborum et dolorum fuga. Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus. Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet ut et voluptates repudiandae sint et molestiae non recusandae. Itaque earum rerum hic tenetur a sapiente delectus, ut aut reiciendis voluptatibus maiores alias consequatur aut perferendis doloribus asperiores repellatLorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. 
Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur? At vero eos et accusamus et iusto odio dignissimos ducimus qui blanditiis praesentium voluptatum deleniti atque corrupti quos dolores et quas molestias excepturi sint occaecati cupiditate non provident, similique sunt in culpa qui officia deserunt mollitia animi, id est laborum et dolorum fuga. Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus. Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet ut et voluptates repudiandae sint et molestiae non recusandae. Itaque earum rerum hic tenetur a sapiente delectus, ut aut reiciendis voluptatibus maiores alias consequatur aut perferendis doloribus asperiores repellatLorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. 
Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa. Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur? At vero eos et accusamus et iusto odio dignissimos ducimus qui blanditiis praesentium voluptatum deleniti atque corrupti quos dolores et quas molestias excepturi sint occaecati cupiditate non provident, similique sunt in culpa qui officia deserunt mollitia animi, id est laborum et dolorum fuga. 
Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est eligendi optio cumque nihil impedit quo minus id quod maxime placeat facere possimus, omnis voluptas assumenda est, omnis dolor repellendus. Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet ut et voluptates repudiandae sint et molestiae non recusandae. Itaque earum rerum hic tenetur a sapiente delectus, ut aut reiciendis voluptatibus maiores alias consequatur aut perferendis doloribus asperiores repellat", "version": 26, "preferred_types": { "^timestamp$": "timestamp", diff --git a/tests/common/configuration/test_accessors.py b/tests/common/configuration/test_accessors.py new file mode 100644 index 0000000000..566d283678 --- /dev/null +++ b/tests/common/configuration/test_accessors.py @@ -0,0 +1,120 @@ +import datetime # noqa: 251 +from typing import Any +import pytest + +import dlt + +from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext +from dlt.common.configuration.specs.gcp_client_credentials import GcpClientCredentials +from dlt.common.configuration.specs.postgres_credentials import ConnectionStringCredentials + +from tests.utils import preserve_environ +from tests.common.configuration.utils import environment, toml_providers + + +def test_accessor_singletons() -> None: + assert dlt.config.value is None + assert dlt.secrets.value is None + + +def test_getter_accessor(toml_providers: ConfigProvidersContext, environment: Any) -> None: + with pytest.raises(KeyError) as py_ex: + dlt.config["_unknown"] + assert py_ex.value.args[0] == "_unknown" + + with pytest.raises(KeyError) as py_ex: + dlt.secrets["_unknown"] + assert py_ex.value.args[0] == "_unknown" + + environment["VALUE"] = "{SET" + assert dlt.config["value"] == "{SET" + assert dlt.secrets["value"] == "{SET" + + # get namespaced values + assert dlt.config["typecheck.str_val"] == "test string" + + environment["DLT__THIS__VALUE"] = 
"embedded" + assert dlt.config["dlt.this.value"] == "embedded" + assert dlt.secrets["dlt.this.value"] == "embedded" + + +def test_getter_auto_cast(toml_providers: ConfigProvidersContext, environment: Any) -> None: + environment["VALUE"] = "{SET}" + assert dlt.config["value"] == "{SET}" + # bool + environment["VALUE"] = "true" + assert dlt.config["value"] is True + environment["VALUE"] = "False" + assert dlt.config["value"] is False + environment["VALUE"] = "yes" + assert dlt.config["value"] == "yes" + # int + environment["VALUE"] = "17261" + assert dlt.config["value"] == 17261 + environment["VALUE"] = "-17261" + assert dlt.config["value"] == -17261 + # float + environment["VALUE"] = "17261.4" + assert dlt.config["value"] == 17261.4 + environment["VALUE"] = "-10e45" + assert dlt.config["value"] == -10e45 + # list + environment["VALUE"] = "[1,2,3]" + assert dlt.config["value"] == [1, 2, 3] + assert dlt.config["value"][2] == 3 + # dict + environment["VALUE"] = '{"a": 1}' + assert dlt.config["value"] == {"a": 1} + assert dlt.config["value"]["a"] == 1 + # if not dict or list then original string must be returned, null is a JSON -> None + environment["VALUE"] = 'null' + assert dlt.config["value"] == "null" + + # typed values are returned as they are + assert isinstance(dlt.config["typecheck.date_val"], datetime.datetime) + + # access dict from toml + assert dlt.secrets["destination.bigquery"]["client_email"] == "loader@a7513.iam.gserviceaccount.com" + # equivalent + assert dlt.secrets["destination.bigquery.client_email"] == "loader@a7513.iam.gserviceaccount.com" + + +def test_getter_accessor_typed(toml_providers: ConfigProvidersContext, environment: Any) -> None: + # get a dict as str + assert dlt.secrets.get("credentials", str) == '{"secret_value":"2137","project_id":"mock-project-id-credentials"}' + # unchanged type + assert isinstance(dlt.secrets.get("credentials"), dict) + # fail on type coercion + environment["VALUE"] = "a" + with pytest.raises(ValueError): + 
dlt.config.get("value", int) + # not found -> return none + assert dlt.config.get("_unk") is None + # credentials + c = dlt.secrets.get("databricks.credentials", ConnectionStringCredentials) + assert c.drivername == "databricks+connector" + c = dlt.secrets.get("destination.credentials", GcpClientCredentials) + assert c.client_email == "loader@a7513.iam.gserviceaccount.com" + + +def test_secrets_separation(toml_providers: ConfigProvidersContext) -> None: + # secrets are available both in config and secrets + assert dlt.config.get("credentials") is not None + assert dlt.secrets.get("credentials") is not None + + # configs are not available in secrets + assert dlt.config.get("api_type") is not None + assert dlt.secrets.get("api_type") is None + + +def test_access_access_injection(toml_providers: ConfigProvidersContext) -> None: + + @dlt.source + def the_source(api_type, credentials: GcpClientCredentials, databricks_creds: ConnectionStringCredentials): + assert api_type == "REST" + assert credentials.client_email == "loader@a7513.iam.gserviceaccount.com" + assert databricks_creds.drivername == "databricks+connector" + return dlt.resource([1,2,3], name="data") + + # inject first argument, the rest pass explicitly + the_source(dlt.config.value, dlt.secrets["destination.credentials"], dlt.secrets["databricks.credentials"]) diff --git a/tests/common/configuration/test_configuration.py b/tests/common/configuration/test_configuration.py index 962e7734d3..5260effd90 100644 --- a/tests/common/configuration/test_configuration.py +++ b/tests/common/configuration/test_configuration.py @@ -115,6 +115,13 @@ class EmbeddedSecretConfiguration(BaseConfiguration): secret: SecretConfiguration +@configspec +class NonTemplatedComplexTypesConfiguration(BaseConfiguration): + list_val: list + tuple_val: tuple + dict_val: dict + + LongInteger = NewType("LongInteger", int) FirstOrderStr = NewType("FirstOrderStr", str) SecondOrderStr = NewType("SecondOrderStr", FirstOrderStr) @@ -516,7 +523,7 
@@ def test_values_serialization() -> None: # test list t_list = ["a", 3, True] v = resolve.serialize_value(t_list) - assert v == '["a", 3, true]' # json serialization + assert v == '["a",3,true]' # json serialization assert resolve.deserialize_value("K", v, list) == t_list # test datetime @@ -578,6 +585,16 @@ class NoHintConfiguration(BaseConfiguration): NoHintConfiguration() +def test_config_with_non_templated_complex_hints(environment: Any) -> None: + environment["LIST_VAL"] = "[1,2,3]" + environment["TUPLE_VAL"] = "(1,2,3)" + environment["DICT_VAL"] = '{"a": 1}' + c = resolve.resolve_configuration(NonTemplatedComplexTypesConfiguration()) + assert c.list_val == [1,2,3] + assert c.tuple_val == (1,2,3) + assert c.dict_val == {"a": 1} + + def test_resolve_configuration(environment: Any) -> None: # fill up configuration environment["NONECONFIGVAR"] = "1" diff --git a/tests/common/configuration/test_inject.py b/tests/common/configuration/test_inject.py index 4657ca7265..ad47df4c7e 100644 --- a/tests/common/configuration/test_inject.py +++ b/tests/common/configuration/test_inject.py @@ -1,5 +1,6 @@ import inspect from typing import Any, Optional +import dlt from dlt.common import Decimal from dlt.common.typing import TSecretValue @@ -123,7 +124,8 @@ def f_var_env(user, path): assert path == "explicit path" # user will be injected - f_var_env(path="explicit path") + f_var_env(None, path="explicit path") + f_var_env(path="explicit path", user=None) def test_inject_with_non_injectable_param() -> None: @@ -146,7 +148,7 @@ def test_inject_with_auto_namespace(environment: Any) -> None: def f(pipeline_name, value): assert value == "test" - f("pipe") + f("pipe", dlt.config.value) # make sure the spec is available for decorated fun assert get_fun_spec(f) is not None diff --git a/tests/common/configuration/test_namespaces.py b/tests/common/configuration/test_namespaces.py index 3272946eb7..6abd6fc486 100644 --- a/tests/common/configuration/test_namespaces.py +++ 
b/tests/common/configuration/test_namespaces.py @@ -125,7 +125,7 @@ def test_explicit_namespaces_from_embedded_config(mock_provider: MockProvider) - mock_provider.return_value_on = ("sv_config",) c = resolve.resolve_configuration(EmbeddedConfiguration()) # we mock the dictionary below as the value for all requests - assert c.sv_config.sv == '{"sv": "A"}' + assert c.sv_config.sv == '{"sv":"A"}' # following namespaces were used when resolving EmbeddedConfig: # - initial value for the whole embedded sv_config skipped because it does not have namespace # - then ("sv_config",) resolve sv in sv_config diff --git a/tests/common/configuration/test_toml_provider.py b/tests/common/configuration/test_toml_provider.py index 9ef98936be..e85b01e527 100644 --- a/tests/common/configuration/test_toml_provider.py +++ b/tests/common/configuration/test_toml_provider.py @@ -1,22 +1,20 @@ import pytest -from typing import Any, Iterator +from typing import Any import datetime # noqa: I251 - +import dlt from dlt.common import pendulum from dlt.common.configuration import configspec, ConfigFieldMissingException, resolve -from dlt.common.configuration.container import Container from dlt.common.configuration.inject import with_config from dlt.common.configuration.exceptions import LookupTrace -from dlt.common.configuration.providers.environ import EnvironProvider -from dlt.common.configuration.providers.toml import SecretsTomlProvider, ConfigTomlProvider, TomlProviderReadException +from dlt.common.configuration.providers.toml import ConfigTomlProvider, TomlProviderReadException from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext from dlt.common.configuration.specs import BaseConfiguration, GcpClientCredentials, PostgresCredentials from dlt.common.configuration.specs.postgres_credentials import ConnectionStringCredentials from dlt.common.typing import TSecretValue from tests.utils import preserve_environ -from tests.common.configuration.utils import 
WithCredentialsConfiguration, CoercionTestConfiguration, COERCIONS, SecretConfiguration, environment +from tests.common.configuration.utils import WithCredentialsConfiguration, CoercionTestConfiguration, COERCIONS, SecretConfiguration, environment, toml_providers @configspec @@ -29,17 +27,6 @@ class EmbeddedWithGcpCredentials(BaseConfiguration): credentials: GcpClientCredentials -@pytest.fixture -def providers() -> Iterator[ConfigProvidersContext]: - pipeline_root = "./tests/common/cases/configuration/.dlt" - ctx = ConfigProvidersContext() - ctx.providers.clear() - ctx.add_provider(SecretsTomlProvider(project_dir=pipeline_root)) - ctx.add_provider(ConfigTomlProvider(project_dir=pipeline_root)) - with Container().injectable_context(ctx): - yield ctx - - def test_secrets_from_toml_secrets() -> None: with pytest.raises(ConfigFieldMissingException) as py_ex: resolve.resolve_configuration(SecretConfiguration()) @@ -54,7 +41,7 @@ def test_secrets_from_toml_secrets() -> None: resolve.resolve_configuration(WithCredentialsConfiguration()) -def test_toml_types(providers: ConfigProvidersContext) -> None: +def test_toml_types(toml_providers: ConfigProvidersContext) -> None: # resolve CoercionTestConfiguration from typecheck namespace c = resolve.resolve_configuration(CoercionTestConfiguration(), namespaces=("typecheck",)) for k, v in COERCIONS.items(): @@ -66,17 +53,17 @@ def test_toml_types(providers: ConfigProvidersContext) -> None: assert v == c[k] -def test_config_provider_order(providers: ConfigProvidersContext, environment: Any) -> None: +def test_config_provider_order(toml_providers: ConfigProvidersContext, environment: Any) -> None: # add env provider - providers.providers.insert(0, EnvironProvider()) + @with_config(namespaces=("api",)) - def single_val(port): + def single_val(port=None): return port # secrets have api.port=1023 and this will be used - assert single_val() == 1023 + assert single_val(None) == 1023 # env will make it string, also namespace is optional 
environment["PORT"] = "UNKNOWN" @@ -86,27 +73,27 @@ def single_val(port): assert single_val() == "1025" -def test_toml_mixed_config_inject(providers: ConfigProvidersContext) -> None: +def test_toml_mixed_config_inject(toml_providers: ConfigProvidersContext) -> None: # get data from both providers @with_config def mixed_val(api_type, secret_value: TSecretValue, typecheck: Any): return api_type, secret_value, typecheck - _tup = mixed_val() + _tup = mixed_val(dlt.config.value, dlt.secrets.value, dlt.config.value) assert _tup[0] == "REST" assert _tup[1] == "2137" assert isinstance(_tup[2], dict) -def test_toml_namespaces(providers: ConfigProvidersContext) -> None: - cfg = providers["Pipeline config.toml"] +def test_toml_namespaces(toml_providers: ConfigProvidersContext) -> None: + cfg = toml_providers["Pipeline config.toml"] assert cfg.get_value("api_type", str) == ("REST", "api_type") assert cfg.get_value("port", int, "api") == (1024, "api.port") assert cfg.get_value("param1", str, "api", "params") == ("a", "api.params.param1") -def test_secrets_toml_credentials(providers: ConfigProvidersContext) -> None: +def test_secrets_toml_credentials(environment: Any, toml_providers: ConfigProvidersContext) -> None: # there are credentials exactly under destination.bigquery.credentials c = resolve.resolve_configuration(GcpClientCredentials(), namespaces=("destination", "bigquery")) assert c.project_id.endswith("destination.bigquery.credentials") @@ -118,7 +105,7 @@ def test_secrets_toml_credentials(providers: ConfigProvidersContext) -> None: assert c.project_id.endswith("destination.credentials") # there's "credentials" key but does not contain valid gcp credentials with pytest.raises(ConfigFieldMissingException): - resolve.resolve_configuration(GcpClientCredentials()) + print(dict(resolve.resolve_configuration(GcpClientCredentials()))) # also try postgres credentials c = resolve.resolve_configuration(PostgresCredentials(), namespaces=("destination", "redshift")) assert 
c.database == "destination.redshift.credentials" @@ -127,7 +114,7 @@ def test_secrets_toml_credentials(providers: ConfigProvidersContext) -> None: resolve.resolve_configuration(PostgresCredentials(), namespaces=("destination", "bigquery")) -def test_secrets_toml_embedded_credentials(providers: ConfigProvidersContext) -> None: +def test_secrets_toml_embedded_credentials(environment: Any, toml_providers: ConfigProvidersContext) -> None: # will try destination.bigquery.credentials c = resolve.resolve_configuration(EmbeddedWithGcpCredentials(), namespaces=("destination", "bigquery")) assert c.credentials.project_id.endswith("destination.bigquery.credentials") @@ -156,8 +143,8 @@ def test_secrets_toml_embedded_credentials(providers: ConfigProvidersContext) -> resolve.resolve_configuration(GcpClientCredentials()) -def test_secrets_toml_credentials_from_native_repr(providers: ConfigProvidersContext) -> None: - # cfg = providers["Pipeline secrets.toml"] +def test_secrets_toml_credentials_from_native_repr(environment: Any, toml_providers: ConfigProvidersContext) -> None: + # cfg = toml_providers["Pipeline secrets.toml"] # print(cfg._toml) # print(cfg._toml["source"]["credentials"]) # resolve gcp_credentials by parsing initial value which is str holding json doc @@ -174,8 +161,8 @@ def test_secrets_toml_credentials_from_native_repr(providers: ConfigProvidersCon assert c.query == {"conn_timeout": "15", "search_path": "a,b,c"} -def test_toml_get_key_as_namespace(providers: ConfigProvidersContext) -> None: - cfg = providers["Pipeline secrets.toml"] +def test_toml_get_key_as_namespace(toml_providers: ConfigProvidersContext) -> None: + cfg = toml_providers["Pipeline secrets.toml"] # [credentials] # secret_value="2137" # so the line below will try to use secrets_value value as namespace, this must fallback to not found diff --git a/tests/common/configuration/utils.py b/tests/common/configuration/utils.py index 49eb460f91..8af0346ed9 100644 --- a/tests/common/configuration/utils.py 
+++ b/tests/common/configuration/utils.py @@ -1,10 +1,12 @@ import pytest from os import environ import datetime # noqa: I251 -from typing import Any, List, Optional, Tuple, Type, Dict, MutableMapping, Optional, Sequence +from typing import Any, Iterator, List, Optional, Tuple, Type, Dict, MutableMapping, Optional, Sequence from dlt.common import Decimal, pendulum from dlt.common.configuration.container import Container +from dlt.common.configuration.providers.environ import EnvironProvider +from dlt.common.configuration.providers.toml import ConfigTomlProvider, SecretsTomlProvider from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext from dlt.common.typing import TSecretValue, StrAny from dlt.common.configuration import configspec @@ -82,6 +84,18 @@ def mock_provider() -> "MockProvider": yield mock_provider +@pytest.fixture +def toml_providers() -> Iterator[ConfigProvidersContext]: + pipeline_root = "./tests/common/cases/configuration/.dlt" + ctx = ConfigProvidersContext() + ctx.providers.clear() + ctx.add_provider(EnvironProvider()) + ctx.add_provider(SecretsTomlProvider(project_dir=pipeline_root)) + ctx.add_provider(ConfigTomlProvider(project_dir=pipeline_root)) + with Container().injectable_context(ctx): + yield ctx + + class MockProvider(ConfigProvider): def __init__(self) -> None: diff --git a/tests/common/schema/test_coercion.py b/tests/common/schema/test_coercion.py index 2c82a95429..2a68f4b29c 100644 --- a/tests/common/schema/test_coercion.py +++ b/tests/common/schema/test_coercion.py @@ -14,82 +14,82 @@ def test_coerce_same_type() -> None: # same type coercion - assert utils.coerce_type("double", "double", 8721.1) == 8721.1 - assert utils.coerce_type("bigint", "bigint", 8721) == 8721 + assert utils.coerce_value("double", "double", 8721.1) == 8721.1 + assert utils.coerce_value("bigint", "bigint", 8721) == 8721 # try various special types for k, v in JSON_TYPED_DICT.items(): typ_ = JSON_TYPED_DICT_TYPES[k] - assert 
utils.coerce_type(typ_, typ_, v) + assert utils.coerce_value(typ_, typ_, v) def test_coerce_type_to_text() -> None: - assert utils.coerce_type("text", "bool", False) == str(False) + assert utils.coerce_value("text", "bool", False) == str(False) # double into text - assert utils.coerce_type("text", "double", -1726.1288) == "-1726.1288" + assert utils.coerce_value("text", "double", -1726.1288) == "-1726.1288" # bytes to text (base64) - assert utils.coerce_type("text", "binary", b'binary string') == "YmluYXJ5IHN0cmluZw==" + assert utils.coerce_value("text", "binary", b'binary string') == "YmluYXJ5IHN0cmluZw==" # HexBytes to text (hex with prefix) - assert utils.coerce_type("text", "binary", HexBytes(b'binary string')) == "0x62696e61727920737472696e67" + assert utils.coerce_value("text", "binary", HexBytes(b'binary string')) == "0x62696e61727920737472696e67" def test_coerce_type_to_bool() -> None: # text into bool - assert utils.coerce_type("bool", "text", "False") is False - assert utils.coerce_type("bool", "text", "yes") is True - assert utils.coerce_type("bool", "text", "no") is False + assert utils.coerce_value("bool", "text", "False") is False + assert utils.coerce_value("bool", "text", "yes") is True + assert utils.coerce_value("bool", "text", "no") is False # some numeric types - assert utils.coerce_type("bool", "bigint", 1) is True - assert utils.coerce_type("bool", "bigint", 0) is False - assert utils.coerce_type("bool", "decimal", Decimal(1)) is True - assert utils.coerce_type("bool", "decimal", Decimal(0)) is False + assert utils.coerce_value("bool", "bigint", 1) is True + assert utils.coerce_value("bool", "bigint", 0) is False + assert utils.coerce_value("bool", "decimal", Decimal(1)) is True + assert utils.coerce_value("bool", "decimal", Decimal(0)) is False # no coercions with pytest.raises(ValueError): - utils.coerce_type("bool", "complex", {"a": True}) + utils.coerce_value("bool", "complex", {"a": True}) with pytest.raises(ValueError): - 
utils.coerce_type("bool", "binary", b'True') + utils.coerce_value("bool", "binary", b'True') with pytest.raises(ValueError): - utils.coerce_type("bool", "timestamp", pendulum.now()) + utils.coerce_value("bool", "timestamp", pendulum.now()) def test_coerce_type_to_double() -> None: # bigint into double - assert utils.coerce_type("double", "bigint", 762162) == 762162.0 + assert utils.coerce_value("double", "bigint", 762162) == 762162.0 # text into double if parsable - assert utils.coerce_type("double", "text", " -1726.1288 ") == -1726.1288 + assert utils.coerce_value("double", "text", " -1726.1288 ") == -1726.1288 # hex text into double - assert utils.coerce_type("double", "text", "0xff") == 255.0 + assert utils.coerce_value("double", "text", "0xff") == 255.0 # wei, decimal to double - assert utils.coerce_type("double", "wei", Wei.from_int256(2137, decimals=2)) == 21.37 - assert utils.coerce_type("double", "decimal", Decimal("-1121.11")) == -1121.11 + assert utils.coerce_value("double", "wei", Wei.from_int256(2137, decimals=2)) == 21.37 + assert utils.coerce_value("double", "decimal", Decimal("-1121.11")) == -1121.11 # non parsable text with pytest.raises(ValueError): - utils.coerce_type("double", "text", "a912.12") + utils.coerce_value("double", "text", "a912.12") # bool does not coerce with pytest.raises(ValueError): - utils.coerce_type("double", "bool", False) + utils.coerce_value("double", "bool", False) def test_coerce_type_to_bigint() -> None: - assert utils.coerce_type("bigint", "text", " -1726 ") == -1726 + assert utils.coerce_value("bigint", "text", " -1726 ") == -1726 # for round numerics we can convert - assert utils.coerce_type("bigint", "double", -762162.0) == -762162 - assert utils.coerce_type("bigint", "decimal", Decimal("1276.0")) == 1276 - assert utils.coerce_type("bigint", "wei", Wei("1276.0")) == 1276 + assert utils.coerce_value("bigint", "double", -762162.0) == -762162 + assert utils.coerce_value("bigint", "decimal", Decimal("1276.0")) == 1276 + 
assert utils.coerce_value("bigint", "wei", Wei("1276.0")) == 1276 # raises when not round with pytest.raises(ValueError): - utils.coerce_type("bigint", "double", 762162.1) + utils.coerce_value("bigint", "double", 762162.1) with pytest.raises(ValueError): - utils.coerce_type("bigint", "decimal", Decimal(912.12)) + utils.coerce_value("bigint", "decimal", Decimal(912.12)) with pytest.raises(ValueError): - utils.coerce_type("bigint", "wei", Wei(912.12)) + utils.coerce_value("bigint", "wei", Wei(912.12)) # non parsable floats and ints with pytest.raises(ValueError): - utils.coerce_type("bigint", "text", "f912.12") + utils.coerce_value("bigint", "text", "f912.12") with pytest.raises(ValueError): - utils.coerce_type("bigint", "text", "912.12") + utils.coerce_value("bigint", "text", "912.12") @pytest.mark.parametrize("dec_cls,data_type", [ @@ -97,101 +97,101 @@ def test_coerce_type_to_bigint() -> None: (Wei, "wei") ]) def test_coerce_to_numeric(dec_cls: Type[Any], data_type: TDataType) -> None: - v = utils.coerce_type(data_type, "text", " -1726.839283 ") + v = utils.coerce_value(data_type, "text", " -1726.839283 ") assert type(v) is dec_cls assert v == dec_cls("-1726.839283") - v = utils.coerce_type(data_type, "bigint", -1726) + v = utils.coerce_value(data_type, "bigint", -1726) assert type(v) is dec_cls assert v == dec_cls("-1726") # mind that 1276.37 does not have binary representation as used in float - v = utils.coerce_type(data_type, "double", 1276.37) + v = utils.coerce_value(data_type, "double", 1276.37) assert type(v) is dec_cls assert v.quantize(Decimal("1.00")) == dec_cls("1276.37") # wei to decimal and reverse - v = utils.coerce_type(data_type, "decimal", Decimal("1276.37")) + v = utils.coerce_value(data_type, "decimal", Decimal("1276.37")) assert type(v) is dec_cls assert v == dec_cls("1276.37") - v = utils.coerce_type(data_type, "wei", Wei("1276.37")) + v = utils.coerce_value(data_type, "wei", Wei("1276.37")) assert type(v) is dec_cls assert v == 
dec_cls("1276.37") # invalid format with pytest.raises(ValueError): - utils.coerce_type(data_type, "text", "p912.12") + utils.coerce_value(data_type, "text", "p912.12") def test_coerce_type_from_hex_text() -> None: # hex text into various types - assert utils.coerce_type("wei", "text", " 0xff") == 255 - assert utils.coerce_type("bigint", "text", " 0xff") == 255 - assert utils.coerce_type("decimal", "text", " 0xff") == Decimal(255) - assert utils.coerce_type("double", "text", " 0xff") == 255.0 + assert utils.coerce_value("wei", "text", " 0xff") == 255 + assert utils.coerce_value("bigint", "text", " 0xff") == 255 + assert utils.coerce_value("decimal", "text", " 0xff") == Decimal(255) + assert utils.coerce_value("double", "text", " 0xff") == 255.0 def test_coerce_type_to_timestamp() -> None: # timestamp cases - assert utils.coerce_type("timestamp", "text", " 1580405246 ") == pendulum.parse("2020-01-30T17:27:26+00:00") + assert utils.coerce_value("timestamp", "text", " 1580405246 ") == pendulum.parse("2020-01-30T17:27:26+00:00") # the tenths of microseconds will be ignored - assert utils.coerce_type("timestamp", "double", 1633344898.7415245) == pendulum.parse("2021-10-04T10:54:58.741524+00:00") + assert utils.coerce_value("timestamp", "double", 1633344898.7415245) == pendulum.parse("2021-10-04T10:54:58.741524+00:00") # if text is ISO string it will be coerced - assert utils.coerce_type("timestamp", "text", "2022-05-10T03:41:31.466000+00:00") == pendulum.parse("2022-05-10T03:41:31.466000+00:00") - assert utils.coerce_type("timestamp", "text", "2022-05-10T03:41:31.466+02:00") == pendulum.parse("2022-05-10T01:41:31.466Z") - assert utils.coerce_type("timestamp", "text", "2022-05-10T03:41:31.466+0200") == pendulum.parse("2022-05-10T01:41:31.466Z") + assert utils.coerce_value("timestamp", "text", "2022-05-10T03:41:31.466000+00:00") == pendulum.parse("2022-05-10T03:41:31.466000+00:00") + assert utils.coerce_value("timestamp", "text", "2022-05-10T03:41:31.466+02:00") == 
pendulum.parse("2022-05-10T01:41:31.466Z") + assert utils.coerce_value("timestamp", "text", "2022-05-10T03:41:31.466+0200") == pendulum.parse("2022-05-10T01:41:31.466Z") # parse almost ISO compliant string - assert utils.coerce_type("timestamp", "text", "2022-04-26 10:36+02") == pendulum.parse("2022-04-26T10:36:00+02:00") - assert utils.coerce_type("timestamp", "text", "2022-04-26 10:36") == pendulum.parse("2022-04-26T10:36:00+00:00") + assert utils.coerce_value("timestamp", "text", "2022-04-26 10:36+02") == pendulum.parse("2022-04-26T10:36:00+02:00") + assert utils.coerce_value("timestamp", "text", "2022-04-26 10:36") == pendulum.parse("2022-04-26T10:36:00+00:00") # parse date string - assert utils.coerce_type("timestamp", "text", "2021-04-25") == pendulum.parse("2021-04-25", exact=True) + assert utils.coerce_value("timestamp", "text", "2021-04-25") == pendulum.parse("2021-04-25", exact=True) # fails on "now" - yes pendulum by default parses "now" as .now() with pytest.raises(ValueError): - utils.coerce_type("timestamp", "text", "now") + utils.coerce_value("timestamp", "text", "now") # fails on intervals - pendulum by default parses a string into: datetime, data, time or interval with pytest.raises(ValueError): - utils.coerce_type("timestamp", "text", "2007-03-01T13:00:00Z/2008-05-11T15:30:00Z") + utils.coerce_value("timestamp", "text", "2007-03-01T13:00:00Z/2008-05-11T15:30:00Z") with pytest.raises(ValueError): - utils.coerce_type("timestamp", "text", "2011--20012") + utils.coerce_value("timestamp", "text", "2011--20012") # test wrong unix timestamps with pytest.raises(ValueError): - utils.coerce_type("timestamp", "double", -1000000000000000000000000000) + utils.coerce_value("timestamp", "double", -1000000000000000000000000000) with pytest.raises(ValueError): - utils.coerce_type("timestamp", "double", 1000000000000000000000000000) + utils.coerce_value("timestamp", "double", 1000000000000000000000000000) # formats with timezones are not parsed with 
pytest.raises(ValueError): - utils.coerce_type("timestamp", "text", "06/04/22, 11:15PM IST") + utils.coerce_value("timestamp", "text", "06/04/22, 11:15PM IST") # we do not parse RFC 822, 2822, 850 etc. with pytest.raises(ValueError): - utils.coerce_type("timestamp", "text", "Wed, 06 Jul 2022 11:58:08 +0200") + utils.coerce_value("timestamp", "text", "Wed, 06 Jul 2022 11:58:08 +0200") with pytest.raises(ValueError): - utils.coerce_type("timestamp", "text", "Tuesday, 13-Sep-22 18:42:31 UTC") + utils.coerce_value("timestamp", "text", "Tuesday, 13-Sep-22 18:42:31 UTC") # time data type not supported yet with pytest.raises(ValueError): - utils.coerce_type("timestamp", "text", "10:36") + utils.coerce_value("timestamp", "text", "10:36") # non parsable datetime with pytest.raises(ValueError): - utils.coerce_type("timestamp", "text", "x2022-05-10T03:41:31.466000X+00:00") + utils.coerce_value("timestamp", "text", "x2022-05-10T03:41:31.466000X+00:00") def test_coerce_type_to_binary() -> None: # from hex string - assert utils.coerce_type("binary", "text", "0x30") == b'0' + assert utils.coerce_value("binary", "text", "0x30") == b'0' # from base64 - assert utils.coerce_type("binary", "text", "YmluYXJ5IHN0cmluZw==") == b'binary string' + assert utils.coerce_value("binary", "text", "YmluYXJ5IHN0cmluZw==") == b'binary string' # int into bytes - assert utils.coerce_type("binary", "bigint", 15) == b"\x0f" + assert utils.coerce_value("binary", "bigint", 15) == b"\x0f" # can't into double with pytest.raises(ValueError): - utils.coerce_type("binary", "double", 912.12) + utils.coerce_value("binary", "double", 912.12) # can't broken base64 with pytest.raises(ValueError): - assert utils.coerce_type("binary", "text", "!YmluYXJ5IHN0cmluZw==") + assert utils.coerce_value("binary", "text", "!YmluYXJ5IHN0cmluZw==") def test_py_type_to_sc_type() -> None: @@ -225,12 +225,12 @@ def test_coerce_type_complex() -> None: v_dict = {"list": [1, 2], "str": "complex"} assert 
utils.py_type_to_sc_type(type(v_list)) == "complex" assert utils.py_type_to_sc_type(type(v_dict)) == "complex" - assert type(utils.coerce_type("complex", "complex", v_dict)) is str - assert type(utils.coerce_type("complex", "complex", v_list)) is str - assert utils.coerce_type("complex", "complex", v_dict) == json.dumps(v_dict) - assert utils.coerce_type("complex", "complex", v_list) == json.dumps(v_list) - assert utils.coerce_type("text", "complex", v_dict) == json.dumps(v_dict) - assert utils.coerce_type("text", "complex", v_list) == json.dumps(v_list) + assert type(utils.coerce_value("complex", "complex", v_dict)) is str + assert type(utils.coerce_value("complex", "complex", v_list)) is str + assert utils.coerce_value("complex", "complex", v_dict) == json.dumps(v_dict) + assert utils.coerce_value("complex", "complex", v_list) == json.dumps(v_list) + assert utils.coerce_value("text", "complex", v_dict) == json.dumps(v_dict) + assert utils.coerce_value("text", "complex", v_list) == json.dumps(v_list) # all other coercions fail with pytest.raises(ValueError): - utils.coerce_type("binary", "complex", v_list) + utils.coerce_value("binary", "complex", v_list) diff --git a/tests/common/test_json.py b/tests/common/test_json.py index 1fa3c5ef69..4a657c77b1 100644 --- a/tests/common/test_json.py +++ b/tests/common/test_json.py @@ -11,12 +11,12 @@ def test_json_decimals() -> None: # serialize as string s = json.dumps({"decimal": Decimal("21.37")}) - assert s == '{"decimal": "21.37"}' + assert s == '{"decimal":"21.37"}' # serialize max precision which is 10**38 with numeric_default_context(): s = json.dumps({"decimal": Decimal(10**29) - Decimal("0.001")}) - assert s == '{"decimal": "99999999999999999999999999999.999"}' + assert s == '{"decimal":"99999999999999999999999999999.999"}' # serialize untypical context with numeric_default_context(precision=77): @@ -24,7 +24,7 @@ def test_json_decimals() -> None: # serialize out of local context s = json.dumps(doc) # full 
precision. you need to quantize yourself if you need it - assert s == '{"decimal": "99999999999999999999999999999999999999999999999999999999999999999999999999.999"}' + assert s == '{"decimal":"99999999999999999999999999999999999999999999999999999999999999999999999999.999"}' def test_json_pendulum() -> None: diff --git a/tests/common/utils.py b/tests/common/utils.py index 65fe04c9f1..b50186d083 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -9,7 +9,7 @@ COMMON_TEST_CASES_PATH = "./tests/common/cases/" # for import schema tests, change when upgrading the schema version -IMPORTED_VERSION_HASH_ETH_V5 = "VCdpY/nGien9Yz1FA2fge/iu8alntmFRVVPoPsib80I=" +IMPORTED_VERSION_HASH_ETH_V5 = "yUiERp2eM2SEYWyVf8NjRiglWVwb39OPY+VwStAq7uE=" def load_json_case(name: str) -> Mapping: diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index aa6b569c00..247fc3f46c 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -6,7 +6,7 @@ import dlt from dlt.common import pendulum from dlt.common.schema.schema import Schema, utils -from dlt.common.schema.typing import VERSION_TABLE_NAME +from dlt.common.schema.typing import LOADS_TABLE_NAME, VERSION_TABLE_NAME from dlt.common.utils import uniq_id from dlt.pipeline.exceptions import PipelineConfigMissing from dlt.pipeline.pipeline import Pipeline @@ -177,7 +177,7 @@ def some_data(param: str) -> Any: p = dlt.pipeline(pipeline_name=pipeline_name, destination=destination_name, dataset_name=dataset_name, restore_from_destination=True) assert p.default_schema_name == "default" assert set(p.schema_names) == set(["default", "two", "three"]) - assert p._get_state()["sources"] == {'state1': 'state1', 'state2': 'state2', 'state3': 'state3', 'state4': 'state4'} + assert p.state["sources"] == {"default": {'state1': 'state1', 'state2': 'state2'}, "two": {'state3': 'state3'}, "three": {'state4': 'state4'}} for schema in p.schemas.values(): 
assert "some_data" in schema._schema_tables @@ -198,6 +198,28 @@ def some_data(param: str) -> Any: p = dlt.pipeline(pipeline_name=pipeline_name, destination=destination_name, restore_from_destination=True) +@pytest.mark.parametrize('destination_name', ALL_DESTINATIONS) +def test_ignore_state_unfinished_load(destination_name: str) -> None: + pipeline_name = "pipe_" + uniq_id() + dataset_name="state_test_" + uniq_id() + p = dlt.pipeline(pipeline_name=pipeline_name, destination=destination_name, dataset_name=dataset_name) + + @dlt.resource + def some_data(param: str) -> Any: + dlt.state()[param] = param + yield param + + info = p.run(some_data("fix_1")) + with p._sql_job_client(p.default_schema) as job_client: + state = load_state_from_destination(pipeline_name, job_client.sql_client) + assert state is not None + # delete load id + job_client.sql_client.execute_sql(f"DELETE FROM {LOADS_TABLE_NAME} WHERE load_id = %s", next(iter(info.loads_ids))) + # state without completed load id is not visible + state = load_state_from_destination(pipeline_name, job_client.sql_client) + assert state is None + + @pytest.mark.parametrize('destination_name', ALL_DESTINATIONS) def test_restore_schemas_while_import_schemas_exist(destination_name: str) -> None: # restored schema should attach itself to imported schema and it should not get overwritten diff --git a/tests/pipeline/test_pipeline_state.py b/tests/pipeline/test_pipeline_state.py new file mode 100644 index 0000000000..ac9e649d36 --- /dev/null +++ b/tests/pipeline/test_pipeline_state.py @@ -0,0 +1,115 @@ +import os +from typing import Any, Iterator + +import pytest + +import dlt +from dlt.common.configuration.container import Container +from dlt.common.exceptions import UnknownDestinationModule +from dlt.common.pipeline import PipelineContext +from dlt.common.schema.exceptions import InvalidDatasetName +from dlt.common.schema import Schema + +from dlt.extract.exceptions import SourceExhausted +from dlt.extract.source import 
DltSource +from dlt.pipeline import state as state_module +from dlt.pipeline.exceptions import InvalidPipelineName, PipelineStateNotAvailable, PipelineStepFailed + +from tests.utils import ALL_DESTINATIONS, TEST_STORAGE_ROOT, preserve_environ, autouse_test_storage +from tests.common.configuration.utils import environment +from tests.pipeline.utils import drop_dataset_from_env, patch_working_dir, drop_pipeline + +@dlt.resource +def some_data(): + last_value = dlt.state().get("last_value", 0) + yield [1,2,3] + dlt.state()["last_value"] = last_value + 1 + + +def test_managed_state() -> None: + + p = dlt.pipeline(pipeline_name="managed_state") + p.extract(some_data) + # managed state becomes the source name + state = p.state["sources"] + assert "managed_state" in state + assert state["managed_state"]["last_value"] == 1 + # run again - increases the last_value + p.extract(some_data()) + state = p.state["sources"] + assert state["managed_state"]["last_value"] == 2 + # attach to different source that will get separate state + + @dlt.source + def some_source(): + return some_data + + p.extract(some_source()) + state = p.state["sources"] + assert state["some_source"]["last_value"] == 1 + assert state["managed_state"]["last_value"] == 2 + # attach to a different source by forcing to different schema + p.extract(some_data(), schema=Schema("default")) + state = p.state["sources"] + assert state["some_source"]["last_value"] == 1 + assert state["managed_state"]["last_value"] == 2 + assert state["default"]["last_value"] == 1 + + +def test_must_have_active_pipeline() -> None: + # iterate resource + with pytest.raises(PipelineStateNotAvailable) as py_ex: + for _ in some_data(): + pass + assert py_ex.value.source_name is None + + # call source that reads state + @dlt.source + def some_source(): + dlt.state().get("last_value", 0) + return some_data + + with pytest.raises(PipelineStateNotAvailable) as py_ex: + some_source() + assert py_ex.value.source_name == "some_source" + + +def 
test_unmanaged_state() -> None: + p = dlt.pipeline(pipeline_name="unmanaged") + # evaluate generator that reads and writes state + list(some_data()) + # state is not in pipeline + assert "sources" not in p.state + # but last state global will have it directly in source because resource was evaluated outside of pipeline + assert state_module._last_full_state["sources"]["last_value"] == 1 + # this state is discarded + list(some_data()) + # state is not in pipeline + assert "sources" not in p.state + assert state_module._last_full_state["sources"]["last_value"] == 1 + + @dlt.source + def some_source(): + state = dlt.state() + value = state.get("last_value", 0) + state["last_value"] = value + 1 + return some_data + + some_source() + # this time the source is there + assert state_module._last_full_state["sources"]["some_source"]["last_value"] == 1 + # but the state is discarded + some_source() + assert state_module._last_full_state["sources"]["some_source"]["last_value"] == 1 + + # but when you run it inside pipeline + p.extract(some_source()) + state = p.state["sources"] + assert state["some_source"]["last_value"] == 1 + + # the unmanaged call later gets the correct pipeline state + some_source() + assert state_module._last_full_state["sources"]["some_source"]["last_value"] == 2 + # again - discarded + state = p.state["sources"] + assert state["some_source"]["last_value"] == 1 diff --git a/tests/pipeline/utils.py b/tests/pipeline/utils.py index 6748cbc5fa..d7c3cc16bf 100644 --- a/tests/pipeline/utils.py +++ b/tests/pipeline/utils.py @@ -25,10 +25,13 @@ def patch_working_dir() -> None: @pytest.fixture(autouse=True) def drop_pipeline() -> Iterator[None]: + container = Container() + if container[PipelineContext].is_active(): + container[PipelineContext].deactivate() yield - if Container()[PipelineContext].is_active(): + if container[PipelineContext].is_active(): # take existing pipeline p = dlt.pipeline() p._wipe_working_folder() # deactivate context - 
Container()[PipelineContext].deactivate() + container[PipelineContext].deactivate() diff --git a/tests/utils.py b/tests/utils.py index afc99e62cd..f210097228 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -19,8 +19,8 @@ TEST_STORAGE_ROOT = "_storage" -ALL_DESTINATIONS = ["bigquery", "redshift", "postgres"] -# ALL_DESTINATIONS = ["postgres"] +# ALL_DESTINATIONS = ["bigquery", "redshift", "postgres"] +ALL_DESTINATIONS = ["postgres"] # add test dictionary provider def TEST_DICT_CONFIG_PROVIDER():