Skip to content

Commit

Permalink
[CHORE] refactor review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav7261 committed Apr 26, 2024
1 parent f406db4 commit 805544c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 35 deletions.
7 changes: 3 additions & 4 deletions airflow/providers/databricks/operators/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> None:

if run_state.result_state == "FAILED":
task_run_id = None
if "tasks" in run_info:
for task in run_info["tasks"]:
if task.get("state", {}).get("result_state", "") == "FAILED":
task_run_id = task["run_id"]
for task in run_info.get("tasks", []):
if task.get("state", {}).get("result_state", "") == "FAILED":
task_run_id = task["run_id"]
if task_run_id is not None:
run_output = hook.get_run_output(task_run_id)
if "error" in run_output:
Expand Down
62 changes: 31 additions & 31 deletions airflow/providers/databricks/triggers/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,37 +85,37 @@ async def run(self):
while True:
run_state = await self.hook.a_get_run_state(self.run_id)
notebook_error = None
if run_state.is_terminal:
if run_state.result_state == "FAILED":
run_info = await self.hook.a_get_run(self.run_id)
task_run_id = None
if "tasks" in run_info:
for task in run_info["tasks"]:
if task.get("state", {}).get("result_state", "") == "FAILED":
task_run_id = task["run_id"]
if task_run_id is not None:
run_output = await self.hook.a_get_run_output(task_run_id)
if "error" in run_output:
notebook_error = run_output["error"]
else:
notebook_error = run_state.state_message
else:
notebook_error = run_state.state_message
yield TriggerEvent(
{
"run_id": self.run_id,
"run_page_url": self.run_page_url,
"run_state": run_state.to_json(),
"repair_run": self.repair_run,
"notebook_error": notebook_error,
}
if not run_state.is_terminal:
self.log.info(
"run-id %s in run state %s. sleeping for %s seconds",
self.run_id,
run_state,
self.polling_period_seconds,
)
return
await asyncio.sleep(self.polling_period_seconds)
continue

self.log.info(
"run-id %s in run state %s. sleeping for %s seconds",
self.run_id,
run_state,
self.polling_period_seconds,
if run_state.result_state == "FAILED":
run_info = await self.hook.a_get_run(self.run_id)
task_run_id = None
for task in run_info.get("tasks", []):
if task.get("state", {}).get("result_state", "") == "FAILED":
task_run_id = task["run_id"]
if task_run_id is not None:
run_output = await self.hook.a_get_run_output(task_run_id)
if "error" in run_output:
notebook_error = run_output["error"]
else:
notebook_error = run_state.state_message
else:
notebook_error = run_state.state_message
yield TriggerEvent(
{
"run_id": self.run_id,
"run_page_url": self.run_page_url,
"run_state": run_state.to_json(),
"repair_run": self.repair_run,
"notebook_error": notebook_error,
}
)
await asyncio.sleep(self.polling_period_seconds)
return

0 comments on commit 805544c

Please sign in to comment.