Skip to content

Commit

Permalink
Merge branch 'devel' into feat/2223-improve-error-arrow-typecasting
Browse files Browse the repository at this point in the history
  • Loading branch information
zilto authored Jan 29, 2025
2 parents 28b2244 + faa4d76 commit 711b55e
Show file tree
Hide file tree
Showing 186 changed files with 2,793 additions and 1,665 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test_common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ jobs:
run: poetry install --no-interaction --with sentry-sdk

- run: |
poetry run pytest tests/common tests/normalize tests/reflection tests/load/test_dummy_client.py tests/extract/test_extract.py tests/extract/test_sources.py tests/pipeline/test_pipeline_state.py
poetry run pytest tests/common tests/normalize tests/reflection tests/plugins tests/load/test_dummy_client.py tests/extract/test_extract.py tests/extract/test_sources.py tests/pipeline/test_pipeline_state.py
if: runner.os != 'Windows'
name: Run common tests with minimum dependencies Linux/MAC
- run: |
poetry run pytest tests/common tests/normalize tests/reflection tests/load/test_dummy_client.py tests/extract/test_extract.py tests/extract/test_sources.py tests/pipeline/test_pipeline_state.py -m "not forked"
poetry run pytest tests/common tests/normalize tests/reflection tests/plugins tests/load/test_dummy_client.py tests/extract/test_extract.py tests/extract/test_sources.py tests/pipeline/test_pipeline_state.py -m "not forked"
if: runner.os == 'Windows'
name: Run common tests with minimum dependencies Windows
shell: cmd
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ lint:
# $(MAKE) lint-security

format:
poetry run black dlt docs tests --exclude=".*syntax_error.py|\.venv.*|_storage/.*"
poetry run black dlt docs tests --extend-exclude='.*syntax_error.py|_storage/.*'
# poetry run isort ./

lint-snippets:
Expand Down
3 changes: 3 additions & 0 deletions deploy/dlt/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ RUN pip install /tmp/pydlt/dlt-${IMAGE_VERSION}.tar.gz[gcp,redshift,duckdb]

WORKDIR /
RUN rm -r /tmp/pydlt
# make sure dlt can be actually imported
# TODO: pendulum breaks alpine
# RUN python -c 'import dlt'
2 changes: 1 addition & 1 deletion dlt/cli/source_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def detect_source_configs(

for _, source_info in sources.items():
# accept only sources declared in the `init` or `pipeline` modules
if source_info.module.__name__.startswith(module_prefix):
if source_info.ref.startswith(module_prefix):
checked_sources[source_info.name] = source_info
source_config = source_info.SPEC() if source_info.SPEC else BaseConfiguration()
spec_fields = source_config.get_resolvable_fields()
Expand Down
34 changes: 23 additions & 11 deletions dlt/common/configuration/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def with_config(
/,
spec: Type[BaseConfiguration] = None,
sections: Union[str, Tuple[str, ...]] = (),
section_arg_name: str = None,
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: BaseConfiguration = None,
Expand All @@ -48,8 +48,8 @@ def with_config(
/,
spec: Type[BaseConfiguration] = None,
sections: Union[str, Tuple[str, ...]] = (),
section_arg_name: str = None,
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
Expand All @@ -63,8 +63,8 @@ def with_config(
/,
spec: Type[BaseConfiguration] = None,
sections: Union[str, Tuple[str, ...]] = (),
section_arg_name: str = None,
sections_merge_style: ConfigSectionContext.TMergeFunc = ConfigSectionContext.prefer_incoming,
auto_pipeline_section: bool = False,
include_defaults: bool = True,
accept_partial: bool = False,
initial_config: Optional[BaseConfiguration] = None,
Expand All @@ -80,8 +80,9 @@ def with_config(
func (Optional[AnyFun], optional): A function with arguments to be injected. Defaults to None.
spec (Type[BaseConfiguration], optional): A specification of injectable arguments. Defaults to None.
sections (Tuple[str, ...], optional): A set of config sections in which to look for arguments values. Defaults to ().
section_arg_name (str, optional): Name of the argument in the signature of the decorated function which will be used to extend `sections` tuple.
A top level pipeline section will be added if argument name is `pipeline_name`
prefer_existing_sections: (bool, optional): When joining existing section context, the existing context will be preferred to the one in `sections`. Default: False
auto_pipeline_section (bool, optional): If True, a top level pipeline section will be added if `pipeline_name` argument is present . Defaults to False.
include_defaults (bool, optional): If True then arguments with default values will be included in synthesized spec. If False only the required arguments marked with `dlt.secrets.value` and `dlt.config.value` are included
base (Type[BaseConfiguration], optional): A base class for synthesized spec. Defaults to BaseConfiguration.
lock_context_on_injection (bool, optional): If True, the thread context will be locked during injection to prevent race conditions. Defaults to True.
Expand Down Expand Up @@ -115,7 +116,7 @@ def decorator(f: TFun) -> TFun:
return f

spec_arg: Parameter = None
pipeline_name_arg: Parameter = None
section_name_arg: Parameter = None

for p in sig.parameters.values():
# for all positional parameters that do not have default value, set default
Expand All @@ -124,10 +125,10 @@ def decorator(f: TFun) -> TFun:
if p.annotation is SPEC:
# if any argument has type SPEC then use it to take initial value
spec_arg = p
if p.name == "pipeline_name" and auto_pipeline_section:
# if argument has name pipeline_name and auto_section is used, use it to generate section context
pipeline_name_arg = p
pipeline_name_arg_default = None if p.default == Parameter.empty else p.default
if p.name == section_arg_name:
# remember the argument whose value will be used to extend `sections`
section_name_arg = p
section_name_arg_default = None if p.default == Parameter.empty else p.default

def resolve_config(
bound_args: inspect.BoundArguments, accept_partial_: bool
Expand All @@ -146,16 +147,27 @@ def resolve_config(
# sections may be a string
if isinstance(curr_sections, str):
curr_sections = (curr_sections,)
# extend sections with section_name_arg
if section_name_arg:
section_extension = bound_args.arguments.get(
section_name_arg.name, section_name_arg_default
)
if section_extension:
curr_sections = (
curr_sections + (section_extension,)
if curr_sections
else (section_extension,)
)

# if one of arguments is spec then use it as initial value
if initial_config:
config = initial_config
elif spec_arg:
config = bound_args.arguments.get(spec_arg.name, None)
# resolve SPEC, also provide section_context with pipeline_name
if pipeline_name_arg:
if section_name_arg and section_name_arg.name == "pipeline_name":
curr_pipeline_name = bound_args.arguments.get(
pipeline_name_arg.name, pipeline_name_arg_default
section_name_arg.name, section_name_arg_default
)
else:
curr_pipeline_name = None
Expand Down
27 changes: 22 additions & 5 deletions dlt/common/configuration/plugins.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import ClassVar
import os
from typing import ClassVar, List
import pluggy
import importlib.metadata

from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext
from dlt.common.known_env import DLT_DISABLE_PLUGINS

hookspec = pluggy.HookspecMarker("dlt")
hookimpl = pluggy.HookimplMarker("dlt")
Expand All @@ -12,10 +14,14 @@ class PluginContext(ContainerInjectableContext):
global_affinity: ClassVar[bool] = True

manager: pluggy.PluginManager
plugin_modules: List[str]

def __init__(self) -> None:
super().__init__()
self.manager = pluggy.PluginManager("dlt")
self.plugin_modules = []
if os.environ.get(DLT_DISABLE_PLUGINS, "False").lower() == "true":
return

# TODO: we need to solve circular deps somehow

Expand All @@ -31,7 +37,7 @@ def __init__(self) -> None:
self.manager.add_hookspecs(plugins)
self.manager.register(plugins)

load_setuptools_entrypoints(self.manager)
self.plugin_modules = load_setuptools_entrypoints(self.manager)


def manager() -> pluggy.PluginManager:
Expand All @@ -41,15 +47,21 @@ def manager() -> pluggy.PluginManager:
return Container()[PluginContext].manager


def load_setuptools_entrypoints(m: pluggy.PluginManager) -> None:
def load_setuptools_entrypoints(m: pluggy.PluginManager) -> List[str]:
"""Scans setuptools distributions that are path or have name starting with `dlt-`
loads entry points in group `dlt` and instantiates them to initialize contained plugins
loads entry points in group `dlt` and instantiates them to initialize plugins.
returns a list of names of top level modules/packages from detected entry points.
"""

plugin_modules = []

for dist in list(importlib.metadata.distributions()):
# skip named dists that do not start with dlt-
if hasattr(dist, "name") and (dist.name is None or not dist.name.startswith("dlt-")):
package_name = dist.metadata.get("Name")
if not package_name or not package_name.startswith("dlt-"):
continue

for ep in dist.entry_points:
if (
ep.group != "dlt"
Expand All @@ -61,3 +73,8 @@ def load_setuptools_entrypoints(m: pluggy.PluginManager) -> None:
plugin = ep.load()
m.register(plugin, name=ep.name)
m._plugin_distinfo.append((plugin, pluggy._manager.DistFacade(dist)))
top_module = ep.module.split(".")[0]
if top_module not in plugin_modules:
plugin_modules.append(top_module)

return plugin_modules
9 changes: 9 additions & 0 deletions dlt/common/configuration/specs/azure_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ def to_object_store_rs_credentials(self) -> Dict[str, str]:
creds: Dict[str, Any] = without_none(self.to_adlfs_credentials()) # type: ignore[assignment]
# only string options accepted
creds.pop("anon", None)

if isinstance(self, CredentialsWithDefault) and self.has_default_credentials():
dc = self.default_credentials()

# https://learn.microsoft.com/en-us/azure/storage/blobs/authorize-access-azure-active-directory#microsoft-authentication-library-msal
creds["azure_storage_token"] = dc.get_token("https://storage.azure.com/.default").token

return creds

return creds


Expand Down
13 changes: 12 additions & 1 deletion dlt/common/configuration/specs/known_sections.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
# TODO: register layout types and specs for each top level sections so we can generate full config typing

SOURCES = "sources"
"""a top section holding source and resource configs often within their own sections named after modules they are in"""

DESTINATION = "destination"
"""a top section holding sections named after particular destinations with configurations and credentials."""
"""a top section holding sections named after particular destinations with configurations and credentials. NOTE: will be deprecated"""

DESTINATIONS = "destinations"
"""a top section holding sections named after particular destinations with configurations and credentials. NOTE: not yet supported"""

PIPELINES = "pipelines"
"""a top section holding pipeline configurations"""

DATASETS = "datasets"
"""a top section holding dataset configurations"""

LOAD = "load"
"""load and load storage configuration"""
Expand Down
49 changes: 48 additions & 1 deletion dlt/common/configuration/specs/pluggable_run_context.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from types import ModuleType
from typing import Any, ClassVar, Dict, List, Optional, Protocol

from dlt.common.configuration.providers.provider import ConfigProvider
from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext
from dlt.common.configuration.specs.runtime_configuration import RuntimeConfiguration
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContainer
from dlt.common.utils import uniq_id


class SupportsRunContext(Protocol):
Expand Down Expand Up @@ -34,6 +36,10 @@ def settings_dir(self) -> str:
def data_dir(self) -> str:
"""Defines where the pipelines working folders are stored."""

@property
def module(self) -> Optional[ModuleType]:
"""If run_dir is a top level importable python module, returns it, otherwise returns None"""

@property
def runtime_kwargs(self) -> Dict[str, Any]:
"""Additional kwargs used to initialize this instance of run context, used for reloading"""
Expand All @@ -50,16 +56,24 @@ def get_run_entity(self, entity: str) -> str:
def get_setting(self, setting_path: str) -> str:
"""Gets path in settings_dir where setting (ie. `secrets.toml`) are stored"""

def unplug(self) -> None:
"""Called when context removed from container"""

def plug(self) -> None:
"""Called when context is added to container"""


class PluggableRunContext(ContainerInjectableContext):
"""Injectable run context taken via plugin"""

global_affinity: ClassVar[bool] = True

context: SupportsRunContext
context: SupportsRunContext = None
providers: ConfigProvidersContainer
runtime_config: RuntimeConfiguration

_context_stack: List[Any] = []

def __init__(
self, init_context: SupportsRunContext = None, runtime_config: RuntimeConfiguration = None
) -> None:
Expand All @@ -82,9 +96,11 @@ def reload(self, run_dir: Optional[str] = None, runtime_kwargs: Dict[str, Any] =
runtime_kwargs = self.context.runtime_kwargs

self.runtime_config = None
self.before_remove()
self._plug(run_dir, runtime_kwargs=runtime_kwargs)

self.providers = ConfigProvidersContainer(self.context.initial_providers())
self.after_add()
# adds remaining providers and initializes runtime
self.add_extras()

Expand All @@ -95,10 +111,19 @@ def reload_providers(self) -> None:
def after_add(self) -> None:
super().after_add()

# plug context
self.context.plug()

# initialize runtime if context comes back into container
if self.runtime_config:
self.initialize_runtime(self.runtime_config)

def before_remove(self) -> None:
super().before_remove()

if self.context:
self.context.unplug()

def add_extras(self) -> None:
from dlt.common.configuration.resolve import resolve_configuration

Expand All @@ -125,3 +150,25 @@ def _plug(self, run_dir: Optional[str], runtime_kwargs: Dict[str, Any] = None) -
m = plugins.manager()
self.context = m.hook.plug_run_context(run_dir=run_dir, runtime_kwargs=runtime_kwargs)
assert self.context, "plug_run_context hook returned None"

def push_context(self) -> str:
    """Saves the current context state on the stack and returns its cookie.

    The saved entry holds the current `context`, `providers` and
    `runtime_config`, tagged with a freshly generated cookie.

    Returns:
        str: The cookie stored with the saved entry; `pop_context` and
            `drop_context` compare it against the top of the stack.
    """
    token = uniq_id()
    snapshot = (token, self.context, self.providers, self.runtime_config)
    self._context_stack.append(snapshot)
    return token

def pop_context(self, cookie: str) -> None:
    """Pops context from stack and re-initializes the runtime with the saved config.

    Restores the `context` and `providers` stored in the top stack entry and
    calls `initialize_runtime` with the runtime config stored alongside them.

    Args:
        cookie (str): Cookie expected to be on top of the stack, as returned by
            `push_context`.

    Raises:
        ValueError: If `cookie` does not match the cookie of the top stack entry.
            The stack is left untouched in that case.
    """
    # peek first: a mismatched cookie must not discard the saved state
    _c, context, providers, runtime_config = self._context_stack[-1]
    if cookie != _c:
        raise ValueError(f"Run context stack mangled. Got cookie {_c} but expected {cookie}")
    self._context_stack.pop()
    self.context = context
    self.providers = providers
    self.initialize_runtime(runtime_config)

def drop_context(self, cookie: str) -> None:
    """Pops context from stack but leaves new context for good

    Discards the top stack entry without restoring anything from it, so the
    currently active context, providers and runtime config stay in place.

    Args:
        cookie (str): Cookie expected to be on top of the stack, as returned by
            `push_context`.

    Raises:
        ValueError: If `cookie` does not match the cookie of the top stack entry.
            The stack is left untouched in that case.
    """
    # validate before popping so a mismatched cookie does not lose the saved entry
    _c = self._context_stack[-1][0]
    if cookie != _c:
        raise ValueError(f"Run context stack mangled. Got cookie {_c} but expected {cookie}")
    self._context_stack.pop()
8 changes: 7 additions & 1 deletion dlt/common/destination/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
TLoaderFileFormat,
LOADER_FILE_FORMATS,
)
from dlt.common.destination.reference import TDestinationReferenceArg, Destination, AnyDestination
from dlt.common.destination.reference import (
TDestinationReferenceArg,
Destination,
AnyDestination,
DestinationReference,
)
from dlt.common.destination.typing import PreparedTableSchema

__all__ = [
Expand All @@ -16,4 +21,5 @@
"TDestinationReferenceArg",
"Destination",
"AnyDestination",
"DestinationReference",
]
1 change: 1 addition & 0 deletions dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
supported_loader_file_formats: Sequence[TLoaderFileFormat] = None
loader_file_format_selector: LoaderFileFormatSelector = None
"""Callable that adapts `preferred_loader_file_format` and `supported_loader_file_formats` at runtime."""
preferred_table_format: TTableFormat = None
supported_table_formats: Sequence[TTableFormat] = None
type_mapper: Optional[Type[DataTypeMapper]] = None
recommended_file_size: Optional[int] = None
Expand Down
Loading

0 comments on commit 711b55e

Please sign in to comment.