Skip to content

Commit

Permalink
python-venv-operator: termination log in alert
Browse files Browse the repository at this point in the history
  • Loading branch information
karunpoudel committed Jun 7, 2023
1 parent 471fdac commit 1e75b69
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
7 changes: 7 additions & 0 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ def _execute_python_callable_in_subprocess(self, python_path: Path, tmp_dir: Pat
output_path = tmp_dir / "script.out"
string_args_path = tmp_dir / "string_args.txt"
script_path = tmp_dir / "script.py"
termination_log_path = tmp_dir / "termination.log"
self._write_args(input_path)
self._write_string_args(string_args_path)
write_python_script(
Expand All @@ -423,11 +424,17 @@ def _execute_python_callable_in_subprocess(self, python_path: Path, tmp_dir: Pat
os.fspath(input_path),
os.fspath(output_path),
os.fspath(string_args_path),
os.fspath(termination_log_path),
]
)
except subprocess.CalledProcessError as e:
if e.returncode in self.skip_on_exit_code:
raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.")
elif termination_log_path.exists() and termination_log_path.stat().st_size > 0:
error_msg = f"Process returned non-zero exit status {e.returncode}.\n"
with open(termination_log_path) as file:
error_msg += file.read()
raise AirflowException(error_msg) from None
else:
raise

Expand Down
7 changes: 6 additions & 1 deletion airflow/utils/python_virtualenv_script.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ with open(sys.argv[3], "r") as file:

# Script
{{ python_callable_source }}
res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
try:
res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
except Exception as e:
with open(sys.argv[4], "w") as file:
file.write(str(e))
raise

# Write output
with open(sys.argv[2], "wb") as file:
Expand Down
8 changes: 8 additions & 0 deletions tests/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,14 @@ def f():
with pytest.raises(CalledProcessError):
self.run_as_task(f)

def test_fail_with_message(self):
def f():
raise Exception("Custom error message")

with pytest.raises(AirflowException) as e:
self.run_as_task(f)
assert "Custom error message" in str(e)

def test_string_args(self):
def f():
global virtualenv_string_args
Expand Down

0 comments on commit 1e75b69

Please sign in to comment.