Skip to content

Commit

Permalink
[internal] Separate holding of GIL in create_digest_to_digest() (#1…
Browse files Browse the repository at this point in the history
…3586)

Prework for #13526.

Before, we were parsing the Python objects (which requires the GIL) in the same iterator that created async blocks. That won't work when we port to PyO3 - iterating over the `file_items` collection uses `&'py PyAny` values, where the `'py` lifetime corresponds to how long we are holding the GIL (as represented by the `pyo3::Python` type). The `Python` token is not safe in async blocks because it does not implement `Send`, so we can't create the futures in this iterator.

This fix makes the code much more understandable as well. In the future, we are unblocked from defining `CreateDigest` in Rust with `PyO3` because parsing of the code is now completely decoupled from using it to create async futures. 

[ci skip-build-wheels]
  • Loading branch information
Eric-Arellano authored Nov 11, 2021
1 parent 60550cb commit a655638
Showing 1 changed file with 54 additions and 40 deletions.
94 changes: 54 additions & 40 deletions src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,62 +386,76 @@ fn path_globs_to_paths(
.boxed()
}

enum CreateDigestItem {
FileContent(RelativePath, bytes::Bytes, bool),
FileEntry(RelativePath, Digest, bool),
Dir(RelativePath),
}

fn create_digest_to_digest(
context: Context,
args: Vec<Value>,
) -> BoxFuture<'static, NodeResult<Value>> {
let file_items = externs::collect_iterable(&args[0]).unwrap();
let digests: Vec<_> = file_items
let items: Vec<CreateDigestItem> = {
let gil = Python::acquire_gil();
let py = gil.python();
externs::collect_iterable(&args[0])
.unwrap()
.into_iter()
.map(|obj| {
let raw_path: String = externs::getattr(&obj, "path").unwrap();
let path = RelativePath::new(PathBuf::from(raw_path)).unwrap();
if obj.hasattr(py, "content").unwrap() {
let bytes = bytes::Bytes::from(externs::getattr::<Vec<u8>>(&obj, "content").unwrap());
let is_executable: bool = externs::getattr(&obj, "is_executable").unwrap();
CreateDigestItem::FileContent(path, bytes, is_executable)
} else if obj.hasattr(py, "file_digest").unwrap() {
let py_digest = externs::getattr(&obj, "file_digest").unwrap();
let digest = Snapshot::lift_file_digest(&py_digest).unwrap();
let is_executable: bool = externs::getattr(&obj, "is_executable").unwrap();
CreateDigestItem::FileEntry(path, digest, is_executable)
} else {
CreateDigestItem::Dir(path)
}
})
.collect()
};

let digest_futures: Vec<_> = items
.into_iter()
.map(|file_item| {
let path: String = externs::getattr(&file_item, "path").unwrap();
.map(|item| {
let store = context.core.store();
async move {
let path = RelativePath::new(PathBuf::from(path))
.map_err(|e| format!("The `path` must be relative: {:?}", e))?;

let (is_file_content, is_file_entry) = {
let gil = Python::acquire_gil();
let py = gil.python();
(
file_item.hasattr(py, "content").unwrap(),
file_item.hasattr(py, "file_digest").unwrap(),
)
};

if is_file_content {
let bytes =
bytes::Bytes::from(externs::getattr::<Vec<u8>>(&file_item, "content").unwrap());
let is_executable: bool = externs::getattr(&file_item, "is_executable").unwrap();

let digest = store.store_file_bytes(bytes, true).await?;
let snapshot = store
.snapshot_of_one_file(path, digest, is_executable)
.await?;
let res: Result<_, String> = Ok(snapshot.digest);
res
} else if is_file_entry {
let digest_obj = externs::getattr(&file_item, "file_digest")?;
let digest = Snapshot::lift_file_digest(&digest_obj)?;
let is_executable: bool = externs::getattr(&file_item, "is_executable").unwrap();
let snapshot = store
.snapshot_of_one_file(path, digest, is_executable)
.await?;
let res: Result<_, String> = Ok(snapshot.digest);
res
} else {
store
match item {
CreateDigestItem::FileContent(path, bytes, is_executable) => {
let digest = store.store_file_bytes(bytes, true).await?;
let snapshot = store
.snapshot_of_one_file(path, digest, is_executable)
.await?;
let res: Result<_, String> = Ok(snapshot.digest);
res
}
CreateDigestItem::FileEntry(path, digest, is_executable) => {
let snapshot = store
.snapshot_of_one_file(path, digest, is_executable)
.await?;
let res: Result<_, String> = Ok(snapshot.digest);
res
}
CreateDigestItem::Dir(path) => store
.create_empty_dir(path)
.await
.map_err(|e| format!("{:?}", e))
.map_err(|e| format!("{:?}", e)),
}
}
})
.collect();

let store = context.core.store();
async move {
let digests = future::try_join_all(digests).await.map_err(|e| throw(&e))?;
let digests = future::try_join_all(digest_futures)
.await
.map_err(|e| throw(&e))?;
let digest = store
.merge(digests)
.await
Expand Down

0 comments on commit a655638

Please sign in to comment.