Skip to content

Commit

Permalink
#2166 databricks direct loading (#2219)
Browse files Browse the repository at this point in the history
* databricks: enable local files

* fix: databricks test config

* work in progress

* added create and drop volume to interface

* refactor direct load authentication

* fix databricks volume file name

* refactor databricks direct loading

* format and lint

* revert config.toml changes

* force notebook auth

* enhanced config validations

* force exception

* fix config resolve

* remove imports

* test: config exceptions

* restore comments

* restored destination_config

* fix pokema api values

* enables databricks no stage tests

* fix databricks config on_resolved

* adjusted direct load file management

* direct load docs

* filters by bucket when subset of destinations is set when creating test cases

* simpler file upload

* fix comment

* passes authentication directly from workspace, adds proper fingerprinting

* use real client_id in tests

* fixes config resolver to not pass NotResolved hints to config providers

---------

Co-authored-by: Marcin Rudolf <rudolfix@rudolfix.org>
  • Loading branch information
donotpush and rudolfix authored Feb 1, 2025
1 parent ae18acb commit e9bbeef
Show file tree
Hide file tree
Showing 11 changed files with 547 additions and 139 deletions.
3 changes: 3 additions & 0 deletions dlt/common/configuration/providers/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def get_value(
value, _ = super().get_value(key, hint, pipeline_name, *sections)
if value is None:
# only secrets hints are handled
# TODO: we need to refine how we filer out non-secrets
# at the least we should load known fragments for fields
# that are part of a secret (ie. coming from Credentials)
if self.only_secrets and not is_secret_hint(hint) and hint is not AnyType:
return None, full_key

Expand Down
40 changes: 22 additions & 18 deletions dlt/common/configuration/resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,25 @@ def _resolve_config_fields(
if key in config.__hint_resolvers__:
# Type hint for this field is created dynamically
hint = config.__hint_resolvers__[key](config)
# check if hint optional
is_optional = is_optional_type(hint)
# get default and explicit values
default_value = getattr(config, key, None)
explicit_none = False
explicit_value = None
current_value = None
traces: List[LookupTrace] = []

def _set_field() -> None:
# collect unresolved fields
# NOTE: we hide B023 here because the function is called only within a loop
if not is_optional and current_value is None: # noqa
unresolved_fields[key] = traces # noqa
# set resolved value in config
if default_value != current_value: # noqa
setattr(config, key, current_value) # noqa

if explicit_values:
explicit_value = None
if key in explicit_values:
# allow None to be passed in explicit values
# so we are able to reset defaults like in regular function calls
Expand All @@ -211,14 +223,15 @@ def _resolve_config_fields(
# detect dlt.config and dlt.secrets and force injection
if isinstance(explicit_value, ConfigValueSentinel):
explicit_value = None
else:
if is_hint_not_resolvable(hint):
# for final fields default value is like explicit
explicit_value = default_value
else:
explicit_value = None

current_value = None
if is_hint_not_resolvable(hint):
# do not resolve not resolvable, but allow for explicit values to be passed
if not explicit_none:
current_value = default_value if explicit_value is None else explicit_value
traces = [LookupTrace("ExplicitValues", None, key, current_value)]
_set_field()
continue

# explicit none skips resolution
if not explicit_none:
# if hint is union of configurations, any of them must be resolved
Expand Down Expand Up @@ -276,16 +289,7 @@ def _resolve_config_fields(
# set the trace for explicit none
traces = [LookupTrace("ExplicitValues", None, key, None)]

# check if hint optional
is_optional = is_optional_type(hint)
# collect unresolved fields
if not is_optional and current_value is None:
unresolved_fields[key] = traces
# set resolved value in config
if default_value != current_value:
if not is_hint_not_resolvable(hint) or explicit_value is not None or explicit_none:
# ignore final types
setattr(config, key, current_value)
_set_field()

# Check for dynamic hint resolvers which have no corresponding fields
unmatched_hint_resolvers: List[str] = []
Expand Down
81 changes: 71 additions & 10 deletions dlt/destinations/impl/databricks/configuration.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import dataclasses
from typing import ClassVar, Final, Optional, Any, Dict, List

from dlt.common import logger
from dlt.common.typing import TSecretStrValue
from dlt.common.configuration.specs.base_configuration import CredentialsConfiguration, configspec
from dlt.common.destination.client import DestinationClientDwhWithStagingConfiguration
from dlt.common.configuration.exceptions import ConfigurationValueError

from dlt.common.utils import digest128

DATABRICKS_APPLICATION_ID = "dltHub_dlt"


@configspec
class DatabricksCredentials(CredentialsConfiguration):
catalog: str = None
server_hostname: str = None
http_path: str = None
server_hostname: Optional[str] = None
http_path: Optional[str] = None
access_token: Optional[TSecretStrValue] = None
client_id: Optional[TSecretStrValue] = None
client_secret: Optional[TSecretStrValue] = None
Expand All @@ -37,10 +38,57 @@ class DatabricksCredentials(CredentialsConfiguration):

def on_resolved(self) -> None:
if not ((self.client_id and self.client_secret) or self.access_token):
raise ConfigurationValueError(
"No valid authentication method detected. Provide either 'client_id' and"
" 'client_secret' for OAuth, or 'access_token' for token-based authentication."
)
try:
# attempt context authentication
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
self.access_token = w.dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None) # type: ignore[union-attr]
except Exception:
self.access_token = None

try:
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
self.access_token = w.config.authenticate # type: ignore[assignment]
logger.info(f"Will attempt to use default auth of type {w.config.auth_type}")
except Exception:
pass

if not self.access_token:
raise ConfigurationValueError(
"Authentication failed: No valid authentication method detected. "
"Provide either 'client_id' and 'client_secret' for OAuth authentication, "
"or 'access_token' for token-based authentication."
)

if not self.server_hostname or not self.http_path:
try:
# attempt to fetch warehouse details
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
# warehouse ID may be present in an env variable
if w.config.warehouse_id:
warehouse = w.warehouses.get(w.config.warehouse_id)
else:
# for some reason list of warehouses has different type than a single one 🤯
warehouse = list(w.warehouses.list())[0] # type: ignore[assignment]
logger.info(
f"Will attempt to use warehouse {warehouse.id} to get sql connection params"
)
self.server_hostname = self.server_hostname or warehouse.odbc_params.hostname
self.http_path = self.http_path or warehouse.odbc_params.path
except Exception:
pass

for param in ("catalog", "server_hostname", "http_path"):
if not getattr(self, param):
raise ConfigurationValueError(
f"Configuration error: Missing required parameter '{param}'. "
"Please provide it in the configuration."
)

def to_connector_params(self) -> Dict[str, Any]:
conn_params = dict(
Expand All @@ -60,6 +108,9 @@ def to_connector_params(self) -> Dict[str, Any]:

return conn_params

def __str__(self) -> str:
return f"databricks://{self.server_hostname}{self.http_path}/{self.catalog}"


@configspec
class DatabricksClientConfiguration(DestinationClientDwhWithStagingConfiguration):
Expand All @@ -69,10 +120,20 @@ class DatabricksClientConfiguration(DestinationClientDwhWithStagingConfiguration
"If set, credentials with given name will be used in copy command"
is_staging_external_location: bool = False
"""If true, the temporary credentials are not propagated to the COPY command"""
staging_volume_name: Optional[str] = None
"""Name of the Databricks managed volume for temporary storage, e.g., <catalog_name>.<database_name>.<volume_name>. Defaults to '_dlt_temp_load_volume' if not set."""
keep_staged_files: Optional[bool] = True
"""Tells if to keep the files in internal (volume) stage"""

def __str__(self) -> str:
"""Return displayable destination location"""
if self.staging_config:
return str(self.staging_config.credentials)
if self.credentials:
return str(self.credentials)
else:
return "[no staging set]"
return ""

def fingerprint(self) -> str:
"""Returns a fingerprint of host part of a connection string"""
if self.credentials and self.credentials.server_hostname:
return digest128(self.credentials.server_hostname)
return ""
Loading

0 comments on commit e9bbeef

Please sign in to comment.