Skip to content

Commit

Permalink
trying to merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db committed Dec 5, 2024
1 parent e0ee991 commit a13dd53
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
15 changes: 9 additions & 6 deletions dbt/adapters/databricks/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def _convert_to_sdk_types(self, job_settings: dict[str, Any]) -> dict[str, Any]:

def _get_exception(self, run: Run, run_id: int) -> None:
try:
run_id = utils.if_some(run.tasks, lambda x: x[0].run_id) or run_id
run_id = utils.if_some(run.tasks, lambda x: x[0].run_id if x else None) or run_id # type: ignore
output = self.wc.jobs.get_run_output(run_id)
raise DbtRuntimeError(
"Python model failed with traceback as:\n"
Expand All @@ -281,8 +281,8 @@ def _get_exception(self, run: Run, run_id: int) -> None:
if isinstance(e, DbtRuntimeError):
raise e
else:
result_state = utils.if_some(run.state, lambda s: s.result_state) or ""
state_message = utils.if_some(run.state, lambda s: s.state_message) or ""
result_state = utils.if_some(run.state, lambda s: s.result_state) or "" # type: ignore
state_message = utils.if_some(run.state, lambda s: s.state_message) or "" # type: ignore
raise DbtRuntimeError(
f"Python model run ended in state {result_state} "
f"with state_message\n{state_message}"
Expand Down Expand Up @@ -391,7 +391,10 @@ def poll_for_completion(self, pipeline_id: str) -> None:
if response.cause:
raise DbtRuntimeError(f"Pipeline {pipeline_id} failed: {response.cause}")
else:
latest_update = utils.if_some(response.latest_updates, lambda x: x[0])
latest_update = utils.if_some(
response.latest_updates,
lambda x: x[0] if x else None, # type: ignore
)
last_error = self.get_update_error(pipeline_id, latest_update)
raise DbtRuntimeError(f"Pipeline {pipeline_id} failed: {last_error}")

Expand All @@ -401,7 +404,7 @@ def get_update_error(self, pipeline_id: str, update_id: str) -> str:
e
for e in events
if e.event_type == "update_progress"
and utils.if_some(e.origin, lambda x: x.update_id == update_id)
and utils.if_some(e.origin, lambda x: x.update_id == update_id) # type: ignore
]

error_events = [e.error for e in update_events if e.error]
Expand All @@ -411,7 +414,7 @@ def get_update_error(self, pipeline_id: str, update_id: str) -> str:
msg = (
utils.if_some(
error_events[0].exceptions,
lambda x: "\n".join(map(lambda y: y.message or "", x)),
lambda x: "\n".join(map(lambda y: y.message or "", x)), # type: ignore
)
or ""
)
Expand Down
4 changes: 0 additions & 4 deletions dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from dbt.adapters.databricks.__version__ import version as __version__
from dbt.adapters.databricks.api_client import DatabricksApiClient
from dbt.adapters.databricks.credentials import (
BearerAuth,
DatabricksCredentialManager,
DatabricksCredentials,
)
Expand Down Expand Up @@ -392,9 +391,6 @@ def __init__(self, profile: AdapterRequiredConfig, mp_context: SpawnContext):

def cancel_open(self) -> list[str]:
cancelled = super().cancel_open()
creds = cast(DatabricksCredentials, self.profile.credentials)
assert self.credentials_manager
api_client = DatabricksApiClient.create(self.credentials_manager.api_client, creds, 15 * 60)
logger.info("Cancelling open python jobs")
PythonRunTracker.cancel_runs(self.api_client)
return cancelled
Expand Down

0 comments on commit a13dd53

Please sign in to comment.