diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 802b07ef36ea..38b1b9dda0fb 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -36,7 +36,7 @@ use pyo3::prelude::{ PyResult as PyO3Result, Python, }; use pyo3::types::{PyBytes, PyDict, PyList, PyTuple, PyType}; -use pyo3::{create_exception, IntoPy, PyAny}; +use pyo3::{create_exception, IntoPy, PyAny, PyRef}; use regex::Regex; use rule_graph::{self, RuleGraph}; use task_executor::Executor; @@ -839,36 +839,50 @@ async fn workunits_to_py_tuple_value( #[pyfunction] fn session_poll_workunits( - py: Python, - py_scheduler: &PyScheduler, - py_session: &PySession, + py_scheduler: PyObject, + py_session: PyObject, max_log_verbosity_level: u64, ) -> PyO3Result { - let py_level: PythonLogLevel = max_log_verbosity_level - .try_into() - .map_err(|e| PyException::new_err(format!("{}", e)))?; - let core = &py_scheduler.0.core; - core.executor.enter(|| { - let (started, completed) = py.allow_threads(|| { - py_session - .0 - .workunit_store() - .latest_workunits(py_level.into()) - }); + // TODO: Black magic. PyObject is not marked UnwindSafe, and contains an UnsafeCell. Since PyO3 + // only allows us to receive `pyfunction` arguments as `PyObject` (or references under a held + // GIL), we cannot do what it does to use `catch_unwind` which would be interacting with + // `catch_unwind` while the object is still a raw pointer, and unchecked. + // + // Instead, we wrap the call, and assert that it is safe. It really might not be though. So this + // code should only live long enough to shake out the current issue, and an upstream issue with + // PyO3 will be the long term solution. + let py_scheduler = std::panic::AssertUnwindSafe(py_scheduler); + let py_session = std::panic::AssertUnwindSafe(py_session); + std::panic::catch_unwind(|| { + let (core, session, py_level) = { + let gil = Python::acquire_gil(); + let py = gil.python(); + + let py_scheduler = py_scheduler.extract::>(py)?; + let py_session = py_session.extract::>(py)?; + let py_level: PythonLogLevel = max_log_verbosity_level + .try_into() + .map_err(|e| PyException::new_err(format!("{}", e)))?; + (py_scheduler.0.core.clone(), py_session.0.clone(), py_level) + }; + core.executor.enter(|| { + let (started, completed) = session.workunit_store().latest_workunits(py_level.into()); - let started_val = core.executor.block_on(workunits_to_py_tuple_value( - py, - started, - core, - &py_session.0, - ))?; - let completed_val = core.executor.block_on(workunits_to_py_tuple_value( - py, - completed, - core, - &py_session.0, - ))?; - Ok(externs::store_tuple(py, vec![started_val, completed_val]).into()) + let gil = Python::acquire_gil(); + let py = gil.python(); + + let started_val = core + .executor + .block_on(workunits_to_py_tuple_value(py, started, &core, &session))?; + let completed_val = core + .executor + .block_on(workunits_to_py_tuple_value(py, completed, &core, &session))?; + Ok(externs::store_tuple(py, vec![started_val, completed_val]).into()) + }) + }) + .unwrap_or_else(|e| { + log::warn!("Panic in `session_poll_workunits`: {:?}", e); + std::panic::resume_unwind(e); }) }