diff --git a/src/python/pants/bin/daemon_pants_runner.py b/src/python/pants/bin/daemon_pants_runner.py index 70a132e915d..35a4ed40248 100644 --- a/src/python/pants/bin/daemon_pants_runner.py +++ b/src/python/pants/bin/daemon_pants_runner.py @@ -142,7 +142,6 @@ def __call__( command: str, args: Tuple[str, ...], env: Dict[str, str], - working_directory: bytes, cancellation_latch: PySessionCancellationLatch, stdin_fileno: int, stdout_fileno: int, diff --git a/src/python/pants/engine/fs_test.py b/src/python/pants/engine/fs_test.py index ae57d6b8e9d..db23103e039 100644 --- a/src/python/pants/engine/fs_test.py +++ b/src/python/pants/engine/fs_test.py @@ -40,8 +40,6 @@ Workspace, ) from pants.engine.goal import Goal, GoalSubsystem -from pants.engine.internals.native_engine_pyo3 import PyDigest as DigestPyO3 -from pants.engine.internals.native_engine_pyo3 import PySnapshot as SnapshotPyO3 from pants.engine.internals.scheduler import ExecutionError from pants.engine.rules import Get, goal_rule, rule from pants.testutil.rule_runner import QueryRule, RuleRunner @@ -1093,51 +1091,41 @@ def is_changed_snapshot() -> bool: # ----------------------------------------------------------------------------------------------- -@pytest.mark.parametrize("cls", [Digest, DigestPyO3]) -def test_digest_properties(cls) -> None: - digest = cls("a" * 64, 1000) +def test_digest_properties() -> None: + digest = Digest("a" * 64, 1000) assert digest.fingerprint == "a" * 64 assert digest.serialized_bytes_length == 1000 -@pytest.mark.parametrize("cls", [Digest, DigestPyO3]) -def test_digest_repr(cls) -> None: - assert str(cls("a" * 64, 1)) == f"Digest({repr('a' * 64)}, 1)" +def test_digest_repr() -> None: + assert str(Digest("a" * 64, 1)) == f"Digest({repr('a' * 64)}, 1)" -@pytest.mark.parametrize("cls", [Digest, DigestPyO3]) -def test_digest_hash(cls) -> None: - assert hash(cls("a" * 64, 1)) == -6148914691236517206 - assert hash(cls("b" * 64, 1)) == -4919131752989213765 +def test_digest_hash() -> None: + assert hash(Digest("a" * 64, 1)) == -6148914691236517206 + assert hash(Digest("b" * 64, 1)) == -4919131752989213765 # Note that the size bytes is not considered in the hash. - assert hash(cls("a" * 64, 1000)) == -6148914691236517206 + assert hash(Digest("a" * 64, 1000)) == -6148914691236517206 -@pytest.mark.parametrize("cls", [Digest, DigestPyO3]) -def test_digest_equality(cls) -> None: - digest = cls("a" * 64, 1) - assert digest == cls("a" * 64, 1) - assert digest != cls("a" * 64, 1000) - assert digest != cls("0" * 64, 1) +def test_digest_equality() -> None: + digest = Digest("a" * 64, 1) + assert digest == Digest("a" * 64, 1) + assert digest != Digest("a" * 64, 1000) + assert digest != Digest("0" * 64, 1) with pytest.raises(TypeError): - digest < digest + digest < digest # type: ignore[operator] -@pytest.mark.parametrize( - "snapshot_cls,digest_cls", [(Snapshot, Digest), (SnapshotPyO3, DigestPyO3)] -) -def test_snapshot_properties(snapshot_cls, digest_cls) -> None: - digest = digest_cls("a" * 64, 1000) - snapshot = snapshot_cls._create_for_testing(digest, ["f.ext", "dir/f.ext"], ["dir"]) +def test_snapshot_properties() -> None: + digest = Digest("a" * 64, 1000) + snapshot = Snapshot._create_for_testing(digest, ["f.ext", "dir/f.ext"], ["dir"]) assert snapshot.digest == digest assert snapshot.files == ("f.ext", "dir/f.ext") assert snapshot.dirs == ("dir",) -@pytest.mark.parametrize( - "snapshot_cls,digest_cls", [(Snapshot, Digest), (SnapshotPyO3, DigestPyO3)] -) -def test_snapshot_hash(snapshot_cls, digest_cls) -> None: +def test_snapshot_hash() -> None: def assert_hash( expected: int, *, @@ -1145,8 +1133,8 @@ def assert_hash( files: Optional[List[str]] = None, dirs: Optional[List[str]] = None, ) -> None: - digest = digest_cls(digest_char * 64, 1000) - snapshot = snapshot_cls._create_for_testing( + digest = Digest(digest_char * 64, 1000) + snapshot = Snapshot._create_for_testing( digest, files or ["f.ext", "dir/f.ext"], dirs or ["dir"] ) assert hash(snapshot) == expected @@ -1159,28 +1147,21 @@ def assert_hash( assert_hash(-4919131752989213765, digest_char="b") -@pytest.mark.parametrize( - "snapshot_cls,digest_cls", [(Snapshot, Digest), (SnapshotPyO3, DigestPyO3)] -) -def test_snapshot_equality(snapshot_cls, digest_cls) -> None: +def test_snapshot_equality() -> None: # Only the digest is used for equality. - snapshot = snapshot_cls._create_for_testing( - digest_cls("a" * 64, 1000), ["f.ext", "dir/f.ext"], ["dir"] - ) - assert snapshot == snapshot_cls._create_for_testing( - digest_cls("a" * 64, 1000), ["f.ext", "dir/f.ext"], ["dir"] - ) - assert snapshot == snapshot_cls._create_for_testing( - digest_cls("a" * 64, 1000), ["f.ext", "dir/f.ext"], ["foo"] + snapshot = Snapshot._create_for_testing(Digest("a" * 64, 1000), ["f.ext", "dir/f.ext"], ["dir"]) + assert snapshot == Snapshot._create_for_testing( + Digest("a" * 64, 1000), ["f.ext", "dir/f.ext"], ["dir"] ) - assert snapshot == snapshot_cls._create_for_testing( - digest_cls("a" * 64, 1000), ["f.ext"], ["dir"] + assert snapshot == Snapshot._create_for_testing( + Digest("a" * 64, 1000), ["f.ext", "dir/f.ext"], ["foo"] ) - assert snapshot != snapshot_cls._create_for_testing( - digest_cls("a" * 64, 0), ["f.ext", "dir/f.ext"], ["dir"] + assert snapshot == Snapshot._create_for_testing(Digest("a" * 64, 1000), ["f.ext"], ["dir"]) + assert snapshot != Snapshot._create_for_testing( + Digest("a" * 64, 0), ["f.ext", "dir/f.ext"], ["dir"] ) - assert snapshot != snapshot_cls._create_for_testing( - digest_cls("b" * 64, 1000), ["f.ext", "dir/f.ext"], ["dir"] + assert snapshot != Snapshot._create_for_testing( + Digest("b" * 64, 1000), ["f.ext", "dir/f.ext"], ["dir"] ) with pytest.raises(TypeError): - snapshot < snapshot + snapshot < snapshot # type: ignore[operator] diff --git a/src/python/pants/engine/internals/native_engine.pyi b/src/python/pants/engine/internals/native_engine.pyi index b3d4ec7cc04..eb40f876789 100644 --- a/src/python/pants/engine/internals/native_engine.pyi +++ b/src/python/pants/engine/internals/native_engine.pyi @@ -22,7 +22,6 @@ class RawFdRunner(Protocol): command: str, args: tuple[str, ...], env: dict[str, str], - working_directory: bytes, cancellation_latch: PySessionCancellationLatch, stdin_fileno: int, stdout_fileno: int, @@ -149,6 +148,9 @@ class PyDigest: def fingerprint(self) -> str: ... @property def serialized_bytes_length(self) -> int: ... + def __eq__(self, other: PyDigest | Any) -> bool: ... + def __hash__(self) -> int: ... + def __repr__(self) -> str: ... class PySnapshot: def __init__(self) -> None: ... @@ -162,6 +164,9 @@ class PySnapshot: def dirs(self) -> tuple[str, ...]: ... @property def files(self) -> tuple[str, ...]: ... + def __eq__(self, other: PySnapshot | Any) -> bool: ... + def __hash__(self) -> int: ... + def __repr__(self) -> str: ... class PyExecutionRequest: def __init__( @@ -184,7 +189,7 @@ class PyGeneratorResponseGetMulti: def __init__(self, gets: tuple[PyGeneratorResponseGet, ...]) -> None: ... class PyNailgunServer: - pass + def port(self) -> int: ... class PyRemotingOptions: def __init__(self, **kwargs: Any) -> None: ... diff --git a/src/python/pants/engine/internals/native_engine_pyo3.pyi b/src/python/pants/engine/internals/native_engine_pyo3.pyi index de2ef46dadd..3f67421da2b 100644 --- a/src/python/pants/engine/internals/native_engine_pyo3.pyi +++ b/src/python/pants/engine/internals/native_engine_pyo3.pyi @@ -3,8 +3,6 @@ from __future__ import annotations -from typing import Any, Sequence - from pants.engine.fs import PathGlobs # TODO: black and flake8 disagree about the content of this file: @@ -28,30 +26,6 @@ def default_cache_path() -> str: ... # cast to `tuple()` when not necessary. def match_path_globs(path_globs: PathGlobs, paths: tuple[str, ...]) -> str: ... -class PyDigest: - def __init__(self, fingerprint: str, serialized_bytes_length: int) -> None: ... - @property - def fingerprint(self) -> str: ... - @property - def serialized_bytes_length(self) -> int: ... - def __eq__(self, other: PyDigest | Any) -> bool: ... - def __hash__(self) -> int: ... - -class PySnapshot: - def __init__(self) -> None: ... - @classmethod - def _create_for_testing( - cls, digest: PyDigest, files: Sequence[str], dirs: Sequence[str] - ) -> PySnapshot: ... - @property - def digest(self) -> PyDigest: ... - @property - def dirs(self) -> tuple[str, ...]: ... - @property - def files(self) -> tuple[str, ...]: ... - def __eq__(self, other: PySnapshot | Any) -> bool: ... - def __hash__(self) -> int: ... - # ------------------------------------------------------------------------------ # Workunits # ------------------------------------------------------------------------------ diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py index bd91bb86202..88aaf06875c 100644 --- a/src/python/pants/engine/internals/scheduler.py +++ b/src/python/pants/engine/internals/scheduler.py @@ -469,12 +469,12 @@ def execute( states = [ Throw( - raw_root.result(), - python_traceback=raw_root.python_traceback(), - engine_traceback=raw_root.engine_traceback(), + raw_root.result, + python_traceback=raw_root.python_traceback, + engine_traceback=raw_root.engine_traceback, ) - if raw_root.is_throw() - else Return(raw_root.result()) + if raw_root.is_throw + else Return(raw_root.result) for raw_root in raw_roots ] diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index ce8cee2efed..ac4a9b9d114 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -412,17 +412,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" -[[package]] -name = "cpython" -version = "0.5.2" -source = "git+https://github.com/pantsbuild/rust-cpython?rev=46d7eff26a705384e41eb6f7b870cd3f5f14b3bc#46d7eff26a705384e41eb6f7b870cd3f5f14b3bc" -dependencies = [ - "libc", - "num-traits", - "paste 1.0.5", - "python3-sys", -] - [[package]] name = "crc32fast" version = "1.2.1" @@ -646,7 +635,6 @@ dependencies = [ "bytes", "cache", "concrete_time", - "cpython", "crossbeam-channel 0.4.4", "derivative", "double-checked-cell-async", @@ -672,6 +660,8 @@ dependencies = [ "petgraph 0.5.1", "process_execution", "protos", + "pyo3", + "pyo3-build-config 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.8.2", "regex", "reqwest", @@ -697,16 +687,12 @@ dependencies = [ name = "engine_pyo3" version = "0.0.1" dependencies = [ - "either", "fs", - "hashing", - "itertools 0.10.1", "mock", "nailgun", "parking_lot", "pyo3", - "pyo3-build-config", - "store", + "pyo3-build-config 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "task_executor", "workunit_store", ] @@ -2005,12 +1991,6 @@ dependencies = [ "proc-macro-hack", ] -[[package]] -name = "paste" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58" - [[package]] name = "paste-impl" version = "0.1.18" @@ -2365,15 +2345,14 @@ dependencies = [ [[package]] name = "pyo3" version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64664505ce285a59b8b7e940fbe54ad65b1758a0810eddc5bc26df6f6ec8c557" +source = "git+https://github.com/davidhewitt/pyo3.git?rev=51b6b947b82e61adad61524f9c58e3b1c89247a5#51b6b947b82e61adad61524f9c58e3b1c89247a5" dependencies = [ "cfg-if 1.0.0", "indoc", "libc", "parking_lot", - "paste 0.1.18", - "pyo3-build-config", + "paste", + "pyo3-build-config 0.15.0 (git+https://github.com/davidhewitt/pyo3.git?rev=51b6b947b82e61adad61524f9c58e3b1c89247a5)", "pyo3-macros", "unindent", ] @@ -2387,11 +2366,18 @@ dependencies = [ "once_cell", ] +[[package]] +name = "pyo3-build-config" +version = "0.15.0" +source = "git+https://github.com/davidhewitt/pyo3.git?rev=51b6b947b82e61adad61524f9c58e3b1c89247a5#51b6b947b82e61adad61524f9c58e3b1c89247a5" +dependencies = [ + "once_cell", +] + [[package]] name = "pyo3-macros" version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "244f21d0a3887a9c02018b94e3b78d693dc7eca5c56839b7796a499cc364deb4" +source = "git+https://github.com/davidhewitt/pyo3.git?rev=51b6b947b82e61adad61524f9c58e3b1c89247a5#51b6b947b82e61adad61524f9c58e3b1c89247a5" dependencies = [ "pyo3-macros-backend", "quote 1.0.10", @@ -2401,24 +2387,14 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d3d18ac41d05199bb82645d56e39f8c8b4909a0137c6f2640f03685b29f672" +source = "git+https://github.com/davidhewitt/pyo3.git?rev=51b6b947b82e61adad61524f9c58e3b1c89247a5#51b6b947b82e61adad61524f9c58e3b1c89247a5" dependencies = [ "proc-macro2 1.0.32", - "pyo3-build-config", + "pyo3-build-config 0.15.0 (git+https://github.com/davidhewitt/pyo3.git?rev=51b6b947b82e61adad61524f9c58e3b1c89247a5)", "quote 1.0.10", "syn 1.0.81", ] -[[package]] -name = "python3-sys" -version = "0.5.2" -source = "git+https://github.com/pantsbuild/rust-cpython?rev=46d7eff26a705384e41eb6f7b870cd3f5f14b3bc#46d7eff26a705384e41eb6f7b870cd3f5f14b3bc" -dependencies = [ - "libc", - "regex", -] - [[package]] name = "quick-error" version = "1.2.3" diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index 0a4da562558..5993f771a76 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -98,11 +98,11 @@ crate-type = ["cdylib"] [features] # NB: To actually load this crate from python, the `extension-module` feature must be enabled. But -# unfortunately, enabling `extension-module` causes tests linked against `cpython` to fail. We +# unfortunately, enabling `extension-module` causes tests linked against `pyo3` to fail. We # define a feature to enable that, but we do not enable it by default: someone building this module # in order to extract `libengine.so` should pass `cargo build .. --features=extension-module`. # see https://github.com/PyO3/pyo3/issues/340 -extension-module = ["cpython/extension-module"] +extension-module = ["pyo3/extension-module"] default = [] [dependencies] @@ -114,8 +114,6 @@ protos = { path = "protos" } bytes = "1.0" cache = { path = "cache" } concrete_time = { path = "concrete_time" } -# TODO: Go back to upstream once https://github.com/dgrunwald/rust-cpython/pull/261 lands. -cpython = { git = "https://github.com/pantsbuild/rust-cpython", rev = "46d7eff26a705384e41eb6f7b870cd3f5f14b3bc" } crossbeam-channel = "0.4" derivative = "2.2" double-checked-cell-async = "2.0" @@ -138,6 +136,8 @@ num_enum = "0.4" parking_lot = "0.11" petgraph = "0.5" process_execution = { path = "process_execution" } +# TODO: Switch back to official PyO3 once https://github.com/PyO3/pyo3/pull/1990 is released. +pyo3 = { git = "https://github.com/davidhewitt/pyo3.git", rev = "51b6b947b82e61adad61524f9c58e3b1c89247a5" } rand = "0.8" regex = "1" reqwest = { version = "0.11", default_features = false, features = ["stream", "rustls-tls"] } @@ -163,6 +163,9 @@ testutil = { path = "./testutil" } fs = { path = "./fs" } env_logger = "0.5.4" +[build-dependencies] +pyo3-build-config = "0.15" + [patch.crates-io] # TODO: Waiting for release after https://github.com/mitsuhiko/console/pull/93. console = { git = "https://github.com/mitsuhiko/console", rev = "3232775295149e6f0aace26d4cfe61013ed3e2d8" } diff --git a/src/rust/engine/build.rs b/src/rust/engine/build.rs index e2cda38aa65..1ada69e65cb 100644 --- a/src/rust/engine/build.rs +++ b/src/rust/engine/build.rs @@ -26,18 +26,9 @@ #![allow(clippy::mutex_atomic)] fn main() { + pyo3_build_config::add_extension_module_link_args(); + // NB: The native extension only works with the Python interpreter version it was built with // (e.g. Python 3.7 vs 3.8). println!("cargo:rerun-if-env-changed=PY"); - - if cfg!(target_os = "macos") { - // N.B. On OSX, we force weak linking by passing the param `-undefined dynamic_lookup` to - // the underlying linker. This avoids "missing symbol" errors for Python symbols - // (e.g. `_PyImport_ImportModule`) at build time when bundling the cpython sources. - // The missing symbols will instead by dynamically resolved in the address space of the parent - // binary (e.g. `python`) at runtime. We do this to avoid needing to link to libpython - // (which would constrain us to specific versions of Python). - println!("cargo:rustc-cdylib-link-arg=-undefined"); - println!("cargo:rustc-cdylib-link-arg=dynamic_lookup"); - } } diff --git a/src/rust/engine/engine_pyo3/Cargo.toml b/src/rust/engine/engine_pyo3/Cargo.toml index 855167074ea..e7d5c50e3e9 100644 --- a/src/rust/engine/engine_pyo3/Cargo.toml +++ b/src/rust/engine/engine_pyo3/Cargo.toml @@ -18,14 +18,11 @@ extension-module = ["pyo3/extension-module"] default = [] [dependencies] -either = "1.6" fs = { path = "../fs" } -hashing = { path = "../hashing" } -itertools = "0.10" nailgun = { path = "../nailgun" } parking_lot = "0.11" -pyo3 = "0.15" -store = { path = "../fs/store" } +# TODO: Switch back to official PyO3 once https://github.com/PyO3/pyo3/pull/1990 is released. +pyo3 = { git = "https://github.com/davidhewitt/pyo3.git", rev = "51b6b947b82e61adad61524f9c58e3b1c89247a5" } task_executor = { path = "../task_executor" } testutil_mock = { package = "mock", path = "../testutil/mock" } workunit_store = { path = "../workunit_store" } diff --git a/src/rust/engine/engine_pyo3/src/externs/interface/fs.rs b/src/rust/engine/engine_pyo3/src/externs/interface/fs.rs index 214d570546b..cd7d3dd65b9 100644 --- a/src/rust/engine/engine_pyo3/src/externs/interface/fs.rs +++ b/src/rust/engine/engine_pyo3/src/externs/interface/fs.rs @@ -3,23 +3,14 @@ use std::path::Path; -use pyo3::basic::CompareOp; use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::{PyString, PyTuple, PyType}; -use either::Either; -use fs::PathStat; use fs::{GlobExpansionConjunction, PathGlobs, PreparedPathGlobs, StrictGlobMatching}; -use hashing::{Digest, Fingerprint}; -use itertools::Itertools; -use store::Snapshot; pub(crate) fn register(m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(match_path_globs, m)?)?; m.add_function(wrap_pyfunction!(default_cache_path, m)?)?; - m.add_class::()?; - m.add_class::()?; Ok(()) } @@ -87,138 +78,6 @@ fn match_path_globs( }) } -// ----------------------------------------------------------------------------- -// Digest -// ----------------------------------------------------------------------------- - -#[pyclass] -#[derive(Clone)] -struct PyDigest(Digest); - -#[pymethods] -impl PyDigest { - #[new] - fn __new__(fingerprint: &str, serialized_bytes_length: usize) -> PyResult { - let fingerprint = Fingerprint::from_hex_string(fingerprint) - .map_err(|e| PyValueError::new_err(format!("Invalid digest hex: {}", e)))?; - Ok(Self(Digest::new(fingerprint, serialized_bytes_length))) - } - - fn __hash__(&self) -> u64 { - self.0.hash.prefix_hash() - } - - fn __repr__(&self) -> String { - format!("Digest('{}', {})", self.0.hash.to_hex(), self.0.size_bytes) - } - - fn __richcmp__(&self, other: &PyDigest, op: CompareOp, py: Python) -> PyObject { - match op { - CompareOp::Eq => (self.0 == other.0).into_py(py), - CompareOp::Ne => (self.0 != other.0).into_py(py), - _ => py.NotImplemented(), - } - } - - #[getter] - fn fingerprint(&self) -> String { - self.0.hash.to_hex() - } - - #[getter] - fn serialized_bytes_length(&self) -> usize { - self.0.size_bytes - } -} - -// ----------------------------------------------------------------------------- -// Snapshot -// ----------------------------------------------------------------------------- - -#[pyclass] -struct PySnapshot(Snapshot); - -#[pymethods] -impl PySnapshot { - #[new] - fn __new__() -> Self { - Self(Snapshot::empty()) - } - - #[classmethod] - fn _create_for_testing( - _cls: &PyType, - py_digest: PyDigest, - files: Vec, - dirs: Vec, - ) -> Self { - let snapshot = unsafe { Snapshot::create_for_testing_ffi(py_digest.0, files, dirs) }; - Self(snapshot) - } - - fn __hash__(&self) -> u64 { - self.0.digest.hash.prefix_hash() - } - - fn __repr__(&self) -> PyResult { - let (dirs, files): (Vec<_>, Vec<_>) = self.0.path_stats.iter().partition_map(|ps| match ps { - PathStat::Dir { path, .. } => Either::Left(path.to_string_lossy()), - PathStat::File { path, .. } => Either::Right(path.to_string_lossy()), - }); - - Ok(format!( - "Snapshot(digest=({}, {}), dirs=({}), files=({}))", - self.0.digest.hash.to_hex(), - self.0.digest.size_bytes, - dirs.join(","), - files.join(",") - )) - } - - fn __richcmp__(&self, other: &PySnapshot, op: CompareOp, py: Python) -> PyObject { - match op { - CompareOp::Eq => (self.0.digest == other.0.digest).into_py(py), - CompareOp::Ne => (self.0.digest != other.0.digest).into_py(py), - _ => py.NotImplemented(), - } - } - - #[getter] - fn digest(&self) -> PyDigest { - PyDigest(self.0.digest) - } - - #[getter] - fn files<'py>(&self, py: Python<'py>) -> &'py PyTuple { - let files = self - .0 - .path_stats - .iter() - .filter_map(|ps| match ps { - PathStat::File { path, .. } => path.to_str(), - _ => None, - }) - .map(|ps| PyString::new(py, ps)) - .collect::>(); - PyTuple::new(py, files) - } - - #[getter] - fn dirs<'py>(&self, py: Python<'py>) -> &'py PyTuple { - let dirs = self - .0 - .path_stats - .iter() - .filter_map(|ps| match ps { - PathStat::Dir { path, .. } => path.to_str(), - _ => None, - }) - .map(|ps| PyString::new(py, ps)) - .collect::>(); - PyTuple::new(py, dirs) - } -} - // ----------------------------------------------------------------------------- // Utils // ----------------------------------------------------------------------------- diff --git a/src/rust/engine/src/externs/engine_aware.rs b/src/rust/engine/src/externs/engine_aware.rs index 491ba185a39..a7420c90de1 100644 --- a/src/rust/engine/src/externs/engine_aware.rs +++ b/src/rust/engine/src/externs/engine_aware.rs @@ -7,7 +7,8 @@ use crate::nodes::{lift_directory_digest, lift_file_digest}; use crate::python::{TypeId, Value}; use crate::Types; -use cpython::{ObjectProtocol, PyDict, PyString, Python}; +use pyo3::prelude::*; +use pyo3::types::PyDict; use workunit_store::{ ArtifactOutput, Level, RunningWorkunit, UserMetadataItem, UserMetadataPyValue, }; @@ -25,12 +26,12 @@ pub(crate) struct EngineAwareReturnType { } impl EngineAwareReturnType { - pub(crate) fn from_task_result(py: Python, task_result: &Value, context: &Context) -> Self { + pub(crate) fn from_task_result(task_result: &PyAny, context: &Context) -> Self { Self { - level: Self::level(py, task_result), - message: Self::message(py, task_result), - artifacts: Self::artifacts(py, &context.core.types, task_result).unwrap_or_else(Vec::new), - metadata: metadata(py, context, task_result).unwrap_or_else(Vec::new), + level: Self::level(task_result), + message: Self::message(task_result), + artifacts: Self::artifacts(&context.core.types, task_result).unwrap_or_else(Vec::new), + metadata: metadata(context, task_result).unwrap_or_else(Vec::new), } } @@ -46,91 +47,83 @@ impl EngineAwareReturnType { }); } - fn level(py: Python, value: &Value) -> Option { - let level_val = externs::call_method0(py, value, "level").ok()?; - if level_val.is_none(py) { + fn level(obj: &PyAny) -> Option { + let level_val = obj.call_method0("level").ok()?; + if level_val.is_none() { return None; } - externs::val_to_log_level(&level_val).ok() + externs::val_to_log_level(level_val).ok() } - fn message(py: Python, value: &Value) -> Option { - let msg_val = externs::call_method0(py, value, "message").ok()?; - if msg_val.is_none(py) { + fn message(obj: &PyAny) -> Option { + let msg_val = obj.call_method0("message").ok()?; + if msg_val.is_none() { return None; } - msg_val.extract(py).ok() + msg_val.extract().ok() } - fn artifacts(py: Python, types: &Types, value: &Value) -> Option> { - let artifacts_val = externs::call_method0(py, value, "artifacts").ok()?; - if artifacts_val.is_none(py) { + fn artifacts(types: &Types, obj: &PyAny) -> Option> { + let artifacts_val = obj.call_method0("artifacts").ok()?; + if artifacts_val.is_none() { return None; } - let artifacts_dict: &PyDict = artifacts_val.cast_as::(py).ok()?; + let artifacts_dict = artifacts_val.cast_as::().ok()?; let mut output = Vec::new(); - for (key, value) in artifacts_dict.items(py).into_iter() { - let key_name: String = key.cast_as::(py).ok()?.to_string_lossy(py).into(); - - let artifact_output = if TypeId::new(&value.get_type(py)) == types.file_digest { - lift_file_digest(types, &value).map(ArtifactOutput::FileDigest) + for kv_pair in artifacts_dict.items().into_iter() { + let (key, value): (String, &PyAny) = kv_pair.extract().ok()?; + let artifact_output = if TypeId::new(value.get_type()) == types.file_digest { + lift_file_digest(types, value).map(ArtifactOutput::FileDigest) } else { - let digest_value = value.getattr(py, "digest").ok()?; - lift_directory_digest(&digest_value).map(ArtifactOutput::Snapshot) + let digest_value = value.getattr("digest").ok()?; + lift_directory_digest(digest_value).map(ArtifactOutput::Snapshot) } .ok()?; - output.push((key_name, artifact_output)); + output.push((key, artifact_output)); } Some(output) } - pub(crate) fn is_cacheable(py: Python, value: &Value) -> Option { - externs::call_method0(py, value, "cacheable") - .ok()? - .extract(py) - .ok() + pub(crate) fn is_cacheable(obj: &PyAny) -> Option { + obj.call_method0("cacheable").ok()?.extract().ok() } } pub struct EngineAwareParameter; impl EngineAwareParameter { - pub fn debug_hint(py: Python, value: &Value) -> Option { - let hint = externs::call_method0(py, value, "debug_hint").ok()?; - if hint.is_none(py) { + pub fn debug_hint(obj: &PyAny) -> Option { + let hint = obj.call_method0("debug_hint").ok()?; + if hint.is_none() { return None; } - hint.extract(py).ok() + hint.extract().ok() } - pub fn metadata(py: Python, context: &Context, value: &Value) -> Vec<(String, UserMetadataItem)> { - metadata(py, context, value).unwrap_or_else(Vec::new) + pub fn metadata(context: &Context, obj: &PyAny) -> Vec<(String, UserMetadataItem)> { + metadata(context, obj).unwrap_or_else(Vec::new) } } -fn metadata( - py: Python, - context: &Context, - value: &Value, -) -> Option> { - let metadata_val = externs::call_method0(py, value, "metadata").ok()?; - if metadata_val.is_none(py) { +fn metadata(context: &Context, obj: &PyAny) -> Option> { + let metadata_val = obj.call_method0("metadata").ok()?; + if metadata_val.is_none() { return None; } let mut output = Vec::new(); - let metadata_dict: &PyDict = metadata_val.cast_as::(py).ok()?; + let metadata_dict = metadata_val.cast_as::().ok()?; - for (key, value) in metadata_dict.items(py).into_iter() { - let key_name: String = key.extract(py).ok()?; + for kv_pair in metadata_dict.items().into_iter() { + let (key, value): (String, &PyAny) = kv_pair.extract().ok()?; let py_value_handle = UserMetadataPyValue::new(); let umi = UserMetadataItem::PyValue(py_value_handle.clone()); context.session.with_metadata_map(|map| { - map.insert(py_value_handle.clone(), value.into()); + map.insert(py_value_handle.clone(), Value::new(value.into_py(obj.py()))); }); - output.push((key_name, umi)); + output.push((key, umi)); } Some(output) } diff --git a/src/rust/engine/src/externs/fs.rs b/src/rust/engine/src/externs/fs.rs index 772f719f8aa..89e78dd6fa6 100644 --- a/src/rust/engine/src/externs/fs.rs +++ b/src/rust/engine/src/externs/fs.rs @@ -1,158 +1,145 @@ // Copyright 2020 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -// File-specific allowances to silence internal warnings of `py_class!`. -#![allow( - unused_braces, - clippy::manual_strip, - clippy::used_underscore_binding, - clippy::transmute_ptr_to_ptr, - clippy::zero_ptr -)] - -use std::borrow::Cow; - -use cpython::{ - exc, py_class, CompareOp, PyErr, PyObject, PyResult, PyString, PyTuple, Python, PythonObject, - ToPyObject, -}; use either::Either; +use itertools::Itertools; +use pyo3::basic::CompareOp; +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; +use pyo3::types::{PyString, PyTuple, PyType}; + use fs::PathStat; use hashing::{Digest, Fingerprint}; -use itertools::Itertools; use store::Snapshot; -/// -/// Data members and `create_instance` methods are module-private by default, so we expose them -/// with public top-level functions. -/// -/// TODO: See https://github.com/dgrunwald/rust-cpython/issues/242 -/// - -pub fn to_py_digest(py: Python, digest: Digest) -> PyResult { - PyDigest::create_instance(py, digest) -} - -pub fn from_py_digest(digest: &PyObject) -> PyResult { - let gil = Python::acquire_gil(); - let py = gil.python(); - let py_digest = digest.extract::(py)?; - Ok(*py_digest.digest(py)) -} - -py_class!(pub class PyDigest |py| { - data digest: Digest; - def __new__(_cls, fingerprint: Cow, serialized_bytes_length: usize) -> PyResult { - let fingerprint = Fingerprint::from_hex_string(&fingerprint) - .map_err(|e| { - PyErr::new::(py, format!("Invalid digest hex: {}", e)) - })?; - Self::create_instance(py, Digest::new(fingerprint, serialized_bytes_length)) - } - - @property def fingerprint(&self) -> PyResult { - Ok(self.digest(py).hash.to_hex()) - } - - @property def serialized_bytes_length(&self) -> PyResult { - Ok(self.digest(py).size_bytes) - } - - def __richcmp__(&self, other: PyDigest, op: CompareOp) -> PyResult { - match op { - CompareOp::Eq => { - let res = self.digest(py) == other.digest(py); - Ok(res.to_py_object(py).into_object()) - }, - CompareOp::Ne => { - let res = self.digest(py) != other.digest(py); - Ok(res.to_py_object(py).into_object()) - } - _ => Ok(py.NotImplemented()), - } - } - - def __hash__(&self) -> PyResult { - Ok(self.digest(py).hash.prefix_hash()) +// ----------------------------------------------------------------------------- +// Digest +// ----------------------------------------------------------------------------- + +#[pyclass] +#[derive(Clone)] +pub struct PyDigest(pub Digest); + +#[pymethods] +impl PyDigest { + #[new] + fn __new__(fingerprint: &str, serialized_bytes_length: usize) -> PyResult { + let fingerprint = Fingerprint::from_hex_string(fingerprint) + .map_err(|e| PyValueError::new_err(format!("Invalid digest hex: {}", e)))?; + Ok(Self(Digest::new(fingerprint, serialized_bytes_length))) + } + + fn __hash__(&self) -> u64 { + self.0.hash.prefix_hash() + } + + fn __repr__(&self) -> String { + format!("Digest('{}', {})", self.0.hash.to_hex(), self.0.size_bytes) + } + + fn __richcmp__(&self, other: &PyDigest, op: CompareOp, py: Python) -> PyObject { + match op { + CompareOp::Eq => (self.0 == other.0).into_py(py), + CompareOp::Ne => (self.0 != other.0).into_py(py), + _ => py.NotImplemented(), } + } - def __repr__(&self) -> PyResult { - Ok(format!("Digest('{}', {})", self.digest(py).hash.to_hex(), self.digest(py).size_bytes)) - } -}); + #[getter] + fn fingerprint(&self) -> String { + self.0.hash.to_hex() + } -pub fn to_py_snapshot(py: Python, snapshot: Snapshot) -> PyResult { - PySnapshot::create_instance(py, snapshot) + #[getter] + fn serialized_bytes_length(&self) -> usize { + self.0.size_bytes + } } -py_class!(pub class PySnapshot |py| { - data snapshot: Snapshot; - def __new__(_cls) -> PyResult { - Self::create_instance(py, Snapshot::empty()) - } - - @classmethod def _create_for_testing( - _cls, - py_digest: PyDigest, - files: Vec, - dirs: Vec, - ) -> PyResult { - let snapshot = unsafe { - Snapshot::create_for_testing_ffi(*py_digest.digest(py), files, dirs) - }; - Self::create_instance(py, snapshot) - } - - @property def digest(&self) -> PyResult { - to_py_digest(py, self.snapshot(py).digest) +// ----------------------------------------------------------------------------- +// Snapshot +// ----------------------------------------------------------------------------- + +#[pyclass] +pub struct PySnapshot(pub Snapshot); + +#[pymethods] +impl PySnapshot { + #[new] + fn __new__() -> Self { + Self(Snapshot::empty()) + } + + #[classmethod] + fn _create_for_testing( + _cls: &PyType, + py_digest: PyDigest, + files: Vec, + dirs: Vec, + ) -> Self { + let snapshot = unsafe { Snapshot::create_for_testing_ffi(py_digest.0, files, dirs) }; + Self(snapshot) + } + + fn __hash__(&self) -> u64 { + self.0.digest.hash.prefix_hash() + } + + fn __repr__(&self) -> PyResult { + let (dirs, files): (Vec<_>, Vec<_>) = self.0.path_stats.iter().partition_map(|ps| match ps { + PathStat::Dir { path, .. } => Either::Left(path.to_string_lossy()), + PathStat::File { path, .. } => Either::Right(path.to_string_lossy()), + }); + + Ok(format!( + "Snapshot(digest=({}, {}), dirs=({}), files=({}))", + self.0.digest.hash.to_hex(), + self.0.digest.size_bytes, + dirs.join(","), + files.join(",") + )) + } + + fn __richcmp__(&self, other: &PySnapshot, op: CompareOp, py: Python) -> PyObject { + match op { + CompareOp::Eq => (self.0.digest == other.0.digest).into_py(py), + CompareOp::Ne => (self.0.digest != other.0.digest).into_py(py), + _ => py.NotImplemented(), } - - @property def files(&self) -> PyResult { - let files = self.snapshot(py).path_stats.iter().filter_map(|ps| match ps { + } + + #[getter] + fn digest(&self) -> PyDigest { + PyDigest(self.0.digest) + } + + #[getter] + fn files<'py>(&self, py: Python<'py>) -> &'py PyTuple { + let files = self + .0 + .path_stats + .iter() + .filter_map(|ps| match ps { PathStat::File { path, .. } => path.to_str(), _ => None, - }).map(|ps| PyString::new(py, ps).into_object()).collect::>(); - Ok(PyTuple::new(py, &files)) - } - - @property def dirs(&self) -> PyResult { - let dirs = self.snapshot(py).path_stats.iter().filter_map(|ps| match ps { + }) + .map(|ps| PyString::new(py, ps)) + .collect::>(); + PyTuple::new(py, files) + } + + #[getter] + fn dirs<'py>(&self, py: Python<'py>) -> &'py PyTuple { + let dirs = self + .0 + .path_stats + .iter() + .filter_map(|ps| match ps { PathStat::Dir { path, .. } => path.to_str(), _ => None, - }).map(|ps| PyString::new(py, ps).into_object()).collect::>(); - Ok(PyTuple::new(py, &dirs)) - } - - def __richcmp__(&self, other: PySnapshot, op: CompareOp) -> PyResult { - match op { - CompareOp::Eq => { - let res = self.snapshot(py).digest == other.snapshot(py).digest; - Ok(res.to_py_object(py).into_object()) - }, - CompareOp::Ne => { - let res = self.snapshot(py).digest != other.snapshot(py).digest; - Ok(res.to_py_object(py).into_object()) - } - _ => Ok(py.NotImplemented()), - } - } - - def __hash__(&self) -> PyResult { - Ok(self.snapshot(py).digest.hash.prefix_hash()) - } - - def __repr__(&self) -> PyResult { - let (dirs, files): (Vec<_>, Vec<_>) = self.snapshot(py).path_stats.iter().partition_map(|ps| match ps { - PathStat::Dir { path, .. } => Either::Left(path.to_string_lossy()), - PathStat::File { path, .. } => Either::Right(path.to_string_lossy()), - }); - - Ok(format!( - "Snapshot(digest=({}, {}), dirs=({}), files=({}))", - self.snapshot(py).digest.hash.to_hex(), - self.snapshot(py).digest.size_bytes, - dirs.join(","), - files.join(",") - )) - } -}); + }) + .map(|ps| PyString::new(py, ps)) + .collect::>(); + PyTuple::new(py, dirs) + } +} diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index de9da5beff3..7784715c3eb 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -1,28 +1,16 @@ // Copyright 2020 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -// File-specific allowances to silence internal warnings of `py_class!`. -#![allow( - unused_braces, - clippy::manual_strip, - clippy::used_underscore_binding, - clippy::transmute_ptr_to_ptr, - clippy::zero_ptr -)] +// File-specific allowances to silence internal warnings of `[pyclass]`. +#![allow(clippy::used_underscore_binding)] -/// -/// This crate is a wrapper around the engine crate which exposes a python module via cpython. -/// -/// The engine crate contains some cpython interop which we use, notably externs which are functions -/// and types from Python which we can read from our Rust. This particular wrapper crate is just for -/// how we expose ourselves back to Python. +/// This crate is a wrapper around the engine crate which exposes a Python module via PyO3. use std::any::Any; use std::cell::RefCell; use std::collections::hash_map::HashMap; use std::convert::TryInto; use std::fs::File; use std::io; -use std::os::unix::ffi::OsStrExt; use std::panic; use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -30,13 +18,8 @@ use std::sync::Arc; use std::time::Duration; use async_latch::AsyncLatch; -use cpython::{ - exc, py_class, py_exception, py_fn, py_module_initializer, NoArgs, PyBytes, PyClone, PyDict, - PyErr, PyList, PyObject, PyResult as CPyResult, PyString, PyTuple, PyType, Python, PythonObject, - ToPyObject, -}; +use futures::future; use futures::future::FutureExt; -use futures::future::{self, TryFutureExt}; use futures::Future; use hashing::Digest; use log::{self, debug, error, warn, Log}; @@ -44,6 +27,13 @@ use logging::logger::PANTS_LOGGER; use logging::{Logger, PythonLogLevel}; use petgraph::graph::{DiGraph, Graph}; use process_execution::RemoteCacheWarningsBehavior; +use pyo3::exceptions::{PyException, PyIOError, PyKeyboardInterrupt, PyValueError}; +use pyo3::prelude::{ + pyclass, pyfunction, pymethods, pymodule, wrap_pyfunction, Py, PyModule, PyObject, + PyResult as PyO3Result, Python, +}; +use pyo3::types::{PyBytes, PyDict, PyList, PyString, PyTuple, PyType}; +use pyo3::{create_exception, IntoPy, PyAny}; use regex::Regex; use rule_graph::{self, RuleGraph}; use task_executor::Executor; @@ -57,488 +47,232 @@ use crate::{ Scheduler, Session, Tasks, TypeId, Types, Value, }; -py_exception!(native_engine, PollTimeout); - -py_module_initializer!(native_engine, |py, m| { - m.add(py, "PollTimeout", py.get_type::()) - .unwrap(); - - m.add( - py, - "stdio_initialize", - py_fn!( - py, - stdio_initialize( - a: u64, - b: bool, - d: bool, - e: PyDict, - f: Vec, - g: Vec, - h: String - ) - ), - )?; - m.add( - py, - "stdio_thread_console_set", - py_fn!( - py, - stdio_thread_console_set(stdin_fileno: i32, stdout_fileno: i32, stderr_fileno: i32) - ), - )?; - m.add( - py, - "stdio_thread_console_color_mode_set", - py_fn!(py, stdio_thread_console_color_mode_set(use_color: bool)), - )?; - m.add( - py, - "stdio_thread_console_clear", - py_fn!(py, stdio_thread_console_clear()), - )?; - m.add( - py, - "stdio_thread_get_destination", - py_fn!(py, stdio_thread_get_destination()), - )?; - m.add( - py, - "stdio_thread_set_destination", - py_fn!(py, stdio_thread_set_destination(a: PyStdioDestination)), - )?; +#[pymodule] +fn native_engine(py: Python, m: &PyModule) -> PyO3Result<()> { + m.add("PollTimeout", py.get_type::())?; + + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + + m.add_class::()?; + m.add_class::()?; + + m.add_function(wrap_pyfunction!(stdio_initialize, m)?)?; + m.add_function(wrap_pyfunction!(stdio_thread_console_set, m)?)?; + m.add_function(wrap_pyfunction!(stdio_thread_console_color_mode_set, m)?)?; + m.add_function(wrap_pyfunction!(stdio_thread_console_clear, m)?)?; + m.add_function(wrap_pyfunction!(stdio_thread_get_destination, m)?)?; + m.add_function(wrap_pyfunction!(stdio_thread_set_destination, m)?)?; + + m.add_function(wrap_pyfunction!(flush_log, m)?)?; + m.add_function(wrap_pyfunction!(write_log, m)?)?; + m.add_function(wrap_pyfunction!(set_per_run_log_path, m)?)?; + m.add_function(wrap_pyfunction!(teardown_dynamic_ui, m)?)?; + m.add_function(wrap_pyfunction!(maybe_set_panic_handler, m)?)?; + + m.add_function(wrap_pyfunction!(task_side_effected, m)?)?; + + m.add_function(wrap_pyfunction!(tasks_task_begin, m)?)?; + m.add_function(wrap_pyfunction!(tasks_task_end, m)?)?; + m.add_function(wrap_pyfunction!(tasks_add_get, m)?)?; + m.add_function(wrap_pyfunction!(tasks_add_union, m)?)?; + m.add_function(wrap_pyfunction!(tasks_add_select, m)?)?; + m.add_function(wrap_pyfunction!(tasks_add_query, m)?)?; + + m.add_function(wrap_pyfunction!(write_digest, m)?)?; + m.add_function(wrap_pyfunction!(capture_snapshots, m)?)?; + + m.add_function(wrap_pyfunction!(graph_invalidate_paths, m)?)?; + m.add_function(wrap_pyfunction!(graph_invalidate_all_paths, m)?)?; + m.add_function(wrap_pyfunction!(graph_invalidate_all, m)?)?; + m.add_function(wrap_pyfunction!(graph_len, m)?)?; + m.add_function(wrap_pyfunction!(graph_visualize, m)?)?; + + m.add_function(wrap_pyfunction!(nailgun_server_create, m)?)?; + m.add_function(wrap_pyfunction!(nailgun_server_await_shutdown, m)?)?; + + m.add_function(wrap_pyfunction!(garbage_collect_store, m)?)?; + m.add_function(wrap_pyfunction!(lease_files_in_graph, m)?)?; + m.add_function(wrap_pyfunction!(check_invalidation_watcher_liveness, m)?)?; + + m.add_function(wrap_pyfunction!(validate_reachability, m)?)?; + m.add_function(wrap_pyfunction!(rule_graph_consumed_types, m)?)?; + m.add_function(wrap_pyfunction!(rule_graph_visualize, m)?)?; + m.add_function(wrap_pyfunction!(rule_subgraph_visualize, m)?)?; + + m.add_function(wrap_pyfunction!(execution_add_root_select, m)?)?; + + m.add_function(wrap_pyfunction!(session_new_run_id, m)?)?; + m.add_function(wrap_pyfunction!(session_poll_workunits, m)?)?; + m.add_function(wrap_pyfunction!(session_run_interactive_process, m)?)?; + m.add_function(wrap_pyfunction!(session_get_observation_histograms, m)?)?; + m.add_function(wrap_pyfunction!(session_record_test_observation, m)?)?; + m.add_function(wrap_pyfunction!(session_isolated_shallow_clone, m)?)?; + + m.add_function(wrap_pyfunction!(single_file_digests_to_bytes, m)?)?; + m.add_function(wrap_pyfunction!(ensure_remote_has_recursive, m)?)?; + + m.add_function(wrap_pyfunction!(scheduler_execute, m)?)?; + m.add_function(wrap_pyfunction!(scheduler_metrics, m)?)?; + m.add_function(wrap_pyfunction!(scheduler_create, m)?)?; + m.add_function(wrap_pyfunction!(scheduler_shutdown, m)?)?; + + m.add_function(wrap_pyfunction!(strongly_connected_components, m)?)?; - m.add(py, "flush_log", py_fn!(py, flush_log()))?; - m.add( - py, - "write_log", - py_fn!(py, write_log(msg: String, level: u64, target: String)), - )?; - m.add( - py, - "set_per_run_log_path", - py_fn!(py, set_per_run_log_path(a: Option)), - )?; - - m.add(py, "task_side_effected", py_fn!(py, task_side_effected()))?; - m.add( - py, - "teardown_dynamic_ui", - py_fn!(py, teardown_dynamic_ui(a: PyScheduler, b: PySession)), - )?; - - m.add( - py, - "maybe_set_panic_handler", - py_fn!(py, maybe_set_panic_handler()), - )?; - - m.add( - py, - "write_digest", - py_fn!( - py, - write_digest(a: PyScheduler, b: PySession, c: PyObject, d: String) - ), - )?; - m.add( - py, - "capture_snapshots", - py_fn!( - py, - capture_snapshots(a: PyScheduler, b: PySession, c: PyObject) - ), - )?; - - m.add( - py, - "graph_invalidate_paths", - py_fn!(py, graph_invalidate_paths(a: PyScheduler, b: Vec)), - )?; - m.add( - py, - "graph_invalidate_all_paths", - py_fn!(py, graph_invalidate_all_paths(a: PyScheduler)), - )?; - m.add( - py, - "graph_invalidate_all", - py_fn!(py, graph_invalidate_all(a: PyScheduler)), - )?; - m.add(py, "graph_len", py_fn!(py, graph_len(a: PyScheduler)))?; - m.add( - py, - "graph_visualize", - py_fn!(py, graph_visualize(a: PyScheduler, b: PySession, d: String)), - )?; - - m.add( - py, - "nailgun_server_create", - py_fn!( - py, - nailgun_server_create(a: PyExecutor, b: u16, c: PyObject) - ), - )?; - m.add( - py, - "nailgun_server_await_shutdown", - py_fn!(py, nailgun_server_await_shutdown(a: PyNailgunServer)), - )?; - - m.add( - py, - "garbage_collect_store", - py_fn!(py, garbage_collect_store(a: PyScheduler, b: usize)), - )?; - m.add( - py, - "lease_files_in_graph", - py_fn!(py, lease_files_in_graph(a: PyScheduler, b: PySession)), - )?; - m.add( - py, - "check_invalidation_watcher_liveness", - py_fn!(py, check_invalidation_watcher_liveness(a: PyScheduler)), - )?; - - m.add( - py, - "validate_reachability", - py_fn!(py, validate_reachability(a: PyScheduler)), - )?; - m.add( - py, - "rule_graph_consumed_types", - py_fn!( - py, - rule_graph_consumed_types(a: PyScheduler, b: Vec, c: PyType) - ), - )?; - m.add( - py, - "rule_graph_visualize", - py_fn!(py, rule_graph_visualize(a: PyScheduler, b: String)), - )?; - m.add( - py, - "rule_subgraph_visualize", - py_fn!( - py, - rule_subgraph_visualize(a: PyScheduler, b: Vec, c: PyType, d: String) - ), - )?; - - m.add( - py, - "execution_add_root_select", - py_fn!( - py, - execution_add_root_select( - a: PyScheduler, - b: PyExecutionRequest, - c: Vec, - d: PyType - ) - ), - )?; + Ok(()) +} - m.add( - py, - "session_new_run_id", - py_fn!(py, session_new_run_id(a: PySession)), - )?; - m.add( - py, - "session_poll_workunits", - py_fn!( - py, - session_poll_workunits(a: PyScheduler, b: PySession, c: u64) - ), - )?; - m.add( - py, - "session_run_interactive_process", - py_fn!( - py, - session_run_interactive_process(a: PySession, b: PyObject) - ), - )?; - m.add( - py, - "session_get_observation_histograms", - py_fn!( - py, - session_get_observation_histograms(a: PyScheduler, b: PySession) - ), - )?; - m.add( - py, - "session_record_test_observation", - py_fn!( - py, - session_record_test_observation(a: PyScheduler, b: PySession, c: u64) - ), - )?; - m.add( - py, - "session_isolated_shallow_clone", - py_fn!(py, session_isolated_shallow_clone(a: PySession, b: String)), - )?; +create_exception!(native_engine, PollTimeout, PyException); - m.add( - py, - "tasks_task_begin", - py_fn!( - py, - tasks_task_begin( - tasks: PyTasks, - func: PyObject, - return_type: PyType, - side_effecting: bool, - engine_aware_return_type: bool, - cacheable: bool, - name: String, - desc: String, - level: u64 - ) - ), - )?; - m.add(py, "tasks_task_end", py_fn!(py, tasks_task_end(a: PyTasks)))?; - m.add( - py, - "tasks_add_get", - py_fn!(py, tasks_add_get(a: PyTasks, b: PyType, c: PyType)), - )?; - m.add( - py, - "tasks_add_union", - py_fn!(py, tasks_add_union(a: PyTasks, b: PyType, c: Vec)), - )?; - m.add( - py, - "tasks_add_select", - py_fn!(py, tasks_add_select(a: PyTasks, b: PyType)), - )?; - m.add( - py, - "tasks_add_query", - py_fn!(py, tasks_add_query(a: PyTasks, b: PyType, c: Vec)), - )?; +#[pyclass] +struct PyTasks(RefCell); - m.add( - py, - "scheduler_execute", - py_fn!( - py, - scheduler_execute(a: PyScheduler, b: PySession, c: PyExecutionRequest) - ), - )?; - m.add( - py, - "scheduler_metrics", - py_fn!(py, scheduler_metrics(a: PyScheduler, b: PySession)), - )?; - m.add( - py, - "scheduler_create", - py_fn!( - py, - scheduler_create( - executor_ptr: PyExecutor, - tasks_ptr: PyTasks, - types_ptr: PyTypes, - build_root_buf: String, - local_execution_root_dir_buf: String, - named_caches_dir_buf: String, - ca_certs_path: Option, - ignore_patterns: Vec, - use_gitignore: bool, - watch_filesystem: bool, - remoting_options: PyRemotingOptions, - local_store_options: PyLocalStoreOptions, - exec_strategy_opts: PyExecutionStrategyOptions - ) - ), - )?; - m.add( - py, - "scheduler_shutdown", - py_fn!(py, scheduler_shutdown(a: PyScheduler, b: u64)), - )?; +#[pymethods] +impl PyTasks { + #[new] + fn __new__() -> Self { + Self(RefCell::new(Tasks::new())) + } +} - m.add( - py, - "single_file_digests_to_bytes", - py_fn!(py, single_file_digests_to_bytes(a: PyScheduler, b: PyList)), - )?; +#[pyclass] +struct PyTypes(RefCell>); + +#[pymethods] +impl PyTypes { + #[new] + fn __new__( + file_digest: &PyType, + snapshot: &PyType, + paths: &PyType, + file_content: &PyType, + file_entry: &PyType, + directory: &PyType, + digest_contents: &PyType, + digest_entries: &PyType, + path_globs: &PyType, + merge_digests: &PyType, + add_prefix: &PyType, + remove_prefix: &PyType, + create_digest: &PyType, + digest_subset: &PyType, + download_file: &PyType, + platform: &PyType, + multi_platform_process: &PyType, + process_result: &PyType, + process_result_metadata: &PyType, + coroutine: &PyType, + session_values: &PyType, + run_id: &PyType, + interactive_process: &PyType, + interactive_process_result: &PyType, + engine_aware_parameter: &PyType, + py: Python, + ) -> Self { + Self(RefCell::new(Some(Types { + directory_digest: TypeId::new(py.get_type::()), + file_digest: TypeId::new(file_digest), + snapshot: TypeId::new(snapshot), + paths: TypeId::new(paths), + file_content: TypeId::new(file_content), + file_entry: TypeId::new(file_entry), + directory: TypeId::new(directory), + digest_contents: TypeId::new(digest_contents), + digest_entries: TypeId::new(digest_entries), + path_globs: TypeId::new(path_globs), + merge_digests: TypeId::new(merge_digests), + add_prefix: TypeId::new(add_prefix), + remove_prefix: TypeId::new(remove_prefix), + create_digest: TypeId::new(create_digest), + digest_subset: TypeId::new(digest_subset), + download_file: TypeId::new(download_file), + platform: TypeId::new(platform), + multi_platform_process: TypeId::new(multi_platform_process), + process_result: TypeId::new(process_result), + process_result_metadata: TypeId::new(process_result_metadata), + coroutine: TypeId::new(coroutine), + session_values: TypeId::new(session_values), + run_id: TypeId::new(run_id), + interactive_process: TypeId::new(interactive_process), + interactive_process_result: TypeId::new(interactive_process_result), + engine_aware_parameter: TypeId::new(engine_aware_parameter), + }))) + } +} - m.add( - py, - "ensure_remote_has_recursive", - py_fn!(py, ensure_remote_has_recursive(a: PyScheduler, b: PyList)), - )?; +#[pyclass] +struct PyExecutor(task_executor::Executor); - m.add( - py, - "strongly_connected_components", - py_fn!( - py, - strongly_connected_components(a: Vec<(PyObject, Vec)>) - ), - )?; - - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - - m.add_class::(py)?; - m.add_class::(py)?; - m.add_class::(py)?; - - m.add_class::(py)?; - m.add_class::(py)?; - - m.add_class::(py)?; +#[pymethods] +impl PyExecutor { + #[new] + fn __new__(core_threads: usize, max_threads: usize) -> PyO3Result { + let executor = Executor::global(core_threads, max_threads).map_err(PyException::new_err)?; + Ok(Self(executor)) + } +} - Ok(()) -}); +#[pyclass] +struct PyScheduler(Scheduler); -py_class!(class PyTasks |py| { - data tasks: RefCell; - def __new__(_cls) -> CPyResult { - Self::create_instance(py, RefCell::new(Tasks::new())) - } -}); - -py_class!(class PyTypes |py| { - data types: RefCell>; - - def __new__( - _cls, - file_digest: PyType, - snapshot: PyType, - paths: PyType, - file_content: PyType, - file_entry: PyType, - directory: PyType, - digest_contents: PyType, - digest_entries: PyType, - path_globs: PyType, - merge_digests: PyType, - add_prefix: PyType, - remove_prefix: PyType, - create_digest: PyType, - digest_subset: PyType, - download_file: PyType, - platform: PyType, - multi_platform_process: PyType, - process_result: PyType, - process_result_metadata: PyType, - coroutine: PyType, - session_values: PyType, - run_id: PyType, - interactive_process: PyType, - interactive_process_result: PyType, - engine_aware_parameter: PyType - ) -> CPyResult { - Self::create_instance( - py, - RefCell::new(Some(Types { - directory_digest: TypeId::new(&py.get_type::()), - file_digest: TypeId::new(&file_digest), - snapshot: TypeId::new(&snapshot), - paths: TypeId::new(&paths), - file_content: TypeId::new(&file_content), - file_entry: TypeId::new(&file_entry), - directory: TypeId::new(&directory), - digest_contents: TypeId::new(&digest_contents), - digest_entries: TypeId::new(&digest_entries), - path_globs: TypeId::new(&path_globs), - merge_digests: TypeId::new(&merge_digests), - add_prefix: TypeId::new(&add_prefix), - remove_prefix: TypeId::new(&remove_prefix), - create_digest: TypeId::new(&create_digest), - digest_subset: TypeId::new(&digest_subset), - download_file: TypeId::new(&download_file), - platform: TypeId::new(&platform), - multi_platform_process: TypeId::new(&multi_platform_process), - process_result: TypeId::new(&process_result), - process_result_metadata: TypeId::new(&process_result_metadata), - coroutine: TypeId::new(&coroutine), - session_values: TypeId::new(&session_values), - run_id: TypeId::new(&run_id), - interactive_process: TypeId::new(&interactive_process), - interactive_process_result: TypeId::new(&interactive_process_result), - engine_aware_parameter: TypeId::new(&engine_aware_parameter), - })), - ) - } -}); +#[pyclass] +struct PyStdioDestination(Arc); -py_class!(pub class PyExecutor |py| { - data executor: Executor; - def __new__(_cls, core_threads: usize, max_threads: usize) -> CPyResult { - let executor = Executor::global(core_threads, max_threads).map_err(|e| PyErr::new::(py, (e,)))?; - Self::create_instance(py, executor) - } -}); - -py_class!(class PyScheduler |py| { - data scheduler: Scheduler; -}); - -py_class!(class PyStdioDestination |py| { - data destination: Arc; -}); - -// Represents configuration related to process execution strategies. -// -// The data stored by PyExecutionStrategyOptions originally was passed directly into -// scheduler_create but has been broken out separately because the large number of options -// became unwieldy. -py_class!(class PyExecutionStrategyOptions |py| { - data options: ExecutionStrategyOptions; - - def __new__( - _cls, +/// Represents configuration related to process execution strategies. +/// +/// The data stored by PyExecutionStrategyOptions originally was passed directly into +/// scheduler_create but has been broken out separately because the large number of options +/// became unwieldy. +#[pyclass] +struct PyExecutionStrategyOptions(ExecutionStrategyOptions); + +#[pymethods] +impl PyExecutionStrategyOptions { + #[new] + fn __new__( local_parallelism: usize, remote_parallelism: usize, local_cleanup: bool, local_cache: bool, local_enable_nailgun: bool, remote_cache_read: bool, - remote_cache_write: bool - ) -> CPyResult { - Self::create_instance(py, - ExecutionStrategyOptions { - local_parallelism, - remote_parallelism, - local_cleanup, - local_cache, - local_enable_nailgun, - remote_cache_read, - remote_cache_write, - } - ) + remote_cache_write: bool, + ) -> Self { + Self(ExecutionStrategyOptions { + local_parallelism, + remote_parallelism, + local_cleanup, + local_cache, + local_enable_nailgun, + remote_cache_read, + remote_cache_write, + }) } -}); +} -// Represents configuration related to remote execution and caching. -py_class!(class PyRemotingOptions |py| { - data options: RemotingOptions; +/// Represents configuration related to remote execution and caching. +#[pyclass] +struct PyRemotingOptions(RemotingOptions); - def __new__( - _cls, +#[pymethods] +impl PyRemotingOptions { + #[new] + fn __new__( execution_enable: bool, store_address: Option, execution_address: Option, @@ -558,176 +292,173 @@ py_class!(class PyRemotingOptions |py| { execution_headers: Vec<(String, String)>, execution_overall_deadline_secs: u64, execution_rpc_concurrency: usize, - ) -> CPyResult { - Self::create_instance(py, - RemotingOptions { - execution_enable, - store_address, - execution_address, - execution_process_cache_namespace, - instance_name, - root_ca_certs_path: root_ca_certs_path.map(PathBuf::from), - store_headers: store_headers.into_iter().collect(), - store_chunk_bytes, - store_chunk_upload_timeout: Duration::from_secs(store_chunk_upload_timeout), - store_rpc_retries, - store_rpc_concurrency, - store_batch_api_size_limit, - cache_warnings_behavior: RemoteCacheWarningsBehavior::from_str(&cache_warnings_behavior).unwrap(), - cache_eager_fetch, - cache_rpc_concurrency, - execution_extra_platform_properties, - execution_headers: execution_headers.into_iter().collect(), - execution_overall_deadline: Duration::from_secs(execution_overall_deadline_secs), - execution_rpc_concurrency, - } - ) + ) -> Self { + Self(RemotingOptions { + execution_enable, + store_address, + execution_address, + execution_process_cache_namespace, + instance_name, + root_ca_certs_path: root_ca_certs_path.map(PathBuf::from), + store_headers: store_headers.into_iter().collect(), + store_chunk_bytes, + store_chunk_upload_timeout: Duration::from_secs(store_chunk_upload_timeout), + store_rpc_retries, + store_rpc_concurrency, + store_batch_api_size_limit, + cache_warnings_behavior: RemoteCacheWarningsBehavior::from_str(&cache_warnings_behavior) + .unwrap(), + cache_eager_fetch, + cache_rpc_concurrency, + execution_extra_platform_properties, + execution_headers: execution_headers.into_iter().collect(), + execution_overall_deadline: Duration::from_secs(execution_overall_deadline_secs), + execution_rpc_concurrency, + }) } -}); +} -py_class!(class PyLocalStoreOptions |py| { - data options: LocalStoreOptions; +#[pyclass] +struct PyLocalStoreOptions(LocalStoreOptions); - def __new__( - _cls, +#[pymethods] +impl PyLocalStoreOptions { + #[new] + fn __new__( store_dir: String, process_cache_max_size_bytes: usize, files_max_size_bytes: usize, directories_max_size_bytes: usize, lease_time_millis: u64, shard_count: u8, - ) -> CPyResult { + ) -> PyO3Result { if shard_count.count_ones() != 1 { - let err_string = format!("The local store shard count must be a power of two: got {}", shard_count); - return Err(PyErr::new::(py, (err_string,))); + return Err(PyValueError::new_err(format!( + "The local store shard count must be a power of two: got {}", + shard_count + ))); } - Self::create_instance(py, - LocalStoreOptions { - store_dir: PathBuf::from(store_dir), - process_cache_max_size_bytes, - files_max_size_bytes, - directories_max_size_bytes, - lease_time: Duration::from_millis(lease_time_millis), - shard_count, - } - ) + Ok(Self(LocalStoreOptions { + store_dir: PathBuf::from(store_dir), + process_cache_max_size_bytes, + files_max_size_bytes, + directories_max_size_bytes, + lease_time: Duration::from_millis(lease_time_millis), + shard_count, + })) } -}); - -py_class!(class PySession |py| { - data session: Session; - def __new__(_cls, - scheduler: PyScheduler, - should_render_ui: bool, - build_id: String, - session_values: PyObject, - cancellation_latch: PySessionCancellationLatch, - ) -> CPyResult { - // NB: Session creation interacts with the Graph, which must not be accessed while the GIL is - // held. - let core = scheduler.scheduler(py).core.clone(); - let cancellation_latch = cancellation_latch.cancelled(py).clone(); - let session = py.allow_threads(|| Session::new( +} + +#[pyclass] +struct PySession(Session); + +#[pymethods] +impl PySession { + #[new] + fn __new__( + scheduler: &PyScheduler, + should_render_ui: bool, + build_id: String, + session_values: PyObject, + cancellation_latch: &PySessionCancellationLatch, + py: Python, + ) -> PyO3Result { + let core = scheduler.0.core.clone(); + let cancellation_latch = cancellation_latch.0.clone(); + // NB: Session creation interacts with the Graph, which must not be accessed while the GIL is + // held. + let session = py + .allow_threads(|| { + Session::new( core, should_render_ui, build_id, session_values.into(), cancellation_latch, - )).map_err(|err_str| PyErr::new::(py, (err_str,)))?; - Self::create_instance(py, session) - } - - def cancel(&self) -> PyUnitResult { - self.session(py).cancel(); - Ok(None) - } + ) + }) + .map_err(PyException::new_err)?; + Ok(Self(session)) + } - def is_cancelled(&self) -> CPyResult { - Ok(self.session(py).is_cancelled()) - } -}); + fn cancel(&self) { + self.0.cancel() + } -py_class!(class PySessionCancellationLatch |py| { - data cancelled: AsyncLatch; - def __new__(_cls) -> CPyResult { - Self::create_instance(py, AsyncLatch::new()) - } + fn is_cancelled(&self) -> bool { + self.0.is_cancelled() + } +} - def is_cancelled(&self) -> CPyResult { - Ok(self.cancelled(py).poll_triggered()) - } -}); - -py_class!(class PyNailgunServer |py| { - data server: RefCell>; - data executor: Executor; - - def port(&self) -> CPyResult { - let borrowed_server = self.server(py).borrow(); - let server = borrowed_server.as_ref().ok_or_else(|| { - PyErr::new::(py, ("Cannot get the port of a server that has already shut down.",)) - })?; - Ok(server.port()) - } -}); - -py_class!(class PyExecutionRequest |py| { - data execution_request: RefCell; - def __new__( - _cls, - poll: bool, - poll_delay_in_ms: Option, - timeout_in_ms: Option, - ) -> CPyResult { - let request = ExecutionRequest { - poll, - poll_delay: poll_delay_in_ms.map(Duration::from_millis), - timeout: timeout_in_ms.map(Duration::from_millis), - ..ExecutionRequest::default() - }; - Self::create_instance(py, RefCell::new(request)) - } -}); +#[pyclass] +struct PySessionCancellationLatch(AsyncLatch); -py_class!(class PyResult |py| { - data _is_throw: bool; - data _result: PyObject; - data _python_traceback: PyString; - data _engine_traceback: PyList; +#[pymethods] +impl PySessionCancellationLatch { + #[new] + fn __new__() -> Self { + Self(AsyncLatch::new()) + } - def __new__(_cls, is_throw: bool, result: PyObject, python_traceback: PyString, engine_traceback: PyList) -> CPyResult { - Self::create_instance(py, is_throw, result, python_traceback, engine_traceback) - } + fn is_cancelled(&self) -> bool { + self.0.poll_triggered() + } +} - def is_throw(&self) -> CPyResult { - Ok(*self._is_throw(py)) - } +#[pyclass] +struct PyNailgunServer { + server: RefCell>, + executor: Executor, +} - def result(&self) -> CPyResult { - Ok(self._result(py).clone_ref(py)) - } +#[pymethods] +impl PyNailgunServer { + fn port(&self) -> PyO3Result { + let borrowed_server = self.server.borrow(); + let server = borrowed_server.as_ref().ok_or_else(|| { + PyException::new_err("Cannot get the port of a server that has already shut down.") + })?; + Ok(server.port()) + } +} - def python_traceback(&self) -> CPyResult { - Ok(self._python_traceback(py).clone_ref(py)) - } +#[pyclass] +struct PyExecutionRequest(RefCell); + +#[pymethods] +impl PyExecutionRequest { + #[new] + fn __new__(poll: bool, poll_delay_in_ms: Option, timeout_in_ms: Option) -> Self { + let request = ExecutionRequest { + poll, + poll_delay: poll_delay_in_ms.map(Duration::from_millis), + timeout: timeout_in_ms.map(Duration::from_millis), + ..ExecutionRequest::default() + }; + Self(RefCell::new(request)) + } +} - def engine_traceback(&self) -> CPyResult { - Ok(self._engine_traceback(py).clone_ref(py)) - } -}); +#[pyclass] +struct PyResult { + #[pyo3(get)] + is_throw: bool, + #[pyo3(get)] + result: PyObject, + #[pyo3(get)] + python_traceback: Option, + #[pyo3(get)] + engine_traceback: Vec, +} -fn py_result_from_root(py: Python, result: Result) -> CPyResult { +fn py_result_from_root(py: Python, result: Result) -> PyResult { match result { - Ok(val) => { - let engine_traceback: Vec = vec![]; - PyResult::create_instance( - py, - false, - val.into(), - "".to_py_object(py), - engine_traceback.to_py_object(py), - ) - } + Ok(val) => PyResult { + is_throw: false, + result: val.into(), + python_traceback: None, + engine_traceback: vec![], + }, Err(f) => { let (val, python_traceback, engine_traceback) = match f { f @ Failure::Invalidated => { @@ -745,78 +476,47 @@ fn py_result_from_root(py: Python, result: Result) -> CPyResult< engine_traceback, } => (val, python_traceback, engine_traceback), }; - PyResult::create_instance( - py, - true, - val.into(), - python_traceback.to_py_object(py), - engine_traceback.to_py_object(py), - ) + PyResult { + is_throw: true, + result: val.into(), + python_traceback: Some(python_traceback), + engine_traceback, + } } } } -// TODO: It's not clear how to return "nothing" (None) in a CPyResult, so this is a placeholder. -type PyUnitResult = CPyResult>; - +#[pyfunction] fn nailgun_server_create( - py: Python, - executor_ptr: PyExecutor, + executor_ptr: &PyExecutor, port: u16, runner: PyObject, -) -> CPyResult { - with_executor(py, &executor_ptr, |executor| { +) -> PyO3Result { + with_executor(executor_ptr, |executor| { let server_future = { - let runner: Value = runner.into(); let executor = executor.clone(); nailgun::Server::new(executor, port, move |exe: nailgun::RawFdExecution| { let gil = Python::acquire_gil(); let py = gil.python(); - let command = externs::store_utf8(py, &exe.cmd.command); - let args = externs::store_tuple( - py, - exe - .cmd - .args - .iter() - .map(|s| externs::store_utf8(py, s)) - .collect::>(), - ); - let env = externs::store_dict( - py, - exe - .cmd - .env - .iter() - .map(|(k, v)| (externs::store_utf8(py, k), externs::store_utf8(py, v))) - .collect::>(), - ) - .unwrap(); - let working_dir = externs::store_bytes(py, exe.cmd.working_dir.as_os_str().as_bytes()); - let stdin_fd = externs::store_i64(py, exe.stdin_fd.into()); - let stdout_fd = externs::store_i64(py, exe.stdout_fd.into()); - let stderr_fd = externs::store_i64(py, exe.stderr_fd.into()); - let cancellation_latch = PySessionCancellationLatch::create_instance(py, exe.cancelled) - .unwrap() - .into_object() - .into(); - let runner_args = vec![ - command, - args, - env, - working_dir, - cancellation_latch, - stdin_fd, - stdout_fd, - stderr_fd, - ]; - match externs::call_function(&runner, &runner_args) { + let result = runner.as_ref(py).call1(( + exe.cmd.command, + PyTuple::new(py, exe.cmd.args), + exe.cmd.env.into_iter().collect::>(), + PySessionCancellationLatch(exe.cancelled), + exe.stdin_fd as i64, + exe.stdout_fd as i64, + exe.stderr_fd as i64, + )); + match result { Ok(exit_code) => { - let code: i32 = exit_code.extract(py).unwrap(); + let code: i32 = exit_code.extract().unwrap(); nailgun::ExitCode(code) } Err(e) => { - error!("Uncaught exception in nailgun handler: {:#?}", e); + error!( + "Uncaught exception in nailgun handler: {:#?}", + Failure::from_py_err_with_gil(py, e) + ); nailgun::ExitCode(1) } } @@ -825,38 +525,43 @@ fn nailgun_server_create( let server = executor .block_on(server_future) - .map_err(|e| PyErr::new::(py, (e,)))?; - PyNailgunServer::create_instance(py, RefCell::new(Some(server)), executor.clone()) + .map_err(PyException::new_err)?; + Ok(PyNailgunServer { + server: RefCell::new(Some(server)), + executor: executor.clone(), + }) }) } -fn nailgun_server_await_shutdown(py: Python, nailgun_server_ptr: PyNailgunServer) -> PyUnitResult { - with_nailgun_server(py, nailgun_server_ptr, |nailgun_server, executor| { - let executor = executor.clone(); - let shutdown_result = if let Some(server) = nailgun_server.borrow_mut().take() { - py.allow_threads(|| executor.block_on(server.shutdown())) - } else { - Ok(()) - }; - shutdown_result.map_err(|e| PyErr::new::(py, (e,)))?; - Ok(None) - }) +#[pyfunction] +fn nailgun_server_await_shutdown( + py: Python, + nailgun_server_ptr: &PyNailgunServer, +) -> PyO3Result<()> { + if let Some(server) = nailgun_server_ptr.server.borrow_mut().take() { + let executor = nailgun_server_ptr.executor.clone(); + py.allow_threads(|| executor.block_on(server.shutdown())) + .map_err(PyException::new_err) + } else { + Ok(()) + } } +#[pyfunction] fn strongly_connected_components( py: Python, adjacency_lists: Vec<(PyObject, Vec)>, -) -> CPyResult>> { +) -> PyO3Result>> { let mut graph: DiGraph = Graph::new(); let mut node_ids: HashMap = HashMap::new(); for (node, adjacency_list) in adjacency_lists { - let node_key = Key::from_value(node.clone_ref(py).into())?; + let node_key = Key::from_value(node.into())?; let node_id = *node_ids .entry(node_key) .or_insert_with(|| graph.add_node(node_key)); for dependency in adjacency_list { - let dependency_key = Key::from_value(dependency.clone_ref(py).into())?; + let dependency_key = Key::from_value(dependency.into())?; let dependency_id = node_ids .entry(dependency_key) .or_insert_with(|| graph.add_node(dependency_key)); @@ -883,11 +588,11 @@ fn strongly_connected_components( /// The given Tasks struct will be cloned, so no additional mutation of the reference will /// affect the created Scheduler. /// +#[pyfunction] fn scheduler_create( - py: Python, - executor_ptr: PyExecutor, - tasks_ptr: PyTasks, - types_ptr: PyTypes, + executor_ptr: &PyExecutor, + tasks_ptr: &PyTasks, + types_ptr: &PyTypes, build_root_buf: String, local_execution_root_dir_buf: String, named_caches_dir_buf: String, @@ -895,22 +600,22 @@ fn scheduler_create( ignore_patterns: Vec, use_gitignore: bool, watch_filesystem: bool, - remoting_options: PyRemotingOptions, - local_store_options: PyLocalStoreOptions, - exec_strategy_opts: PyExecutionStrategyOptions, -) -> CPyResult { + remoting_options: &PyRemotingOptions, + local_store_options: &PyLocalStoreOptions, + exec_strategy_opts: &PyExecutionStrategyOptions, +) -> PyO3Result { match fs::increase_limits() { Ok(msg) => debug!("{}", msg), Err(e) => warn!("{}", e), } - let core: Result = with_executor(py, &executor_ptr, |executor| { + let core: Result = with_executor(executor_ptr, |executor| { let types = types_ptr - .types(py) + .0 .borrow_mut() .take() .ok_or_else(|| "An instance of PyTypes may only be used once.".to_owned())?; let intrinsics = Intrinsics::new(&types); - let mut tasks = tasks_ptr.tasks(py).replace(Tasks::new()); + let mut tasks = tasks_ptr.0.replace(Tasks::new()); tasks.intrinsics_set(&intrinsics); // NOTE: Enter the Tokio runtime so that libraries like Tonic (for gRPC) are able to @@ -929,23 +634,21 @@ fn scheduler_create( PathBuf::from(local_execution_root_dir_buf), PathBuf::from(named_caches_dir_buf), ca_certs_path_buf.map(PathBuf::from), - local_store_options.options(py).clone(), - remoting_options.options(py).clone(), - exec_strategy_opts.options(py).clone(), + local_store_options.0.clone(), + remoting_options.0.clone(), + exec_strategy_opts.0.clone(), ) }) }); - PyScheduler::create_instance( - py, - Scheduler::new(core.map_err(|e| PyErr::new::(py, (e,)))?), - ) + let scheduler = Scheduler::new(core.map_err(PyValueError::new_err)?); + Ok(PyScheduler(scheduler)) } async fn workunit_to_py_value( workunit: &Workunit, core: &Arc, session: &Session, -) -> CPyResult { +) -> PyO3Result { let mut dict_entries = { let gil = Python::acquire_gil(); let py = gil.python(); @@ -1030,14 +733,10 @@ async fn workunit_to_py_value( ArtifactOutput::Snapshot(digest) => { let snapshot = store::Snapshot::from_digest(store, *digest) .await - .map_err(|err_str| { - let gil = Python::acquire_gil(); - PyErr::new::(gil.python(), (err_str,)) - })?; + .map_err(PyException::new_err)?; let gil = Python::acquire_gil(); let py = gil.python(); - crate::nodes::Snapshot::store_snapshot(py, snapshot) - .map_err(|err_str| PyErr::new::(py, (err_str,)))? + crate::nodes::Snapshot::store_snapshot(py, snapshot).map_err(PyException::new_err)? } }; @@ -1122,7 +821,7 @@ async fn workunits_to_py_tuple_value( workunits: Vec, core: &Arc, session: &Session, -) -> CPyResult { +) -> PyO3Result { let mut workunit_values = Vec::new(); for workunit in workunits { let py_value = workunit_to_py_value(&workunit, core, session).await?; @@ -1132,17 +831,18 @@ async fn workunits_to_py_tuple_value( Ok(externs::store_tuple(py, workunit_values)) } +#[pyfunction] fn session_poll_workunits( py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, + scheduler_ptr: &PyScheduler, + session_ptr: &PySession, max_log_verbosity_level: u64, -) -> CPyResult { +) -> PyO3Result { let py_level: PythonLogLevel = max_log_verbosity_level .try_into() - .map_err(|e| PyErr::new::(py, (format!("{}", e),)))?; - with_scheduler(py, scheduler_ptr, |scheduler| { - with_session(py, session_ptr, |session| { + .map_err(|e| PyException::new_err(format!("{}", e)))?; + with_scheduler(scheduler_ptr, |scheduler| { + with_session(session_ptr, |session| { let core = scheduler.core.clone(); let (started, completed) = py.allow_threads(|| session.workunit_store().latest_workunits(py_level.into())); @@ -1164,12 +864,13 @@ fn session_poll_workunits( }) } +#[pyfunction] fn session_run_interactive_process( py: Python, - session_ptr: PySession, + session_ptr: &PySession, interactive_process: PyObject, -) -> CPyResult { - with_session(py, session_ptr, |session| { +) -> PyO3Result { + with_session(session_ptr, |session| { let core = session.core().clone(); let context = Context::new(core.clone(), session.clone()); let interactive_process: Value = interactive_process.into(); @@ -1192,17 +893,18 @@ fn session_run_interactive_process( )) }) .map(|v| v.into()) - .map_err(|e| PyErr::new::(py, (e.to_string(),))) + .map_err(|e| PyException::new_err(e.to_string())) }) } +#[pyfunction] fn scheduler_metrics( py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, -) -> CPyResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - with_session(py, session_ptr, |session| { + scheduler_ptr: &PyScheduler, + session_ptr: &PySession, +) -> PyO3Result { + with_scheduler(scheduler_ptr, |scheduler| { + with_session(session_ptr, |session| { let values = scheduler .metrics(session) .into_iter() @@ -1218,8 +920,9 @@ fn scheduler_metrics( }) } -fn scheduler_shutdown(py: Python, scheduler_ptr: PyScheduler, timeout_secs: u64) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { +#[pyfunction] +fn scheduler_shutdown(py: Python, scheduler_ptr: &PyScheduler, timeout_secs: u64) { + with_scheduler(scheduler_ptr, |scheduler| { py.allow_threads(|| { scheduler .core @@ -1227,80 +930,78 @@ fn scheduler_shutdown(py: Python, scheduler_ptr: PyScheduler, timeout_secs: u64) .block_on(scheduler.core.shutdown(Duration::from_secs(timeout_secs))); }) }); - Ok(None) } -fn scheduler_execute( - py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, - execution_request_ptr: PyExecutionRequest, -) -> CPyResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - with_execution_request(py, execution_request_ptr, |execution_request| { - with_session(py, session_ptr, |session| { +#[pyfunction] +fn scheduler_execute<'py>( + py: Python<'py>, + scheduler_ptr: &PyScheduler, + session_ptr: &PySession, + execution_request_ptr: &PyExecutionRequest, +) -> PyO3Result<&'py PyTuple> { + with_scheduler(scheduler_ptr, |scheduler| { + with_execution_request(execution_request_ptr, |execution_request| { + with_session(session_ptr, |session| { // TODO: A parent_id should be an explicit argument. session.workunit_store().init_thread_state(None); - py.allow_threads(|| scheduler.execute(execution_request, session)) + let execute_result = py.allow_threads(|| scheduler.execute(execution_request, session)); + execute_result .map(|root_results| { let py_results = root_results .into_iter() - .map(|rr| py_result_from_root(py, rr).unwrap().into_object()) + .map(|err| Py::new(py, py_result_from_root(py, err)).unwrap()) .collect::>(); PyTuple::new(py, &py_results) }) .map_err(|e| match e { - ExecutionTermination::KeyboardInterrupt => { - PyErr::new::(py, NoArgs) - } - ExecutionTermination::PollTimeout => PyErr::new::(py, NoArgs), - ExecutionTermination::Fatal(msg) => PyErr::new::(py, (msg,)), + ExecutionTermination::KeyboardInterrupt => PyKeyboardInterrupt::new_err(()), + ExecutionTermination::PollTimeout => PollTimeout::new_err(()), + ExecutionTermination::Fatal(msg) => PyException::new_err(msg), }) }) }) }) } +#[pyfunction] fn execution_add_root_select( - py: Python, - scheduler_ptr: PyScheduler, - execution_request_ptr: PyExecutionRequest, + scheduler_ptr: &PyScheduler, + execution_request_ptr: &PyExecutionRequest, param_vals: Vec, - product: PyType, -) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - with_execution_request(py, execution_request_ptr, |execution_request| { - let product = TypeId::new(&product); + product: &PyType, +) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { + with_execution_request(execution_request_ptr, |execution_request| { + let product = TypeId::new(product); let keys = param_vals .into_iter() .map(|p| Key::from_value(p.into())) .collect::, _>>()?; Params::new(keys) .and_then(|params| scheduler.add_root_select(execution_request, params, product)) - .map_err(|e| PyErr::new::(py, (e,))) - .map(|()| None) + .map_err(PyException::new_err) }) }) } +#[pyfunction] fn tasks_task_begin( - py: Python, - tasks_ptr: PyTasks, + tasks_ptr: &PyTasks, func: PyObject, - output_type: PyType, + output_type: &PyType, side_effecting: bool, engine_aware_return_type: bool, cacheable: bool, name: String, desc: String, level: u64, -) -> PyUnitResult { +) -> PyO3Result<()> { let py_level: PythonLogLevel = level .try_into() - .map_err(|e| PyErr::new::(py, (format!("{}", e),)))?; - with_tasks(py, tasks_ptr, |tasks| { + .map_err(|e| PyException::new_err(format!("{}", e)))?; + with_tasks(tasks_ptr, |tasks| { let func = Function(Key::from_value(func.into())?); - let output_type = TypeId::new(&output_type); + let output_type = TypeId::new(output_type); tasks.task_begin( func, output_type, @@ -1311,235 +1012,216 @@ fn tasks_task_begin( if desc.is_empty() { None } else { Some(desc) }, py_level.into(), ); - Ok(None) + Ok(()) }) } -fn tasks_task_end(py: Python, tasks_ptr: PyTasks) -> PyUnitResult { - with_tasks(py, tasks_ptr, |tasks| { +#[pyfunction] +fn tasks_task_end(tasks_ptr: &PyTasks) { + with_tasks(tasks_ptr, |tasks| { tasks.task_end(); - Ok(None) }) } -fn tasks_add_get(py: Python, tasks_ptr: PyTasks, output: PyType, input: PyType) -> PyUnitResult { - with_tasks(py, tasks_ptr, |tasks| { - let output = TypeId::new(&output); - let input = TypeId::new(&input); +#[pyfunction] +fn tasks_add_get(tasks_ptr: &PyTasks, output: &PyType, input: &PyType) { + with_tasks(tasks_ptr, |tasks| { + let output = TypeId::new(output); + let input = TypeId::new(input); tasks.add_get(output, input); - Ok(None) }) } -fn tasks_add_union( - py: Python, - tasks_ptr: PyTasks, - output_type: PyType, - input_types: Vec, -) -> PyUnitResult { - with_tasks(py, tasks_ptr, |tasks| { +#[pyfunction] +fn tasks_add_union(tasks_ptr: &PyTasks, output_type: &PyType, input_types: Vec<&PyType>) { + with_tasks(tasks_ptr, |tasks| { tasks.add_union( - TypeId::new(&output_type), + TypeId::new(output_type), input_types .into_iter() - .map(|type_id| TypeId::new(&type_id)) + .map(|type_id| TypeId::new(type_id)) .collect(), ); - Ok(None) }) } -fn tasks_add_select(py: Python, tasks_ptr: PyTasks, selector: PyType) -> PyUnitResult { - with_tasks(py, tasks_ptr, |tasks| { - let selector = TypeId::new(&selector); +#[pyfunction] +fn tasks_add_select(tasks_ptr: &PyTasks, selector: &PyType) { + with_tasks(tasks_ptr, |tasks| { + let selector = TypeId::new(selector); tasks.add_select(selector); - Ok(None) }) } -fn tasks_add_query( - py: Python, - tasks_ptr: PyTasks, - output_type: PyType, - input_types: Vec, -) -> PyUnitResult { - with_tasks(py, tasks_ptr, |tasks| { +#[pyfunction] +fn tasks_add_query(tasks_ptr: &PyTasks, output_type: &PyType, input_types: Vec<&PyType>) { + with_tasks(tasks_ptr, |tasks| { tasks.query_add( - TypeId::new(&output_type), + TypeId::new(output_type), input_types .into_iter() - .map(|type_id| TypeId::new(&type_id)) + .map(|type_id| TypeId::new(type_id)) .collect(), ); - Ok(None) }) } -fn graph_invalidate_paths( - py: Python, - scheduler_ptr: PyScheduler, - paths: Vec, -) -> CPyResult { - with_scheduler(py, scheduler_ptr, |scheduler| { +#[pyfunction] +fn graph_invalidate_paths(py: Python, scheduler_ptr: &PyScheduler, paths: Vec) -> u64 { + with_scheduler(scheduler_ptr, |scheduler| { let paths = paths.into_iter().map(PathBuf::from).collect(); - py.allow_threads(|| Ok(scheduler.invalidate_paths(&paths) as u64)) + py.allow_threads(|| scheduler.invalidate_paths(&paths) as u64) }) } -fn graph_invalidate_all_paths(py: Python, scheduler_ptr: PyScheduler) -> CPyResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - py.allow_threads(|| Ok(scheduler.invalidate_all_paths() as u64)) +#[pyfunction] +fn graph_invalidate_all_paths(py: Python, scheduler_ptr: &PyScheduler) -> u64 { + with_scheduler(scheduler_ptr, |scheduler| { + py.allow_threads(|| scheduler.invalidate_all_paths() as u64) }) } -fn graph_invalidate_all(py: Python, scheduler_ptr: PyScheduler) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { +#[pyfunction] +fn graph_invalidate_all(py: Python, scheduler_ptr: &PyScheduler) { + with_scheduler(scheduler_ptr, |scheduler| { py.allow_threads(|| scheduler.invalidate_all()); - Ok(None) }) } -fn check_invalidation_watcher_liveness(py: Python, scheduler_ptr: PyScheduler) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - scheduler - .is_valid() - .map(|()| None) - .map_err(|e| PyErr::new::(py, (e,))) +#[pyfunction] +fn check_invalidation_watcher_liveness(scheduler_ptr: &PyScheduler) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { + scheduler.is_valid().map_err(PyException::new_err) }) } -fn graph_len(py: Python, scheduler_ptr: PyScheduler) -> CPyResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - py.allow_threads(|| Ok(scheduler.core.graph.len() as u64)) +#[pyfunction] +fn graph_len(py: Python, scheduler_ptr: &PyScheduler) -> u64 { + with_scheduler(scheduler_ptr, |scheduler| { + py.allow_threads(|| scheduler.core.graph.len() as u64) }) } +#[pyfunction] fn graph_visualize( py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, + scheduler_ptr: &PyScheduler, + session_ptr: &PySession, path: String, -) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - with_session(py, session_ptr, |session| { +) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { + with_session(session_ptr, |session| { let path = PathBuf::from(path); // NB: See the note on with_scheduler re: allow_threads. py.allow_threads(|| scheduler.visualize(session, path.as_path())) .map_err(|e| { - let e = format!("Failed to visualize to {}: {:?}", path.display(), e); - PyErr::new::(py, (e,)) + PyException::new_err(format!( + "Failed to visualize to {}: {:?}", + path.display(), + e + )) }) - .map(|()| None) }) }) } -fn session_new_run_id(py: Python, session_ptr: PySession) -> PyUnitResult { - with_session(py, session_ptr, |session| { +#[pyfunction] +fn session_new_run_id(session_ptr: &PySession) { + with_session(session_ptr, |session| { session.new_run_id(); - Ok(None) }) } -fn session_get_observation_histograms( - py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, -) -> CPyResult { +#[pyfunction] +fn session_get_observation_histograms<'py>( + py: Python<'py>, + scheduler_ptr: &PyScheduler, + session_ptr: &PySession, +) -> PyO3Result<&'py PyDict> { // Encoding version to return to callers. This should be bumped when the encoded histograms // are encoded in a backwards-incompatible manner. const OBSERVATIONS_VERSION: u64 = 0; - with_scheduler(py, scheduler_ptr, |_scheduler| { - with_session(py, session_ptr, |session| { + with_scheduler(scheduler_ptr, |_scheduler| { + with_session(session_ptr, |session| { let observations = session .workunit_store() .encode_observations() - .map_err(|err| PyErr::new::(py, (err,)))?; + .map_err(PyException::new_err)?; let encoded_observations = PyDict::new(py); for (metric, encoded_histogram) in &observations { encoded_observations.set_item( - py, PyString::new(py, metric.as_str()), PyBytes::new(py, &encoded_histogram[..]), )?; } let result = PyDict::new(py); - result.set_item( - py, - PyString::new(py, "version"), - OBSERVATIONS_VERSION.into_py_object(py).into_object(), - )?; - result.set_item( - py, - PyString::new(py, "histograms"), - encoded_observations.into_object(), - )?; - + result.set_item(PyString::new(py, "version"), OBSERVATIONS_VERSION)?; + result.set_item(PyString::new(py, "histograms"), encoded_observations)?; Ok(result) }) }) } +#[pyfunction] fn session_record_test_observation( - py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, + scheduler_ptr: &PyScheduler, + session_ptr: &PySession, value: u64, -) -> CPyResult { - with_scheduler(py, scheduler_ptr, |_scheduler| { - with_session(py, session_ptr, |session| { +) { + with_scheduler(scheduler_ptr, |_scheduler| { + with_session(session_ptr, |session| { session .workunit_store() .record_observation(ObservationMetric::TestObservation, value); - Ok(py.None()) }) }) } +#[pyfunction] fn session_isolated_shallow_clone( - py: Python, - session_ptr: PySession, + session_ptr: &PySession, build_id: String, -) -> CPyResult { - with_session(py, session_ptr, |session| { +) -> PyO3Result { + with_session(session_ptr, |session| { let session_clone = session .isolated_shallow_clone(build_id) - .map_err(|e| PyErr::new::(py, (e,)))?; - PySession::create_instance(py, session_clone) + .map_err(PyException::new_err)?; + Ok(PySession(session_clone)) }) } -fn validate_reachability(py: Python, scheduler_ptr: PyScheduler) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { +#[pyfunction] +fn validate_reachability(scheduler_ptr: &PyScheduler) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { scheduler .core .rule_graph .validate_reachability() - .map_err(|e| PyErr::new::(py, (e,))) - .map(|()| None) + .map_err(PyException::new_err) }) } -fn rule_graph_consumed_types( - py: Python, - scheduler_ptr: PyScheduler, - param_types: Vec, - product_type: PyType, -) -> CPyResult> { - with_scheduler(py, scheduler_ptr, |scheduler| { +#[pyfunction] +fn rule_graph_consumed_types<'py>( + py: Python<'py>, + scheduler_ptr: &PyScheduler, + param_types: Vec<&PyType>, + product_type: &PyType, +) -> PyO3Result> { + with_scheduler(scheduler_ptr, |scheduler| { let param_types = param_types .into_iter() - .map(|type_id| TypeId::new(&type_id)) + .map(|type_id| TypeId::new(type_id)) .collect::>(); let subgraph = scheduler .core .rule_graph - .subgraph(param_types, TypeId::new(&product_type)) - .map_err(|e| PyErr::new::(py, (e,)))?; + .subgraph(param_types, TypeId::new(product_type)) + .map_err(PyValueError::new_err)?; Ok( subgraph @@ -1551,33 +1233,35 @@ fn rule_graph_consumed_types( }) } -fn rule_graph_visualize(py: Python, scheduler_ptr: PyScheduler, path: String) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { +#[pyfunction] +fn rule_graph_visualize(scheduler_ptr: &PyScheduler, path: String) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { let path = PathBuf::from(path); // TODO(#7117): we want to represent union types in the graph visualizer somehow!!! - write_to_file(path.as_path(), &scheduler.core.rule_graph) - .map_err(|e| { - let e = format!("Failed to visualize to {}: {:?}", path.display(), e); - PyErr::new::(py, (e,)) - }) - .map(|()| None) + write_to_file(path.as_path(), &scheduler.core.rule_graph).map_err(|e| { + PyIOError::new_err(format!( + "Failed to visualize to {}: {:?}", + path.display(), + e + )) + }) }) } +#[pyfunction] fn rule_subgraph_visualize( - py: Python, - scheduler_ptr: PyScheduler, - param_types: Vec, - product_type: PyType, + scheduler_ptr: &PyScheduler, + param_types: Vec<&PyType>, + product_type: &PyType, path: String, -) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { +) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { let param_types = param_types .into_iter() - .map(|py_type| TypeId::new(&py_type)) + .map(|py_type| TypeId::new(py_type)) .collect::>(); - let product_type = TypeId::new(&product_type); + let product_type = TypeId::new(product_type); let path = PathBuf::from(path); // TODO(#7117): we want to represent union types in the graph visualizer somehow!!! @@ -1585,14 +1269,15 @@ fn rule_subgraph_visualize( .core .rule_graph .subgraph(param_types, product_type) - .map_err(|e| PyErr::new::(py, (e,)))?; - - write_to_file(path.as_path(), &subgraph) - .map_err(|e| { - let e = format!("Failed to visualize to {}: {:?}", path.display(), e); - PyErr::new::(py, (e,)) - }) - .map(|()| None) + .map_err(PyValueError::new_err)?; + + write_to_file(path.as_path(), &subgraph).map_err(|e| { + PyIOError::new_err(format!( + "Failed to visualize to {}: {:?}", + path.display(), + e + )) + }) }) } @@ -1608,9 +1293,10 @@ pub(crate) fn generate_panic_string(payload: &(dyn Any + Send)) -> String { } /// Set up a panic handler, unless RUST_BACKTRACE is set. -fn maybe_set_panic_handler(_: Python) -> PyUnitResult { +#[pyfunction] +fn maybe_set_panic_handler() { if std::env::var("RUST_BACKTRACE").unwrap_or_else(|_| "0".to_owned()) != "0" { - return Ok(None); + return; } panic::set_hook(Box::new(|panic_info| { let payload = panic_info.payload(); @@ -1626,34 +1312,33 @@ fn maybe_set_panic_handler(_: Python) -> PyUnitResult { let panic_file_bug_str = "Please set RUST_BACKTRACE=1, re-run, and then file a bug at https://github.com/pantsbuild/pants/issues."; error!("{}", panic_file_bug_str); })); - Ok(None) } +#[pyfunction] fn garbage_collect_store( py: Python, - scheduler_ptr: PyScheduler, + scheduler_ptr: &PyScheduler, target_size_bytes: usize, -) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { +) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { py.allow_threads(|| { scheduler .core .store() .garbage_collect(target_size_bytes, store::ShrinkBehavior::Fast) }) - .map_err(|e| PyErr::new::(py, (e,))) - .map(|()| None) + .map_err(PyException::new_err) }) } +#[pyfunction] fn lease_files_in_graph( py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, -) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - with_session(py, session_ptr, |session| { - // NB: See the note on with_scheduler re: allow_threads. + scheduler_ptr: &PyScheduler, + session_ptr: &PySession, +) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { + with_session(session_ptr, |session| { py.allow_threads(|| { let digests = scheduler.all_digests(session); scheduler @@ -1661,44 +1346,44 @@ fn lease_files_in_graph( .executor .block_on(scheduler.core.store().lease_all_recursively(digests.iter())) }) - .map_err(|e| PyErr::new::(py, (e,))) - .map(|()| None) + .map_err(PyException::new_err) }) }) } +#[pyfunction] fn capture_snapshots( py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, - path_globs_and_root_tuple_wrapper: PyObject, -) -> CPyResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - with_session(py, session_ptr, |session| { + scheduler_ptr: &PyScheduler, + session_ptr: &PySession, + path_globs_and_root_tuple_wrapper: &PyAny, +) -> PyO3Result { + with_scheduler(scheduler_ptr, |scheduler| { + with_session(session_ptr, |session| { // TODO: A parent_id should be an explicit argument. session.workunit_store().init_thread_state(None); let core = scheduler.core.clone(); - let values = externs::collect_iterable(&path_globs_and_root_tuple_wrapper).unwrap(); + let values = externs::collect_iterable(path_globs_and_root_tuple_wrapper).unwrap(); let path_globs_and_roots = values - .iter() + .into_iter() .map(|value| { let root = PathBuf::from(externs::getattr::(value, "root").unwrap()); let path_globs = nodes::Snapshot::lift_prepared_path_globs( - &externs::getattr(value, "path_globs").unwrap(), + externs::getattr(value, "path_globs").unwrap(), ); let digest_hint = { - let maybe_digest: PyObject = externs::getattr(value, "digest_hint").unwrap(); - if maybe_digest.is_none(py) { + let maybe_digest: &PyAny = externs::getattr(value, "digest_hint").unwrap(); + if maybe_digest.is_none() { None } else { - Some(nodes::lift_directory_digest(&Value::new(maybe_digest))?) + Some(nodes::lift_directory_digest(maybe_digest)?) } }; path_globs.map(|path_globs| (path_globs, root, digest_hint)) }) .collect::, _>>() - .map_err(|e| PyErr::new::(py, (e,)))?; + .map_err(PyValueError::new_err)?; let snapshot_futures = path_globs_and_roots .into_iter() @@ -1719,62 +1404,62 @@ fn capture_snapshots( }) .collect::>(); py.allow_threads(|| { - let gil = Python::acquire_gil(); - core.executor.block_on( - future::try_join_all(snapshot_futures) - .map_ok(|values| externs::store_tuple(gil.python(), values).into()), - ) + core + .executor + .block_on(future::try_join_all(snapshot_futures)) }) - .map_err(|e| PyErr::new::(py, (e,))) + .map(|values| Python::with_gil(|py| externs::store_tuple(py, values)).into()) + .map_err(PyException::new_err) }) }) } +#[pyfunction] fn ensure_remote_has_recursive( py: Python, - scheduler_ptr: PyScheduler, - py_digests: PyList, -) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { + scheduler_ptr: &PyScheduler, + py_digests: &PyList, +) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { let core = scheduler.core.clone(); let store = core.store(); // NB: Supports either a FileDigest or Digest as input. let digests: Vec = py_digests - .iter(py) + .iter() .map(|value| { - crate::nodes::lift_directory_digest(&value) - .or_else(|_| crate::nodes::lift_file_digest(&core.types, &value)) + crate::nodes::lift_directory_digest(value) + .or_else(|_| crate::nodes::lift_file_digest(&core.types, value)) }) .collect::, _>>() - .map_err(|e| PyErr::new::(py, (e,)))?; + .map_err(PyException::new_err)?; - let _upload_summary = py - .allow_threads(|| { - core - .executor - .block_on(store.ensure_remote_has_recursive(digests)) - }) - .map_err(|e| PyErr::new::(py, (e,)))?; - Ok(None) + py.allow_threads(|| { + core + .executor + .block_on(store.ensure_remote_has_recursive(digests)) + }) + .map_err(PyException::new_err)?; + Ok(()) }) } /// This functions assumes that the Digest in question represents the contents of a single File rather than a Directory, /// and will fail on Digests representing a Directory. -fn single_file_digests_to_bytes( - py: Python, - scheduler_ptr: PyScheduler, - py_file_digests: PyList, -) -> CPyResult { - with_scheduler(py, scheduler_ptr, |scheduler| { +#[pyfunction] +fn single_file_digests_to_bytes<'py>( + py: Python<'py>, + scheduler_ptr: &PyScheduler, + py_file_digests: &PyList, +) -> PyO3Result<&'py PyList> { + with_scheduler(scheduler_ptr, |scheduler| { let core = scheduler.core.clone(); let digests: Vec = py_file_digests - .iter(py) - .map(|item| crate::nodes::lift_file_digest(&core.types, &item)) + .iter() + .map(|item| crate::nodes::lift_file_digest(&core.types, item)) .collect::, _>>() - .map_err(|e| PyErr::new::(py, (e,)))?; + .map_err(PyException::new_err)?; let digest_futures = digests.into_iter().map(|digest| { let store = core.store(); @@ -1793,33 +1478,29 @@ fn single_file_digests_to_bytes( }); let bytes_values: Vec = py - .allow_threads(|| { - core.executor.block_on( - future::try_join_all(digest_futures) - .map_ok(|values: Vec| values.into_iter().map(|val| val.into()).collect()), - ) - }) - .map_err(|e| PyErr::new::(py, (e,)))?; + .allow_threads(|| core.executor.block_on(future::try_join_all(digest_futures))) + .map(|values| values.into_iter().map(|val| val.into()).collect()) + .map_err(PyException::new_err)?; let output_list = PyList::new(py, &bytes_values); Ok(output_list) }) } +#[pyfunction] fn write_digest( py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, - digest: PyObject, + scheduler_ptr: &PyScheduler, + session_ptr: &PySession, + digest: &PyAny, path_prefix: String, -) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |scheduler| { - with_session(py, session_ptr, |session| { +) -> PyO3Result<()> { + with_scheduler(scheduler_ptr, |scheduler| { + with_session(session_ptr, |session| { // TODO: A parent_id should be an explicit argument. session.workunit_store().init_thread_state(None); - let lifted_digest = nodes::lift_directory_digest(&digest) - .map_err(|e| PyErr::new::(py, (e,)))?; + let lifted_digest = nodes::lift_directory_digest(digest).map_err(PyValueError::new_err)?; // Python will have already validated that path_prefix is a relative path. let mut destination = PathBuf::new(); @@ -1832,37 +1513,27 @@ fn write_digest( .store() .materialize_directory(destination.clone(), lifted_digest) }) - .map_err(|e| PyErr::new::(py, (e,)))?; - Ok(None) + .map_err(PyValueError::new_err) }) }) } +#[pyfunction] fn stdio_initialize( py: Python, level: u64, show_rust_3rdparty_logs: bool, show_target: bool, - log_levels_by_target: PyDict, + log_levels_by_target: HashMap, literal_filters: Vec, regex_filters: Vec, log_file: String, -) -> CPyResult { - let log_levels_by_target = log_levels_by_target - .items(py) - .iter() - .map(|(k, v)| { - let k: String = k.extract(py).unwrap(); - let v: u64 = v.extract(py).unwrap(); - (k, v) - }) - .collect::>(); +) -> PyO3Result<&PyTuple> { let regex_filters = regex_filters .iter() .map(|re| { Regex::new(re).map_err(|e| { - PyErr::new::( - py, + PyException::new_err( format!( "Failed to parse warning filter. Please check the global option `--ignore-warnings`.\n\n{}", e, @@ -1881,91 +1552,80 @@ fn stdio_initialize( regex_filters, PathBuf::from(log_file), ) - .map_err(|s| { - PyErr::new::(py, (format!("Could not initialize logging: {}", s),)) - })?; + .map_err(|s| PyException::new_err(format!("Could not initialize logging: {}", s)))?; Ok(PyTuple::new( py, &[ - externs::stdio::py_stdio_read()?.into_object(), - externs::stdio::py_stdio_write(true)?.into_object(), - externs::stdio::py_stdio_write(false)?.into_object(), + Py::new(py, externs::stdio::PyStdioRead)?.into_py(py), + Py::new(py, externs::stdio::PyStdioWrite { is_stdout: true })?.into_py(py), + Py::new(py, externs::stdio::PyStdioWrite { is_stdout: false })?.into_py(py), ], )) } -fn stdio_thread_console_set( - _: Python, - stdin_fileno: i32, - stdout_fileno: i32, - stderr_fileno: i32, -) -> PyUnitResult { +#[pyfunction] +fn stdio_thread_console_set(stdin_fileno: i32, stdout_fileno: i32, stderr_fileno: i32) { let destination = stdio::new_console_destination(stdin_fileno, stdout_fileno, stderr_fileno); stdio::set_thread_destination(destination); - Ok(None) } -fn stdio_thread_console_color_mode_set(_: Python, use_color: bool) -> PyUnitResult { +#[pyfunction] +fn stdio_thread_console_color_mode_set(use_color: bool) { stdio::get_destination().stderr_set_use_color(use_color); - Ok(None) } -fn stdio_thread_console_clear(_: Python) -> PyUnitResult { +#[pyfunction] +fn stdio_thread_console_clear() { stdio::get_destination().console_clear(); - Ok(None) } -fn stdio_thread_get_destination(py: Python) -> CPyResult { +#[pyfunction] +fn stdio_thread_get_destination() -> PyStdioDestination { let dest = stdio::get_destination(); - PyStdioDestination::create_instance(py, dest) + PyStdioDestination(dest) } -fn stdio_thread_set_destination(py: Python, stdio_destination: PyStdioDestination) -> PyUnitResult { - stdio::set_thread_destination(stdio_destination.destination(py).clone()); - Ok(None) +#[pyfunction] +fn stdio_thread_set_destination(stdio_destination: &PyStdioDestination) { + stdio::set_thread_destination(stdio_destination.0.clone()); } // TODO: Needs to be thread-local / associated with the Console. -fn set_per_run_log_path(py: Python, log_path: Option) -> PyUnitResult { +#[pyfunction] +fn set_per_run_log_path(py: Python, log_path: Option) { py.allow_threads(|| { PANTS_LOGGER.set_per_run_logs(log_path.map(PathBuf::from)); - Ok(None) }) } -fn write_log(py: Python, msg: String, level: u64, target: String) -> PyUnitResult { +#[pyfunction] +fn write_log(py: Python, msg: String, level: u64, target: String) { py.allow_threads(|| { Logger::log_from_python(&msg, level, &target).expect("Error logging message"); - Ok(None) }) } -fn task_side_effected(py: Python) -> PyUnitResult { - nodes::task_side_effected() - .map(|()| None) - .map_err(|e| PyErr::new::(py, (e,))) +#[pyfunction] +fn task_side_effected() -> PyO3Result<()> { + nodes::task_side_effected().map_err(PyException::new_err) } -fn teardown_dynamic_ui( - py: Python, - scheduler_ptr: PyScheduler, - session_ptr: PySession, -) -> PyUnitResult { - with_scheduler(py, scheduler_ptr, |_scheduler| { - with_session(py, session_ptr, |session| { +#[pyfunction] +fn teardown_dynamic_ui(py: Python, scheduler_ptr: &PyScheduler, session_ptr: &PySession) { + with_scheduler(scheduler_ptr, |_scheduler| { + with_session(session_ptr, |session| { let _ = block_in_place_and_wait(py, || { session.maybe_display_teardown().unit_error().boxed_local() }); - Ok(None) }) }) } -fn flush_log(py: Python) -> PyUnitResult { +#[pyfunction] +fn flush_log(py: Python) { py.allow_threads(|| { PANTS_LOGGER.flush(); - Ok(None) }) } @@ -1986,6 +1646,8 @@ fn write_to_file(path: &Path, graph: &RuleGraph) -> io::Result<()> { fn block_in_place_and_wait(py: Python, f: impl FnOnce() -> F + Sync + Send) -> Result where F: Future>, + T: Send, + E: Send, { py.allow_threads(|| { let future = f(); @@ -2003,66 +1665,43 @@ where /// them. In particular: methods that use the `Graph` should be called outside the GIL. We should /// make this less error prone: see https://github.com/pantsbuild/pants/issues/11722. /// -fn with_scheduler(py: Python, scheduler_ptr: PyScheduler, f: F) -> T +fn with_scheduler(py_scheduler: &PyScheduler, f: F) -> T where F: FnOnce(&Scheduler) -> T, { - let scheduler = scheduler_ptr.scheduler(py); - scheduler.core.executor.enter(|| f(scheduler)) + py_scheduler.0.core.executor.enter(|| f(&py_scheduler.0)) } -/// /// See `with_scheduler`. -/// -fn with_executor(py: Python, executor_ptr: &PyExecutor, f: F) -> T +fn with_executor(py_executor: &PyExecutor, f: F) -> T where F: FnOnce(&Executor) -> T, { - let executor = executor_ptr.executor(py); - f(executor) + f(&py_executor.0) } -/// /// See `with_scheduler`. -/// -fn with_session(py: Python, session_ptr: PySession, f: F) -> T +fn with_session(py_session: &PySession, f: F) -> T where F: FnOnce(&Session) -> T, { - let session = session_ptr.session(py); - f(session) -} - -/// -/// See `with_scheduler`. -/// -fn with_nailgun_server(py: Python, nailgun_server_ptr: PyNailgunServer, f: F) -> T -where - F: FnOnce(&RefCell>, &Executor) -> T, -{ - let nailgun_server = nailgun_server_ptr.server(py); - let executor = nailgun_server_ptr.executor(py); - f(nailgun_server, executor) + f(&py_session.0) } -/// /// See `with_scheduler`. -/// -fn with_execution_request(py: Python, execution_request_ptr: PyExecutionRequest, f: F) -> T +fn with_execution_request(execution_request_ptr: &PyExecutionRequest, f: F) -> T where F: FnOnce(&mut ExecutionRequest) -> T, { - let mut execution_request = execution_request_ptr.execution_request(py).borrow_mut(); + let mut execution_request = execution_request_ptr.0.borrow_mut(); f(&mut execution_request) } -/// /// See `with_scheduler`. -/// -fn with_tasks(py: Python, tasks_ptr: PyTasks, f: F) -> T +fn with_tasks(tasks_ptr: &PyTasks, f: F) -> T where F: FnOnce(&mut Tasks) -> T, { - let mut tasks = tasks_ptr.tasks(py).borrow_mut(); + let mut tasks = tasks_ptr.0.borrow_mut(); f(&mut tasks) } diff --git a/src/rust/engine/src/externs/mod.rs b/src/rust/engine/src/externs/mod.rs index 15213fdf432..50a640b0fe9 100644 --- a/src/rust/engine/src/externs/mod.rs +++ b/src/rust/engine/src/externs/mod.rs @@ -1,12 +1,8 @@ // Copyright 2020 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -// File-specific allowances to silence internal warnings of `py_class!`. -#![allow( - clippy::used_underscore_binding, - clippy::transmute_ptr_to_ptr, - clippy::zero_ptr -)] +// File-specific allowances to silence internal warnings of `[pyclass]`. +#![allow(clippy::used_underscore_binding)] pub mod engine_aware; pub mod fs; @@ -16,32 +12,32 @@ mod interface_tests; mod stdio; use std::collections::BTreeMap; -use std::convert::AsRef; use std::convert::TryInto; use std::fmt; use crate::interning::Interns; use crate::python::{Failure, Key, TypeId, Value}; -use cpython::{ - py_class, CompareOp, FromPyObject, ObjectProtocol, PyBool, PyBytes, PyClone, PyDict, PyErr, - PyObject, PyResult as CPyResult, PyTuple, PyType, Python, PythonObject, ToPyObject, -}; use lazy_static::lazy_static; +use pyo3::basic::CompareOp; +use pyo3::exceptions::PyException; +use pyo3::prelude::*; +use pyo3::types::{PyBool, PyBytes, PyDict, PyTuple, PyType}; +use pyo3::{FromPyObject, ToPyObject}; use logging::PythonLogLevel; -pub fn equals(py: Python, h1: &PyObject, h2: &PyObject) -> bool { +pub fn equals(h1: &PyAny, h2: &PyAny) -> bool { // NB: Although it does not precisely align with Python's definition of equality, we ban matches // between non-equal types to avoid legacy behavior like `assert True == 1`, which is very // surprising in interning, and would likely be surprising anywhere else in the engine where we // compare things. - if h1.get_type(py) != h2.get_type(py) { + if h1.get_type() != h2.get_type() { return false; } - h1.rich_compare(py, h2, CompareOp::Eq) + h1.rich_compare(h2, CompareOp::Eq) .unwrap() - .cast_as::(py) + .cast_as::() .unwrap() .is_true() } @@ -51,57 +47,51 @@ pub fn store_tuple(py: Python, values: Vec) -> Value { .into_iter() .map(|v| v.consume_into_py_object(py)) .collect(); - Value::from(PyTuple::new(py, &arg_handles).into_object()) + Value::from(PyTuple::new(py, &arg_handles).to_object(py)) } /// Store a slice containing 2-tuples of (key, value) as a Python dictionary. -pub fn store_dict(py: Python, keys_and_values: Vec<(Value, Value)>) -> Result { +pub fn store_dict(py: Python, keys_and_values: Vec<(Value, Value)>) -> PyResult { let dict = PyDict::new(py); for (k, v) in keys_and_values { - dict.set_item( - py, - k.consume_into_py_object(py), - v.consume_into_py_object(py), - )?; + dict.set_item(k.consume_into_py_object(py), v.consume_into_py_object(py))?; } - Ok(Value::from(dict.into_object())) + Ok(Value::from(dict.to_object(py))) } /// Store an opaque buffer of bytes to pass to Python. This will end up as a Python `bytes`. pub fn store_bytes(py: Python, bytes: &[u8]) -> Value { - Value::from(PyBytes::new(py, bytes).into_object()) + Value::from(PyBytes::new(py, bytes).to_object(py)) } -/// Store an buffer of utf8 bytes to pass to Python. This will end up as a Python `str`. +/// Store a buffer of utf8 bytes to pass to Python. This will end up as a Python `str`. pub fn store_utf8(py: Python, utf8: &str) -> Value { - Value::from(utf8.to_py_object(py).into_object()) + Value::from(utf8.to_object(py)) } pub fn store_u64(py: Python, val: u64) -> Value { - Value::from(val.to_py_object(py).into_object()) + Value::from(val.to_object(py)) } pub fn store_i64(py: Python, val: i64) -> Value { - Value::from(val.to_py_object(py).into_object()) + Value::from(val.to_object(py)) } pub fn store_bool(py: Python, val: bool) -> Value { - Value::from(val.to_py_object(py).into_object()) + Value::from(val.to_object(py)) } /// /// Gets an attribute of the given value as the given type. /// -pub fn getattr(value: &PyObject, field: &str) -> Result +pub fn getattr<'py, T>(value: &'py PyAny, field: &str) -> Result where - for<'a> T: FromPyObject<'a>, + T: FromPyObject<'py>, { - let gil = Python::acquire_gil(); - let py = gil.python(); value - .getattr(py, field) + .getattr(field) .map_err(|e| format!("Could not get field `{}`: {:?}", field, e))? - .extract::(py) + .extract::() .map_err(|e| { format!( "Field `{}` was not convertible to type {}: {:?}", @@ -115,10 +105,8 @@ where /// /// Collect the Values contained within an outer Python Iterable PyObject. /// -pub fn collect_iterable(value: &PyObject) -> Result, String> { - let gil = Python::acquire_gil(); - let py = gil.python(); - match value.iter(py) { +pub fn collect_iterable(value: &PyAny) -> Result, String> { + match value.iter() { Ok(py_iter) => py_iter .enumerate() .map(|(i, py_res)| { @@ -141,44 +129,37 @@ pub fn collect_iterable(value: &PyObject) -> Result, String> { } /// Read a `FrozenDict[str, str]`. -pub fn getattr_from_str_frozendict(value: &PyObject, field: &str) -> BTreeMap { +pub fn getattr_from_str_frozendict(value: &PyAny, field: &str) -> BTreeMap { let frozendict = getattr(value, field).unwrap(); - let pydict: PyDict = getattr(&frozendict, "_data").unwrap(); - let gil = Python::acquire_gil(); - let py = gil.python(); + let pydict: &PyDict = getattr(frozendict, "_data").unwrap(); pydict - .items(py) + .items() .into_iter() - .map(|(k, v)| (k.extract(py).unwrap(), v.extract(py).unwrap())) + .map(|kv_pair| kv_pair.extract().unwrap()) .collect() } -pub fn getattr_as_optional_string(py: Python, value: &PyObject, field: &str) -> Option { - let v = value.getattr(py, field).unwrap(); - if v.is_none(py) { +pub fn getattr_as_optional_string(value: &PyAny, field: &str) -> Option { + let v = value.getattr(field).unwrap(); + if v.is_none() { return None; } // TODO: It's possible to view a python string as a `Cow`, so we could avoid actually // cloning in some cases. - Some(v.extract(py).unwrap()) + Some(v.extract().unwrap()) } /// Call the equivalent of `str()` on an arbitrary Python object. /// /// Converts `None` to the empty string. -pub fn val_to_str(obj: &PyObject) -> String { - let gil = Python::acquire_gil(); - let py = gil.python(); - - if *obj == py.None() { +pub fn val_to_str(obj: &PyAny) -> String { + if obj.is_none() { return "".to_string(); } - - let pystring = obj.str(py).unwrap(); - pystring.to_string(py).unwrap().into_owned() + obj.str().unwrap().extract().unwrap() } -pub fn val_to_log_level(obj: &PyObject) -> Result { +pub fn val_to_log_level(obj: &PyAny) -> Result { let res: Result = getattr(obj, "_level").and_then(|n: u64| { n.try_into() .map_err(|e: num_enum::TryFromPrimitiveError<_>| { @@ -190,57 +171,48 @@ pub fn val_to_log_level(obj: &PyObject) -> Result { /// Link to the Pants docs using the current version of Pants. pub fn doc_url(py: Python, slug: &str) -> String { - let docutil = py.import("pants.util.docutil").unwrap(); - docutil - .call(py, "doc_url", (slug,), None) - .unwrap() - .extract(py) - .unwrap() + let docutil_module = py.import("pants.util.docutil").unwrap(); + let doc_url_func = docutil_module.getattr("doc_url").unwrap(); + doc_url_func.call1((slug,)).unwrap().extract().unwrap() } pub fn create_exception(py: Python, msg: String) -> Value { - Value::from(PyErr::new::(py, msg).instance(py)) -} - -pub fn call_method0(py: Python, value: &PyObject, method: &str) -> Result { - value.call_method(py, method, PyTuple::new(py, &[]), None) + Value::new(PyException::new_err(msg).into_py(py)) } -pub fn call_function>(func: T, args: &[Value]) -> Result { - let func: &PyObject = func.as_ref(); - let arg_handles: Vec = args.iter().map(|v| v.clone().into()).collect(); - let gil = Python::acquire_gil(); - let args_tuple = PyTuple::new(gil.python(), &arg_handles); - func.call(gil.python(), args_tuple, None) +pub fn call_function<'py>(func: &'py PyAny, args: &[Value]) -> PyResult<&'py PyAny> { + let args: Vec = args.iter().map(|v| v.clone().into()).collect(); + let args_tuple = PyTuple::new(func.py(), &args); + func.call1(args_tuple) } -pub fn generator_send(generator: &Value, arg: &Value) -> Result { - let gil = Python::acquire_gil(); - let py = gil.python(); +pub fn generator_send( + py: Python, + generator: &Value, + arg: &Value, +) -> Result { let selectors = py.import("pants.engine.internals.selectors").unwrap(); - let response = selectors - .call( - py, - "native_engine_generator_send", - (generator as &PyObject, arg as &PyObject), - None, - ) + let native_engine_generator_send = selectors.getattr("native_engine_generator_send").unwrap(); + let response = native_engine_generator_send + .call1((generator.to_object(py), arg.to_object(py))) .map_err(|py_err| Failure::from_py_err_with_gil(py, py_err))?; - if let Ok(b) = response.cast_as::(py) { - Ok(GeneratorResponse::Break(Value::new( - b.val(py).clone_ref(py), - ))) - } else if let Ok(get) = response.cast_as::(py) { + if let Ok(b) = response.extract::>() { + Ok(GeneratorResponse::Break( + Value::new(b.0.clone_ref(py)), + TypeId::new(b.0.as_ref(py).get_type()), + )) + } else if let Ok(get) = response.extract::>() { Ok(GeneratorResponse::Get(Get::new(py, get)?)) - } else if let Ok(get_multi) = response.cast_as::(py) { + } else if let Ok(get_multi) = response.extract::>() { let gets = get_multi - .gets(py) - .iter(py) + .0 + .as_ref(py) + .iter() .map(|g| { let get = g - .cast_as::(py) - .map_err(|e| Failure::from_py_err_with_gil(py, e.into()))?; + .extract::>() + .map_err(|e| Failure::from_py_err_with_gil(py, e))?; Get::new(py, get) }) .collect::, _>>()?; @@ -257,15 +229,12 @@ pub fn generator_send(generator: &Value, arg: &Value) -> Result Value { let py_type = type_id.as_py_type(py); - let arg_handles: Vec = args.iter().map(|v| v.clone().into()).collect(); - let args_tuple = PyTuple::new(py, &arg_handles); - py_type - .call(py, args_tuple, None) - .map(Value::from) + call_function(py_type, args) + .map(|obj| Value::new(obj.into_py(py))) .unwrap_or_else(|e| { panic!( "Core type constructor `{}` failed: {:?}", - py_type.name(py), + py_type.name().unwrap(), e ); }) @@ -275,44 +244,62 @@ lazy_static! { pub static ref INTERNS: Interns = Interns::new(); } -py_class!(pub class PyGeneratorResponseBreak |py| { - data val: PyObject; - def __new__(_cls, val: PyObject) -> CPyResult { - Self::create_instance(py, val) - } -}); - -py_class!(pub class PyGeneratorResponseGet |py| { - data product: PyType; - data declared_subject: PyType; - data subject: PyObject; - def __new__(_cls, product: PyType, declared_subject: PyType, subject: PyObject) -> CPyResult { - Self::create_instance(py, product, declared_subject, subject) - } -}); +#[pyclass] +pub struct PyGeneratorResponseBreak(PyObject); + +#[pymethods] +impl PyGeneratorResponseBreak { + #[new] + fn __new__(val: PyObject) -> Self { + Self(val) + } +} -py_class!(pub class PyGeneratorResponseGetMulti |py| { - data gets: PyTuple; - def __new__(_cls, gets: PyTuple) -> CPyResult { - Self::create_instance(py, gets) +#[pyclass] +pub struct PyGeneratorResponseGet { + product: Py, + declared_subject: Py, + subject: PyObject, +} + +#[pymethods] +impl PyGeneratorResponseGet { + #[new] + fn __new__(product: Py, declared_subject: Py, subject: PyObject) -> Self { + Self { + product, + declared_subject, + subject, } -}); + } +} + +#[pyclass] +pub struct PyGeneratorResponseGetMulti(Py); + +#[pymethods] +impl PyGeneratorResponseGetMulti { + #[new] + fn __new__(gets: Py) -> Self { + Self(gets) + } +} #[derive(Debug)] pub struct Get { pub output: TypeId, - pub input: Key, pub input_type: TypeId, + pub input: Key, } impl Get { - fn new(py: Python, get: &PyGeneratorResponseGet) -> Result { + fn new(py: Python, get: PyRef) -> Result { Ok(Get { - output: get.product(py).into(), + output: TypeId::new(get.product.as_ref(py)), + input_type: TypeId::new(get.declared_subject.as_ref(py)), input: INTERNS - .key_insert(py, get.subject(py).clone_ref(py).into()) + .key_insert(py, get.subject.clone_ref(py).into()) .map_err(|e| Failure::from_py_err_with_gil(py, e))?, - input_type: get.declared_subject(py).into(), }) } } @@ -324,7 +311,7 @@ impl fmt::Display for Get { } pub enum GeneratorResponse { - Break(Value), + Break(Value, TypeId), Get(Get), GetMulti(Vec), } diff --git a/src/rust/engine/src/externs/stdio.rs b/src/rust/engine/src/externs/stdio.rs index b48c994e1b5..f7426748725 100644 --- a/src/rust/engine/src/externs/stdio.rs +++ b/src/rust/engine/src/externs/stdio.rs @@ -1,118 +1,94 @@ // Copyright 2020 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -// File-specific allowances to silence internal warnings of `py_class!`. -#![allow( - unused_braces, - clippy::manual_strip, - clippy::used_underscore_binding, - clippy::transmute_ptr_to_ptr, - clippy::zero_ptr -)] -// TODO: False positive for |py| in py_class!. -#![allow(unused_variables)] +use pyo3::buffer::PyBuffer; +use pyo3::exceptions::PyException; +use pyo3::prelude::*; -use cpython::buffer::PyBuffer; -use cpython::{exc, py_class, PyErr, PyObject, PyResult, Python}; - -/// -/// Data members and `create_instance` methods are module-private by default, so we expose them -/// with public top-level functions. -/// -/// TODO: See https://github.com/dgrunwald/rust-cpython/issues/242 -/// - -/// /// A Python file-like that proxies to the `stdio` module, which implements thread-local input. -/// -pub fn py_stdio_read() -> PyResult { - let gil = Python::acquire_gil(); - PyStdioRead::create_instance(gil.python()) -} - -py_class!(pub class PyStdioRead |py| { - def isatty(&self) -> PyResult { - if let Ok(fd) = self.fileno(py) { - Ok(unsafe { libc::isatty(fd) != 0 }) - } else { - Ok(false) - } - } - - def fileno(&self) -> PyResult { - stdio::get_destination().stdin_as_raw_fd().map_err(|e| PyErr::new::(py, (e,))) - } - - def readinto(&self, obj: PyObject) -> PyResult { - let py_buffer = PyBuffer::get(py, &obj)?; - let mut buffer = vec![0; py_buffer.len_bytes() as usize]; - let read = py.allow_threads(|| { - stdio::get_destination().read_stdin(&mut buffer) - }).map_err(|e| PyErr::new::(py, (e.to_string(),)))?; - // NB: `as_mut_slice` exposes a `&[Cell]`, which we can't use directly in `read`. We use - // `copy_from_slice` instead, which unfortunately involves some extra copying. - py_buffer.copy_from_slice(py, &buffer)?; - Ok(read) - } - - @property - def closed(&self) -> PyResult { - Ok(false) - } - - def readable(&self) -> PyResult { - Ok(true) +#[pyclass] +pub struct PyStdioRead; + +#[pymethods] +impl PyStdioRead { + fn isatty(&self) -> bool { + if let Ok(fd) = self.fileno() { + unsafe { libc::isatty(fd) != 0 } + } else { + false } + } + + fn fileno(&self) -> PyResult { + stdio::get_destination() + .stdin_as_raw_fd() + .map_err(PyException::new_err) + } + + fn readinto(&self, obj: &PyAny, py: Python) -> PyResult { + let py_buffer = PyBuffer::get(obj)?; + let mut buffer = vec![0; py_buffer.len_bytes() as usize]; + let read = py + .allow_threads(|| stdio::get_destination().read_stdin(&mut buffer)) + .map_err(|e| PyException::new_err(e.to_string()))?; + // NB: `as_mut_slice` exposes a `&[Cell]`, which we can't use directly in `read`. We use + // `copy_from_slice` instead, which unfortunately involves some extra copying. + py_buffer.copy_from_slice(py, &buffer)?; + Ok(read) + } + + #[getter] + fn closed(&self) -> bool { + false + } + + fn readable(&self) -> bool { + true + } + + fn seekable(&self) -> bool { + false + } +} - def seekable(&self) -> PyResult { - Ok(false) - } -}); - -/// /// A Python file-like that proxies to the `stdio` module, which implements thread-local output. -/// -pub fn py_stdio_write(is_stdout: bool) -> PyResult { - let gil = Python::acquire_gil(); - PyStdioWrite::create_instance(gil.python(), is_stdout) +#[pyclass] +pub struct PyStdioWrite { + pub(crate) is_stdout: bool, } -py_class!(pub class PyStdioWrite |py| { - data is_stdout: bool; - - def write(&self, payload: &str) -> PyResult { - let is_stdout = *self.is_stdout(py); - py.allow_threads(|| { - let destination = stdio::get_destination(); - if is_stdout { - destination.write_stdout(payload.as_bytes()); - } else { - destination.write_stderr(payload.as_bytes()); - } - }); - Ok(Python::None(py)) - } - - def isatty(&self) -> PyResult { - if let Ok(fd) = self.fileno(py) { - Ok(unsafe { libc::isatty(fd) != 0 }) - } else { - Ok(false) - } - } - - def fileno(&self) -> PyResult { +#[pymethods] +impl PyStdioWrite { + fn write(&self, payload: &str, py: Python) { + py.allow_threads(|| { let destination = stdio::get_destination(); - let fd = if *self.is_stdout(py) { - destination.stdout_as_raw_fd() + if self.is_stdout { + destination.write_stdout(payload.as_bytes()); } else { - destination.stderr_as_raw_fd() - }; - fd.map_err(|e| PyErr::new::(py, (e,))) - } - - def flush(&self) -> PyResult { - // All of our destinations are line-buffered. - Ok(Python::None(py)) + destination.write_stderr(payload.as_bytes()); + } + }); + } + + fn isatty(&self) -> bool { + if let Ok(fd) = self.fileno() { + unsafe { libc::isatty(fd) != 0 } + } else { + false } -}); + } + + fn fileno(&self) -> PyResult { + let destination = stdio::get_destination(); + let fd = if self.is_stdout { + destination.stdout_as_raw_fd() + } else { + destination.stderr_as_raw_fd() + }; + fd.map_err(PyException::new_err) + } + + fn flush(&self) { + // All of our destinations are line-buffered. + } +} diff --git a/src/rust/engine/src/interning.rs b/src/rust/engine/src/interning.rs index 78b3fb5111b..102fcc32320 100644 --- a/src/rust/engine/src/interning.rs +++ b/src/rust/engine/src/interning.rs @@ -5,8 +5,8 @@ use std::collections::HashMap; use std::hash; use std::sync::atomic; -use cpython::{ObjectProtocol, PyErr, Python, ToPyObject}; use parking_lot::{Mutex, RwLock}; +use pyo3::prelude::{PyResult, Python}; use crate::externs; use crate::python::{Fnv, Key, Value}; @@ -51,10 +51,10 @@ impl Interns { Interns::default() } - pub fn key_insert(&self, py: Python, v: Value) -> Result { + pub fn key_insert(&self, py: Python, v: Value) -> PyResult { let (intern_key, type_id) = { - let obj = v.to_py_object(py).into(); - (InternKey(v.hash(py)?, obj), (&v.get_type(py)).into()) + let obj = (*v).as_ref(py); + (InternKey(obj.hash()?, v.clone()), obj.get_type().into()) }; py.allow_threads(|| { @@ -99,8 +99,7 @@ impl Eq for InternKey {} impl PartialEq for InternKey { fn eq(&self, other: &InternKey) -> bool { - let gil = Python::acquire_gil(); - externs::equals(gil.python(), &self.1, &other.1) + Python::with_gil(|py| externs::equals((*self.1).as_ref(py), (*other.1).as_ref(py))) } } diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index 3bab346650b..1dcdf7b9eb8 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -1,3 +1,6 @@ +// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + use std::collections::BTreeMap; use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; @@ -15,12 +18,12 @@ use crate::tasks::Intrinsic; use crate::types::Types; use crate::Failure; -use cpython::{ObjectProtocol, Python}; -use fs::{safe_create_dir_all_ioerror, RelativePath}; +use fs::{safe_create_dir_all_ioerror, PreparedPathGlobs, RelativePath}; use futures::future::{self, BoxFuture, FutureExt, TryFutureExt}; use hashing::{Digest, EMPTY_DIGEST}; use indexmap::IndexMap; use process_execution::{CacheDest, CacheName, NamedCaches}; +use pyo3::{PyAny, Python}; use stdio::TryCloneAsFile; use store::{SnapshotOps, SubsetParams}; use tempfile::TempDir; @@ -167,16 +170,16 @@ fn multi_platform_process_request_to_process_result( args: Vec, ) -> BoxFuture<'static, NodeResult> { async move { - let process_val = &args[0]; - // TODO: The platform will be used in a followup. - let _platform_val = &args[1]; - - let process_request = MultiPlatformExecuteProcess::lift(process_val).map_err(|str| { - throw(format!( - "Error lifting MultiPlatformExecuteProcess: {}", - str - )) + let process_request = Python::with_gil(|py| { + let py_process = (*args[0]).as_ref(py); + MultiPlatformExecuteProcess::lift(py_process).map_err(|str| { + throw(format!( + "Error lifting MultiPlatformExecuteProcess: {}", + str + )) + }) })?; + let result = context.get(process_request).await?.0; let maybe_stdout = context @@ -219,7 +222,7 @@ fn multi_platform_process_request_to_process_result( externs::store_bytes(py, &stderr_bytes), Snapshot::store_file_digest(py, &context.core.types, &result.stderr_digest), externs::store_i64(py, result.exit_code.into()), - Snapshot::store_directory_digest(py, &result.output_directory).map_err(throw)?, + Snapshot::store_directory_digest(py, result.output_directory).map_err(throw)?, externs::unsafe_call( py, context.core.types.platform, @@ -249,7 +252,11 @@ fn directory_digest_to_digest_contents( args: Vec, ) -> BoxFuture<'static, NodeResult> { async move { - let digest = lift_directory_digest(&args[0]).map_err(throw)?; + let digest = Python::with_gil(|py| { + let py_digest = (*args[0]).as_ref(py); + lift_directory_digest(py_digest) + }) + .map_err(throw)?; let snapshot = context .core .store() @@ -270,7 +277,11 @@ fn directory_digest_to_digest_entries( args: Vec, ) -> BoxFuture<'static, NodeResult> { async move { - let digest = lift_directory_digest(&args[0]).map_err(throw)?; + let digest = Python::with_gil(|py| { + let py_digest = (*args[0]).as_ref(py); + lift_directory_digest(py_digest) + }) + .map_err(throw)?; let snapshot = context .core .store() @@ -292,19 +303,23 @@ fn remove_prefix_request_to_digest( ) -> BoxFuture<'static, NodeResult> { let core = context.core; let store = core.store(); - async move { - let input_digest = - lift_directory_digest(&externs::getattr(&args[0], "digest").unwrap()).map_err(throw)?; - let prefix: String = externs::getattr(&args[0], "prefix").unwrap(); - let prefix = RelativePath::new(PathBuf::from(prefix)) + let (input_digest, raw_prefix) = Python::with_gil(|py| { + let py_remove_prefix = (*args[0]).as_ref(py); + let py_digest = externs::getattr(py_remove_prefix, "digest").unwrap(); + let input_digest = lift_directory_digest(py_digest).map_err(throw)?; + let prefix: String = externs::getattr(py_remove_prefix, "prefix").unwrap(); + let res: NodeResult<(Digest, String)> = Ok((input_digest, prefix)); + res + })?; + let prefix = RelativePath::new(PathBuf::from(raw_prefix)) .map_err(|e| throw(format!("The `prefix` must be relative: {:?}", e)))?; let digest = store .strip_prefix(input_digest, prefix) .await .map_err(|e| throw(format!("{:?}", e)))?; let gil = Python::acquire_gil(); - Snapshot::store_directory_digest(gil.python(), &digest).map_err(throw) + Snapshot::store_directory_digest(gil.python(), digest).map_err(throw) } .boxed() } @@ -316,17 +331,22 @@ fn add_prefix_request_to_digest( let core = context.core; let store = core.store(); async move { - let input_digest = - lift_directory_digest(&externs::getattr(&args[0], "digest").unwrap()).map_err(throw)?; - let prefix: String = externs::getattr(&args[0], "prefix").unwrap(); - let prefix = RelativePath::new(PathBuf::from(prefix)) + let (input_digest, raw_prefix) = Python::with_gil(|py| { + let py_add_prefix = (*args[0]).as_ref(py); + let py_digest = externs::getattr(py_add_prefix, "digest").unwrap(); + let input_digest = lift_directory_digest(py_digest).map_err(throw)?; + let prefix: String = externs::getattr(py_add_prefix, "prefix").unwrap(); + let res: NodeResult<(Digest, String)> = Ok((input_digest, prefix)); + res + })?; + let prefix = RelativePath::new(PathBuf::from(raw_prefix)) .map_err(|e| throw(format!("The `prefix` must be relative: {:?}", e)))?; let digest = store .add_prefix(input_digest, prefix) .await .map_err(|e| throw(format!("{:?}", e)))?; let gil = Python::acquire_gil(); - Snapshot::store_directory_digest(gil.python(), &digest).map_err(throw) + Snapshot::store_directory_digest(gil.python(), digest).map_err(throw) } .boxed() } @@ -334,7 +354,10 @@ fn add_prefix_request_to_digest( fn digest_to_snapshot(context: Context, args: Vec) -> BoxFuture<'static, NodeResult> { let store = context.core.store(); async move { - let digest = lift_directory_digest(&args[0])?; + let digest = Python::with_gil(|py| { + let py_digest = (*args[0]).as_ref(py); + lift_directory_digest(py_digest) + })?; let snapshot = store::Snapshot::from_digest(store, digest).await?; let gil = Python::acquire_gil(); Snapshot::store_snapshot(gil.python(), snapshot) @@ -349,19 +372,21 @@ fn merge_digests_request_to_digest( ) -> BoxFuture<'static, NodeResult> { let core = context.core; let store = core.store(); - let digests: Result, String> = - externs::getattr::>(&args[0], "digests") - .unwrap() - .into_iter() - .map(|val: Value| lift_directory_digest(&val)) - .collect(); async move { + let digests: Result, String> = Python::with_gil(|py| { + let py_merge_digests = (*args[0]).as_ref(py); + externs::getattr::>(py_merge_digests, "digests") + .unwrap() + .into_iter() + .map(|val| lift_directory_digest(val)) + .collect() + }); let digest = store .merge(digests.map_err(throw)?) .await .map_err(|e| throw(format!("{:?}", e)))?; let gil = Python::acquire_gil(); - Snapshot::store_directory_digest(gil.python(), &digest).map_err(throw) + Snapshot::store_directory_digest(gil.python(), digest).map_err(throw) } .boxed() } @@ -374,35 +399,39 @@ fn download_file_to_digest( let key = Key::from_value(args.pop().unwrap()).map_err(Failure::from_py_err)?; let digest = context.get(DownloadedFile(key)).await?; let gil = Python::acquire_gil(); - Snapshot::store_directory_digest(gil.python(), &digest).map_err(throw) + Snapshot::store_directory_digest(gil.python(), digest).map_err(throw) } .boxed() } fn path_globs_to_digest( context: Context, - mut args: Vec, + args: Vec, ) -> BoxFuture<'static, NodeResult> { async move { - let val = args.pop().unwrap(); - let path_globs = Snapshot::lift_path_globs(&val) - .map_err(|e| throw(format!("Failed to parse PathGlobs: {}", e)))?; + let path_globs = Python::with_gil(|py| { + let py_path_globs = (*args[0]).as_ref(py); + Snapshot::lift_path_globs(py_path_globs) + }) + .map_err(|e| throw(format!("Failed to parse PathGlobs: {}", e)))?; let digest = context.get(Snapshot::from_path_globs(path_globs)).await?; let gil = Python::acquire_gil(); - Snapshot::store_directory_digest(gil.python(), &digest).map_err(throw) + Snapshot::store_directory_digest(gil.python(), digest).map_err(throw) } .boxed() } fn path_globs_to_paths( context: Context, - mut args: Vec, + args: Vec, ) -> BoxFuture<'static, NodeResult> { let core = context.core.clone(); async move { - let val = args.pop().unwrap(); - let path_globs = Snapshot::lift_path_globs(&val) - .map_err(|e| throw(format!("Failed to parse PathGlobs: {}", e)))?; + let path_globs = Python::with_gil(|py| { + let py_path_globs = (*args[0]).as_ref(py); + Snapshot::lift_path_globs(py_path_globs) + }) + .map_err(|e| throw(format!("Failed to parse PathGlobs: {}", e)))?; let paths = context.get(Paths::from_path_globs(path_globs)).await?; let gil = Python::acquire_gil(); Paths::store_paths(gil.python(), &core, &paths).map_err(throw) @@ -423,20 +452,21 @@ fn create_digest_to_digest( let items: Vec = { let gil = Python::acquire_gil(); let py = gil.python(); - externs::collect_iterable(&args[0]) + let py_create_digest = (*args[0]).as_ref(py); + externs::collect_iterable(py_create_digest) .unwrap() .into_iter() .map(|obj| { - let raw_path: String = externs::getattr(&obj, "path").unwrap(); + let raw_path: String = externs::getattr(obj, "path").unwrap(); let path = RelativePath::new(PathBuf::from(raw_path)).unwrap(); - if obj.hasattr(py, "content").unwrap() { - let bytes = bytes::Bytes::from(externs::getattr::>(&obj, "content").unwrap()); - let is_executable: bool = externs::getattr(&obj, "is_executable").unwrap(); + if obj.hasattr("content").unwrap() { + let bytes = bytes::Bytes::from(externs::getattr::>(obj, "content").unwrap()); + let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); CreateDigestItem::FileContent(path, bytes, is_executable) - } else if obj.hasattr(py, "file_digest").unwrap() { - let py_digest = externs::getattr(&obj, "file_digest").unwrap(); - let digest = Snapshot::lift_file_digest(&py_digest).unwrap(); - let is_executable: bool = externs::getattr(&obj, "is_executable").unwrap(); + } else if obj.hasattr("file_digest").unwrap() { + let py_digest = externs::getattr(obj, "file_digest").unwrap(); + let digest = Snapshot::lift_file_digest(py_digest).unwrap(); + let is_executable: bool = externs::getattr(obj, "is_executable").unwrap(); CreateDigestItem::FileEntry(path, digest, is_executable) } else { CreateDigestItem::Dir(path) @@ -483,7 +513,7 @@ fn create_digest_to_digest( .await .map_err(|e| throw(format!("{:?}", e)))?; let gil = Python::acquire_gil(); - Snapshot::store_directory_digest(gil.python(), &digest).map_err(throw) + Snapshot::store_directory_digest(gil.python(), digest).map_err(throw) } .boxed() } @@ -492,20 +522,25 @@ fn digest_subset_to_digest( context: Context, args: Vec, ) -> BoxFuture<'static, NodeResult> { - let globs = externs::getattr(&args[0], "globs").unwrap(); let store = context.core.store(); - async move { - let path_globs = Snapshot::lift_prepared_path_globs(&globs).map_err(throw)?; - let original_digest = - lift_directory_digest(&externs::getattr(&args[0], "digest").unwrap()).map_err(throw)?; + let (path_globs, original_digest) = Python::with_gil(|py| { + let py_digest_subset = (*args[0]).as_ref(py); + let py_path_globs = externs::getattr(py_digest_subset, "globs").unwrap(); + let py_digest = externs::getattr(py_digest_subset, "digest").unwrap(); + let res: NodeResult<(PreparedPathGlobs, Digest)> = Ok(( + Snapshot::lift_prepared_path_globs(py_path_globs).map_err(throw)?, + lift_directory_digest(py_digest).map_err(throw)?, + )); + res + })?; let subset_params = SubsetParams { globs: path_globs }; let digest = store .subset(original_digest, subset_params) .await .map_err(|e| throw(format!("{:?}", e)))?; let gil = Python::acquire_gil(); - Snapshot::store_directory_digest(gil.python(), &digest).map_err(throw) + Snapshot::store_directory_digest(gil.python(), digest).map_err(throw) } .boxed() } @@ -520,33 +555,36 @@ fn run_id(context: Context, _args: Vec) -> BoxFuture<'static, NodeResult< fn interactive_process( context: Context, - mut args: Vec, + args: Vec, ) -> BoxFuture<'static, NodeResult> { async move { let types = &context.core.types; let interactive_process_result = types.interactive_process_result; - let value: Value = args.pop().unwrap(); - - let argv: Vec = externs::getattr(&value, "argv").unwrap(); - if argv.is_empty() { - return Err("Empty argv list not permitted".to_owned().into()); - } + let (argv, run_in_workspace, restartable, input_digest, env, append_only_caches) = Python::with_gil(|py| { + let py_interactive_process = (*args[0]).as_ref(py); + let argv: Vec = externs::getattr(py_interactive_process, "argv").unwrap(); + if argv.is_empty() { + return Err("Empty argv list not permitted".to_owned()); + } + let run_in_workspace: bool = externs::getattr(py_interactive_process, "run_in_workspace").unwrap(); + let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap(); + let py_input_digest = externs::getattr(py_interactive_process, "input_digest").unwrap(); + let input_digest: Digest = lift_directory_digest(py_input_digest)?; + let env = externs::getattr_from_str_frozendict(py_interactive_process, "env"); - let run_in_workspace: bool = externs::getattr(&value, "run_in_workspace").unwrap(); - let restartable: bool = externs::getattr(&value, "restartable").unwrap(); - let input_digest_value: Value = externs::getattr(&value, "input_digest").unwrap(); - let input_digest: Digest = lift_directory_digest(&input_digest_value)?; - let env = externs::getattr_from_str_frozendict(&value, "env"); - let append_only_caches = externs::getattr_from_str_frozendict(&value, "append_only_caches") + let append_only_caches = externs::getattr_from_str_frozendict(py_interactive_process, "append_only_caches") .into_iter() .map(|(name, dest)| Ok((CacheName::new(name).unwrap(), CacheDest::new(dest).unwrap()))) .collect::, String>>()?; - let session = context.session; + if !append_only_caches.is_empty() && run_in_workspace { + return Err("Local interactive process cannot use append-only caches when run in workspace.".to_owned()); + } - if !append_only_caches.is_empty() && run_in_workspace { - return Err("Local interactive process cannot use append-only caches when run in workspace.".to_owned().into()); - } + Ok((argv, run_in_workspace, restartable, input_digest, env, append_only_caches)) + })?; + + let session = context.session; if !restartable { task_side_effected()?; @@ -578,43 +616,43 @@ fn interactive_process( } if !append_only_caches.is_empty() { - let named_caches = NamedCaches::new(context.core.named_caches_dir.clone()); - let named_cache_symlinks = named_caches - .local_paths(&append_only_caches) - .collect::>(); - - let destination = match maybe_tempdir { - Some(ref dir) => dir.path().to_path_buf(), - None => unreachable!(), - }; - - for named_cache_symlink in named_cache_symlinks { - safe_create_dir_all_ioerror(&named_cache_symlink.src).map_err(|err| { - format!( - "Error making {} for local execution: {:?}", - named_cache_symlink.src.display(), - err - ) - })?; - - let dst = destination.join(&named_cache_symlink.dst); - if let Some(dir) = dst.parent() { - safe_create_dir_all_ioerror(dir).map_err(|err| { - format!( - "Error making {} for local execution: {:?}", dir.display(), err - ) - })?; - } - symlink(&named_cache_symlink.src, &dst).map_err(|err| { - format!( - "Error linking {} -> {} for local execution: {:?}", - named_cache_symlink.src.display(), - dst.display(), - err - ) - })?; - } - } + let named_caches = NamedCaches::new(context.core.named_caches_dir.clone()); + let named_cache_symlinks = named_caches + .local_paths(&append_only_caches) + .collect::>(); + + let destination = match maybe_tempdir { + Some(ref dir) => dir.path().to_path_buf(), + None => unreachable!(), + }; + + for named_cache_symlink in named_cache_symlinks { + safe_create_dir_all_ioerror(&named_cache_symlink.src).map_err(|err| { + format!( + "Error making {} for local execution: {:?}", + named_cache_symlink.src.display(), + err + ) + })?; + + let dst = destination.join(&named_cache_symlink.dst); + if let Some(dir) = dst.parent() { + safe_create_dir_all_ioerror(dir).map_err(|err| { + format!( + "Error making {} for local execution: {:?}", dir.display(), err + ) + })?; + } + symlink(&named_cache_symlink.src, &dst).map_err(|err| { + format!( + "Error linking {} -> {} for local execution: {:?}", + named_cache_symlink.src.display(), + dst.display(), + err + ) + })?; + } + } let p = Path::new(&argv[0]); let program_name = match maybe_tempdir { diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 3dca6d690e8..7c6dd51f0ea 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -15,6 +15,8 @@ use bytes::Bytes; use futures::future::{self, BoxFuture, FutureExt, TryFutureExt}; use grpc_util::prost::MessageExt; use protos::gen::pants::cache::{CacheKey, CacheKeyType, ObservedUrl}; +use pyo3::prelude::{Py, PyAny, Python}; +use pyo3::IntoPy; use url::Url; use crate::context::{Context, Core}; @@ -24,7 +26,6 @@ use crate::python::{display_sorted_in_parens, throw, Failure, Key, Params, TypeI use crate::selectors; use crate::tasks::{self, Rule}; use crate::Types; -use cpython::{PyObject, Python, PythonObject}; use fs::{ self, DigestEntry, Dir, DirectoryListing, File, FileContent, FileEntry, GlobExpansionConjunction, GlobMatching, Link, PathGlobs, PathStat, PreparedPathGlobs, RelativePath, StrictGlobMatching, @@ -264,14 +265,13 @@ impl From