Skip to content

Commit

Permalink
Revert to use inline params to keep support for 13.x cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Jun 5, 2024
1 parent 3e6e4ee commit e3c5d68
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions dlt/destinations/impl/databricks/sql_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from contextlib import contextmanager, suppress
from typing import Any, AnyStr, ClassVar, Iterator, Optional, Sequence, List, Union, Dict

import pendulum

from databricks import sql as databricks_lib
from databricks.sql.client import (
Connection as DatabricksSqlConnection,
Expand Down Expand Up @@ -38,7 +38,9 @@ def __init__(self, dataset_name: str, credentials: DatabricksCredentials) -> Non

def open_connection(self) -> DatabricksSqlConnection:
conn_params = self.credentials.to_connector_params()
self._conn = databricks_lib.connect(**conn_params, schema=self.dataset_name)
self._conn = databricks_lib.connect(
**conn_params, schema=self.dataset_name, use_inline_params="silent"
)
return self._conn

@raise_open_connection_error
Expand Down Expand Up @@ -89,21 +91,25 @@ def execute_sql(
@raise_database_error
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
curr: DBApiCursor = None
db_args: Optional[Dict[str, Any]]
if args:
keys = [f"arg{i}" for i in range(len(args))]
# Replace position arguments (%s) with named arguments (:arg0, :arg1, ...)
query = query % tuple(f":{key}" for key in keys)
db_args = {}
for key, db_arg in zip(keys, args):
# Databricks connector doesn't accept pendulum objects
if isinstance(db_arg, pendulum.DateTime):
db_arg = to_py_datetime(db_arg)
elif isinstance(db_arg, pendulum.Date):
db_arg = to_py_date(db_arg)
db_args[key] = db_arg
else:
db_args = kwargs or None
# TODO: Inline param support will be dropped in future databricks driver, switch to :named paramstyle
# This will drop support for cluster runtime v13.x
# db_args: Optional[Dict[str, Any]]
# if args:
# keys = [f"arg{i}" for i in range(len(args))]
# # Replace position arguments (%s) with named arguments (:arg0, :arg1, ...)
# query = query % tuple(f":{key}" for key in keys)
# db_args = {}
# for key, db_arg in zip(keys, args):
# # Databricks connector doesn't accept pendulum objects
# if isinstance(db_arg, pendulum.DateTime):
# db_arg = to_py_datetime(db_arg)
# elif isinstance(db_arg, pendulum.Date):
# db_arg = to_py_date(db_arg)
# db_args[key] = db_arg
# else:
# db_args = kwargs or None

db_args = args or kwargs or None
with self._conn.cursor() as curr:
curr.execute(query, db_args)
yield DBApiCursorImpl(curr) # type: ignore[abstract]
Expand Down

0 comments on commit e3c5d68

Please sign in to comment.