Skip to content

Commit

Permalink
[internal] Refactor TypeId, Key, and Value for rust-cpython FFI (#13514)
Browse files Browse the repository at this point in the history
This switches from several free functions in `externs/mod.rs` to instead implement methods on `TypeId`, `Key`, and `Value`. That's more idiomatic. 

The new methods also tend to take `py: Python` now, rather than force-acquiring the GIL. This allows sharing the GIL across several function calls, which the PyO3 maintainers shared can be much more efficient to avoid overhead: #12451 (comment)

Finally, this deduplicates when `Debug` and `Display` had the same implementations.

[ci skip-build-wheels]
  • Loading branch information
Eric-Arellano authored Nov 7, 2021
1 parent 8f99f28 commit 4301cb4
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 175 deletions.
4 changes: 2 additions & 2 deletions src/rust/engine/src/externs/engine_aware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::context::Context;
use crate::externs;
use crate::nodes::{lift_directory_digest, lift_file_digest};
use crate::python::Value;
use crate::python::{TypeId, Value};
use crate::Failure;
use crate::Types;

Expand Down Expand Up @@ -66,7 +66,7 @@ impl EngineAwareReturnType {
}
};

let artifact_output = if externs::get_type_for(&value) == types.file_digest {
let artifact_output = if TypeId::new(&value.get_type(py)) == types.file_digest {
match lift_file_digest(types, &value) {
Ok(digest) => ArtifactOutput::FileDigest(digest),
Err(e) => {
Expand Down
94 changes: 50 additions & 44 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use workunit_store::{
use crate::{
externs, nodes, Context, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination,
Failure, Function, Intrinsic, Intrinsics, Key, LocalStoreOptions, Params, RemotingOptions, Rule,
Scheduler, Session, Tasks, Types, Value,
Scheduler, Session, Tasks, TypeId, Types, Value,
};

py_exception!(native_engine, PollTimeout);
Expand Down Expand Up @@ -452,30 +452,30 @@ py_class!(class PyTypes |py| {
Self::create_instance(
py,
RefCell::new(Some(Types {
directory_digest: externs::type_for(py.get_type::<externs::fs::PyDigest>()),
file_digest: externs::type_for(file_digest),
snapshot: externs::type_for(snapshot),
paths: externs::type_for(paths),
file_content: externs::type_for(file_content),
file_entry: externs::type_for(file_entry),
directory: externs::type_for(directory),
digest_contents: externs::type_for(digest_contents),
digest_entries: externs::type_for(digest_entries),
path_globs: externs::type_for(path_globs),
merge_digests: externs::type_for(merge_digests),
add_prefix: externs::type_for(add_prefix),
remove_prefix: externs::type_for(remove_prefix),
create_digest: externs::type_for(create_digest),
digest_subset: externs::type_for(digest_subset),
download_file: externs::type_for(download_file),
platform: externs::type_for(platform),
multi_platform_process: externs::type_for(multi_platform_process),
process_result: externs::type_for(process_result),
coroutine: externs::type_for(coroutine),
session_values: externs::type_for(session_values),
interactive_process: externs::type_for(interactive_process),
interactive_process_result: externs::type_for(interactive_process_result),
engine_aware_parameter: externs::type_for(engine_aware_parameter),
directory_digest: TypeId::new(&py.get_type::<externs::fs::PyDigest>()),
file_digest: TypeId::new(&file_digest),
snapshot: TypeId::new(&snapshot),
paths: TypeId::new(&paths),
file_content: TypeId::new(&file_content),
file_entry: TypeId::new(&file_entry),
directory: TypeId::new(&directory),
digest_contents: TypeId::new(&digest_contents),
digest_entries: TypeId::new(&digest_entries),
path_globs: TypeId::new(&path_globs),
merge_digests: TypeId::new(&merge_digests),
add_prefix: TypeId::new(&add_prefix),
remove_prefix: TypeId::new(&remove_prefix),
create_digest: TypeId::new(&create_digest),
digest_subset: TypeId::new(&digest_subset),
download_file: TypeId::new(&download_file),
platform: TypeId::new(&platform),
multi_platform_process: TypeId::new(&multi_platform_process),
process_result: TypeId::new(&process_result),
coroutine: TypeId::new(&coroutine),
session_values: TypeId::new(&session_values),
interactive_process: TypeId::new(&interactive_process),
interactive_process_result: TypeId::new(&interactive_process_result),
engine_aware_parameter: TypeId::new(&engine_aware_parameter),
})),
)
}
Expand Down Expand Up @@ -848,12 +848,12 @@ fn strongly_connected_components(
let mut node_ids: HashMap<Key, _> = HashMap::new();

for (node, adjacency_list) in adjacency_lists {
let node_key = externs::key_for(node.clone_ref(py).into())?;
let node_key = Key::from_value(node.clone_ref(py).into())?;
let node_id = *node_ids
.entry(node_key)
.or_insert_with(|| graph.add_node(node_key));
for dependency in adjacency_list {
let dependency_key = externs::key_for(dependency.clone_ref(py).into())?;
let dependency_key = Key::from_value(dependency.clone_ref(py).into())?;
let dependency_id = node_ids
.entry(dependency_key)
.or_insert_with(|| graph.add_node(dependency_key));
Expand All @@ -867,7 +867,7 @@ fn strongly_connected_components(
.map(|component| {
component
.into_iter()
.map(|node_id| externs::val_for(&graph[node_id]).consume_into_py_object(py))
.map(|node_id| graph[node_id].to_value().consume_into_py_object(py))
.collect::<Vec<_>>()
})
.collect(),
Expand Down Expand Up @@ -1260,10 +1260,10 @@ fn execution_add_root_select(
) -> PyUnitResult {
with_scheduler(py, scheduler_ptr, |scheduler| {
with_execution_request(py, execution_request_ptr, |execution_request| {
let product = externs::type_for(product);
let product = TypeId::new(&product);
let keys = param_vals
.into_iter()
.map(|p| externs::key_for(p.into()))
.map(|p| Key::from_value(p.into()))
.collect::<Result<Vec<_>, _>>()?;
Params::new(keys)
.and_then(|params| scheduler.add_root_select(execution_request, params, product))
Expand All @@ -1289,8 +1289,8 @@ fn tasks_task_begin(
.try_into()
.map_err(|e| PyErr::new::<exc::Exception, _>(py, (format!("{}", e),)))?;
with_tasks(py, tasks_ptr, |tasks| {
let func = Function(externs::key_for(func.into())?);
let output_type = externs::type_for(output_type);
let func = Function(Key::from_value(func.into())?);
let output_type = TypeId::new(&output_type);
tasks.task_begin(
func,
output_type,
Expand All @@ -1314,8 +1314,8 @@ fn tasks_task_end(py: Python, tasks_ptr: PyTasks) -> PyUnitResult {

fn tasks_add_get(py: Python, tasks_ptr: PyTasks, output: PyType, input: PyType) -> PyUnitResult {
with_tasks(py, tasks_ptr, |tasks| {
let output = externs::type_for(output);
let input = externs::type_for(input);
let output = TypeId::new(&output);
let input = TypeId::new(&input);
tasks.add_get(output, input);
Ok(None)
})
Expand All @@ -1329,16 +1329,19 @@ fn tasks_add_union(
) -> PyUnitResult {
with_tasks(py, tasks_ptr, |tasks| {
tasks.add_union(
externs::type_for(output_type),
input_types.into_iter().map(externs::type_for).collect(),
TypeId::new(&output_type),
input_types
.into_iter()
.map(|type_id| TypeId::new(&type_id))
.collect(),
);
Ok(None)
})
}

fn tasks_add_select(py: Python, tasks_ptr: PyTasks, selector: PyType) -> PyUnitResult {
with_tasks(py, tasks_ptr, |tasks| {
let selector = externs::type_for(selector);
let selector = TypeId::new(&selector);
tasks.add_select(selector);
Ok(None)
})
Expand All @@ -1352,8 +1355,11 @@ fn tasks_add_query(
) -> PyUnitResult {
with_tasks(py, tasks_ptr, |tasks| {
tasks.query_add(
externs::type_for(output_type),
input_types.into_iter().map(externs::type_for).collect(),
TypeId::new(&output_type),
input_types
.into_iter()
.map(|type_id| TypeId::new(&type_id))
.collect(),
);
Ok(None)
})
Expand Down Expand Up @@ -1516,20 +1522,20 @@ fn rule_graph_consumed_types(
with_scheduler(py, scheduler_ptr, |scheduler| {
let param_types = param_types
.into_iter()
.map(externs::type_for)
.map(|type_id| TypeId::new(&type_id))
.collect::<Vec<_>>();

let subgraph = scheduler
.core
.rule_graph
.subgraph(param_types, externs::type_for(product_type))
.subgraph(param_types, TypeId::new(&product_type))
.map_err(|e| PyErr::new::<exc::ValueError, _>(py, (e,)))?;

Ok(
subgraph
.consumed_types()
.into_iter()
.map(externs::type_for_type_id)
.map(|type_id| type_id.as_py_type(py))
.collect(),
)
})
Expand Down Expand Up @@ -1559,9 +1565,9 @@ fn rule_subgraph_visualize(
with_scheduler(py, scheduler_ptr, |scheduler| {
let param_types = param_types
.into_iter()
.map(externs::type_for)
.map(|py_type| TypeId::new(&py_type))
.collect::<Vec<_>>();
let product_type = externs::type_for(product_type);
let product_type = TypeId::new(&product_type);
let path = PathBuf::from(path);

// TODO(#7117): we want to represent union types in the graph visualizer somehow!!!
Expand Down
63 changes: 7 additions & 56 deletions src/rust/engine/src/externs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,6 @@ pub fn none() -> PyObject {
gil.python().None()
}

pub fn get_type_for(val: &PyObject) -> TypeId {
let gil = Python::acquire_gil();
let py = gil.python();
(&val.get_type(py)).into()
}

pub fn is_union(ty: TypeId) -> bool {
let gil = Python::acquire_gil();
let py = gil.python();
let py_type = (&ty).as_py_type(py);
let unions = py.import("pants.engine.unions").unwrap();
unions
.call(py, "is_union", (py_type,), None)
.unwrap()
.extract(py)
.unwrap()
}

pub fn equals(h1: &PyObject, h2: &PyObject) -> bool {
let gil = Python::acquire_gil();
let py = gil.python();
Expand All @@ -72,24 +54,6 @@ pub fn equals(h1: &PyObject, h2: &PyObject) -> bool {
.is_true()
}

pub fn type_for_type_id(ty: TypeId) -> PyType {
let gil = Python::acquire_gil();
(&ty).as_py_type(gil.python())
}

pub fn type_for(py_type: PyType) -> TypeId {
(&py_type).into()
}

pub fn key_for(val: Value) -> Result<Key, PyErr> {
let gil = Python::acquire_gil();
INTERNS.key_insert(gil.python(), val)
}

pub fn val_for(key: &Key) -> Value {
INTERNS.key_get(key)
}

pub fn store_tuple(values: Vec<Value>) -> Value {
let gil = Python::acquire_gil();
let arg_handles: Vec<_> = values
Expand Down Expand Up @@ -225,14 +189,6 @@ pub fn getattr_as_string(value: &PyObject, field: &str) -> String {
val_to_str(&getattr(value, field).unwrap())
}

pub fn key_to_str(key: &Key) -> String {
val_to_str(val_for(key).as_ref())
}

pub fn type_to_str(type_id: TypeId) -> String {
getattr_as_string(&type_for_type_id(type_id).into_object(), "__name__")
}

pub fn val_to_str(obj: &PyObject) -> String {
let gil = Python::acquire_gil();
let py = gil.python();
Expand Down Expand Up @@ -341,25 +297,25 @@ pub fn generator_send(generator: &Value, arg: &Value) -> Result<GeneratorRespons
/// those configured in types::Types.
///
pub fn unsafe_call(type_id: TypeId, args: &[Value]) -> Value {
let py_type = type_for_type_id(type_id);
let arg_handles: Vec<PyObject> = args.iter().map(|v| v.clone().into()).collect();
let gil = Python::acquire_gil();
let args_tuple = PyTuple::new(gil.python(), &arg_handles);
let py = gil.python();
let py_type = type_id.as_py_type(py);
let arg_handles: Vec<PyObject> = args.iter().map(|v| v.clone().into()).collect();
let args_tuple = PyTuple::new(py, &arg_handles);
py_type
.call(gil.python(), args_tuple, None)
.map(Value::from)
.unwrap_or_else(|e| {
let gil = Python::acquire_gil();
panic!(
"Core type constructor `{}` failed: {:?}",
py_type.name(gil.python()),
py_type.name(py),
e
);
})
}

lazy_static! {
static ref INTERNS: Interns = Interns::new();
pub static ref INTERNS: Interns = Interns::new();
}

py_class!(pub class PyGeneratorResponseBreak |py| {
Expand Down Expand Up @@ -406,12 +362,7 @@ impl Get {

impl fmt::Display for Get {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
write!(
f,
"Get({}, {})",
type_to_str(self.output),
key_to_str(&self.input)
)
write!(f, "Get({}, {})", self.output, self.input)
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::nodes::{
lift_directory_digest, task_side_effected, DownloadedFile, MultiPlatformExecuteProcess,
NodeResult, Paths, SessionValues, Snapshot,
};
use crate::python::{throw, Value};
use crate::python::{throw, Key, Value};
use crate::tasks::Intrinsic;
use crate::types::Types;
use crate::Failure;
Expand Down Expand Up @@ -332,7 +332,7 @@ fn download_file_to_digest(
mut args: Vec<Value>,
) -> BoxFuture<'static, NodeResult<Value>> {
async move {
let key = externs::key_for(args.pop().unwrap()).map_err(Failure::from_py_err)?;
let key = Key::from_value(args.pop().unwrap()).map_err(Failure::from_py_err)?;
let digest = context.get(DownloadedFile(key)).await?;
Snapshot::store_directory_digest(&digest).map_err(|s| throw(&s))
}
Expand Down
Loading

0 comments on commit 4301cb4

Please sign in to comment.