[FEAT] raise exception with main notebook error in DatabricksRunNowDeferrableOperator #39110

Merged on May 1, 2024 (12 commits).
42 changes: 25 additions & 17 deletions airflow/providers/databricks/operators/databricks.py
@@ -69,19 +69,7 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> None:
                     return

                 if run_state.result_state == "FAILED":
-                    task_run_id = None
-                    if "tasks" in run_info:
-                        for task in run_info["tasks"]:
-                            if task.get("state", {}).get("result_state", "") == "FAILED":
-                                task_run_id = task["run_id"]
-                    if task_run_id is not None:
-                        run_output = hook.get_run_output(task_run_id)
-                        if "error" in run_output:
-                            notebook_error = run_output["error"]
-                        else:
-                            notebook_error = run_state.state_message
-                    else:
-                        notebook_error = run_state.state_message
+                    notebook_error = _get_databricks_notebook_error(run_info, hook, run_state)
                     error_message = (
                         f"{operator.task_id} failed with terminal state: {run_state} "
                         f"and with the error {notebook_error}"
@@ -156,17 +144,37 @@ def _handle_deferrable_databricks_operator_execution(operator, hook, log, context) -> None:
                 log.info("%s completed successfully.", operator.task_id)


-def _handle_deferrable_databricks_operator_completion(event: dict, log: Logger) -> None:
+def _get_databricks_notebook_error(run_info: dict, hook: DatabricksHook, run_state: RunState) -> str:
+    task_run_id = None
+    if "tasks" in run_info:
+        for task in run_info["tasks"]:
+            if task.get("state", {}).get("result_state", "") == "FAILED":
+                task_run_id = task["run_id"]
+    if task_run_id is not None:
+        run_output = hook.get_run_output(task_run_id)
+        if "error" in run_output:
+            notebook_error = run_output["error"]
+        else:
+            notebook_error = run_state.state_message
+    else:
+        notebook_error = run_state.state_message
+    return notebook_error
+
+
+def _handle_deferrable_databricks_operator_completion(event: dict, log: Logger, hook: DatabricksHook) -> None:
     validate_trigger_event(event)
     run_state = RunState.from_json(event["run_state"])
     run_page_url = event["run_page_url"]
+    run_id = event["run_id"]
     log.info("View run status, Spark UI, and logs at %s", run_page_url)

     if run_state.is_successful:
         log.info("Job run completed successfully.")
         return
+    run_info = hook.get_run(run_id)
+    notebook_error = _get_databricks_notebook_error(run_info, hook, run_state)
+    error_message = f"Job run failed with terminal state: {run_state} and with the error {notebook_error}"

-    error_message = f"Job run failed with terminal state: {run_state}"
     if event["repair_run"]:
         log.warning(
             "%s but since repair run is set, repairing the run with all failed tasks",
@@ -573,7 +581,7 @@ def on_kill(self):
             self.log.error("Error: Task: %s with invalid run_id was requested to be cancelled.", self.task_id)

     def execute_complete(self, context: dict | None, event: dict):
-        _handle_deferrable_databricks_operator_completion(event, self.log)
+        _handle_deferrable_databricks_operator_completion(event, self.log, self._hook)


 @deprecated(
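
One way to verify the new behaviour is to exercise the helper with a mocked hook. The following is a minimal sketch, not a test from this PR; the payloads and the test name are assumptions, and only the keys the helper actually reads are populated:

from unittest import mock

from airflow.providers.databricks.operators.databricks import _get_databricks_notebook_error


def test_notebook_error_is_taken_from_failed_task_output():
    # run_info contains one failed task; the helper should fetch that task's run output.
    run_info = {"tasks": [{"run_id": 2002, "state": {"result_state": "FAILED"}}]}
    hook = mock.MagicMock()
    hook.get_run_output.return_value = {"error": "NameError: name 'spark_df' is not defined"}
    # Only state_message is read from run_state, so a simple stub is enough here.
    run_state = mock.MagicMock(state_message="Task failed")

    assert _get_databricks_notebook_error(run_info, hook, run_state) == (
        "NameError: name 'spark_df' is not defined"
    )
    hook.get_run_output.assert_called_once_with(2002)
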
@@ -850,7 +858,7 @@ def execute(self, context: Context):

     def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
         if event:
-            _handle_deferrable_databricks_operator_completion(event, self.log)
+            _handle_deferrable_databricks_operator_completion(event, self.log, self._hook)
             if event["repair_run"]:
                 self.repair_run = False
                 self.run_id = event["run_id"]
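
For DAG authors, the net effect of this change is that a failed deferrable run now raises with the notebook's own error message instead of only the terminal run state. A minimal usage sketch follows; the job id, connection id, and DAG settings are placeholders, not part of this change:

import pendulum

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

with DAG(
    dag_id="databricks_run_now_deferrable_example",  # hypothetical DAG id
    start_date=pendulum.datetime(2024, 5, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    run_job = DatabricksRunNowOperator(
        task_id="run_notebook_job",
        databricks_conn_id="databricks_default",  # placeholder connection
        job_id=123,  # placeholder job id
        deferrable=True,
        # On failure, the exception raised in execute_complete now includes the
        # notebook's error message surfaced via get_run_output.
    )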