From 7e0138e5226185bf71050d797f1a0aec75451a2d Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Fri, 30 Dec 2022 17:31:34 +1100 Subject: [PATCH 1/5] Move retry logic into remote load_bytes_with --- src/rust/engine/fs/store/src/lib.rs | 60 +++------ src/rust/engine/fs/store/src/remote.rs | 127 ++++++++++--------- src/rust/engine/fs/store/src/remote_tests.rs | 5 +- 3 files changed, 83 insertions(+), 109 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 3461b93f5fe..6c4d567f50d 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -58,8 +58,6 @@ use fs::{ }; use futures::future::{self, BoxFuture, Either, FutureExt, TryFutureExt}; use grpc_util::prost::MessageExt; -use grpc_util::retry::{retry_call, status_is_retryable}; -use grpc_util::status_to_str; use hashing::Digest; use parking_lot::Mutex; use prost::Message; @@ -71,8 +69,6 @@ use sharded_lmdb::DEFAULT_LEASE_TIME; use tryfuture::try_future; use workunit_store::{in_workunit, Level, Metric}; -use crate::remote::ByteStoreError; - const KILOBYTES: usize = 1024; const MEGABYTES: usize = 1024 * KILOBYTES; const GIGABYTES: usize = 1024 * MEGABYTES; @@ -257,25 +253,15 @@ impl RemoteStore { // TODO(#17065): Now that we always copy from the remote store to the local store before // executing the caller's logic against the local store, // `remote::ByteStore::load_bytes_with` no longer needs to accept a function. - let bytes = retry_call( - remote_store, - |remote_store| async move { remote_store.load_bytes_with(digest, Ok).await }, - |err| match err { - ByteStoreError::Grpc(status) => status_is_retryable(status), - _ => false, - }, - ) - .await - .map_err(|err| match err { - ByteStoreError::Grpc(status) => status_to_str(status), - ByteStoreError::Other(msg) => msg, - })? - .ok_or_else(|| { - StoreError::MissingDigest( - "Was not present in either the local or remote store".to_owned(), - digest, - ) - })?; + let bytes = remote_store + .load_bytes_with(digest, Ok) + .await? 
+ .ok_or_else(|| { + StoreError::MissingDigest( + "Was not present in either the local or remote store".to_owned(), + digest, + ) + })?; f_remote(bytes.clone())?; let stored_digest = local_store @@ -1076,27 +1062,13 @@ impl Store { return Err("Cannot load Trees from a remote without a remote".to_owned()); }; - let tree_opt = retry_call( - remote, - |remote| async move { - remote - .store - .load_bytes_with(tree_digest, |b| { - let tree = Tree::decode(b).map_err(|e| format!("protobuf decode error: {:?}", e))?; - Ok(tree) - }) - .await - }, - |err| match err { - ByteStoreError::Grpc(status) => status_is_retryable(status), - _ => false, - }, - ) - .await - .map_err(|err| match err { - ByteStoreError::Grpc(status) => status_to_str(status), - ByteStoreError::Other(msg) => msg, - })?; + let tree_opt = remote + .store + .load_bytes_with(tree_digest, |b| { + let tree = Tree::decode(b).map_err(|e| format!("protobuf decode error: {:?}", e))?; + Ok(tree) + }) + .await?; let tree = match tree_opt { Some(t) => t, diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index a963e4f863d..9c7f192da81 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -344,7 +344,7 @@ impl ByteStore { &self, digest: Digest, f: F, - ) -> Result, ByteStoreError> { + ) -> Result, String> { let start = Instant::now(); let store = self.clone(); let instance_name = store.instance_name.clone().unwrap_or_default(); @@ -356,80 +356,85 @@ impl ByteStore { digest.size_bytes ); let workunit_desc = format!("Loading bytes at: {resource_name}"); - let f = f.clone(); - let mut client = self.byte_stream_client.as_ref().clone(); - - let result_future = async move { - let mut start_opt = Some(Instant::now()); - - let stream_result = client - .read({ - protos::gen::google::bytestream::ReadRequest { - resource_name, - read_offset: 0, - // 0 means no limit. - read_limit: 0, - } - }) - .await; - - let mut stream = match stream_result { - Ok(response) => response.into_inner(), - Err(status) => { - return match status.code() { - Code::NotFound => Ok(None), - _ => Err(ByteStoreError::Grpc(status)), + let request = protos::gen::google::bytestream::ReadRequest { + resource_name, + read_offset: 0, + // 0 means no limit. + read_limit: 0, + }; + let client = self.byte_stream_client.as_ref().clone(); + + let result_future = retry_call( + (client, request, f), + move |(mut client, request, f)| async move { + let mut start_opt = Some(Instant::now()); + let stream_result = client.read(request).await; + + let mut stream = match stream_result { + Ok(response) => response.into_inner(), + Err(status) => { + return match status.code() { + Code::NotFound => Ok(None), + _ => Err(ByteStoreError::Grpc(status)), + } } - } - }; - - let read_result_closure = async { - let mut buf = BytesMut::with_capacity(digest.size_bytes); - while let Some(response) = stream.next().await { - // Record the observed time to receive the first response for this read. 
- if let Some(start) = start_opt.take() { - if let Some(workunit_store_handle) = workunit_store::get_workunit_store_handle() { - let timing: Result = - Instant::now().duration_since(start).as_micros().try_into(); - if let Ok(obs) = timing { - workunit_store_handle - .store - .record_observation(ObservationMetric::RemoteStoreTimeToFirstByteMicros, obs); + }; + + let read_result_closure = async { + let mut buf = BytesMut::with_capacity(digest.size_bytes); + while let Some(response) = stream.next().await { + // Record the observed time to receive the first response for this read. + if let Some(start) = start_opt.take() { + if let Some(workunit_store_handle) = workunit_store::get_workunit_store_handle() { + let timing: Result = + Instant::now().duration_since(start).as_micros().try_into(); + if let Ok(obs) = timing { + workunit_store_handle + .store + .record_observation(ObservationMetric::RemoteStoreTimeToFirstByteMicros, obs); + } } } + + buf.extend_from_slice(&(response?).data); } + Ok(buf.freeze()) + }; - buf.extend_from_slice(&(response?).data); - } - Ok(buf.freeze()) - }; - - let read_result: Result = read_result_closure.await; - - let maybe_bytes = match read_result { - Ok(bytes) => Some(bytes), - Err(status) => { - if status.code() == tonic::Code::NotFound { - None - } else { - return Err(ByteStoreError::Grpc(status)); + let read_result: Result = read_result_closure.await; + + let maybe_bytes = match read_result { + Ok(bytes) => Some(bytes), + Err(status) => { + if status.code() == tonic::Code::NotFound { + None + } else { + return Err(ByteStoreError::Grpc(status)); + } } - } - }; + }; - match maybe_bytes { - Some(b) => f(b).map(Some).map_err(ByteStoreError::Other), - None => Ok(None), - } - }; + match maybe_bytes { + Some(b) => f(b).map(Some).map_err(ByteStoreError::Other), + None => Ok(None), + } + }, + |err| match err { + ByteStoreError::Grpc(status) => status_is_retryable(status), + ByteStoreError::Other(_) => false, + }, + ); in_workunit!( "load_bytes_with", Level::Trace, desc = Some(workunit_desc), |workunit| async move { - let result = result_future.await; + let result = result_future.await.map_err(|err| match err { + ByteStoreError::Grpc(status) => status_to_str(status), + ByteStoreError::Other(msg) => msg, + }); workunit.record_observation( ObservationMetric::RemoteStoreReadBlobTimeMicros, start.elapsed().as_micros() as u64, diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index a5fed0d1811..a2c9afa5719 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -336,8 +336,5 @@ pub async fn load_directory_proto_bytes( } async fn load_bytes(store: &ByteStore, digest: Digest) -> Result, String> { - store - .load_bytes_with(digest, |b| Ok(b)) - .await - .map_err(|err| format!("{}", err)) + store.load_bytes_with(digest, |b| Ok(b)).await } From fb9e7a5f234e86153ecf3efbcb677e3d075b82d8 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Fri, 30 Dec 2022 18:07:00 +1100 Subject: [PATCH 2/5] Make remote load_bytes non-generic --- src/rust/engine/fs/store/src/lib.rs | 39 ++++++---------- src/rust/engine/fs/store/src/remote.rs | 47 ++++++-------------- src/rust/engine/fs/store/src/remote_tests.rs | 2 +- 3 files changed, 27 insertions(+), 61 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 6c4d567f50d..a9b3a387dec 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -250,18 +250,12 @@ 
impl RemoteStore { let remote_store = self.store.clone(); self .maybe_download(digest, async move { - // TODO(#17065): Now that we always copy from the remote store to the local store before - // executing the caller's logic against the local store, - // `remote::ByteStore::load_bytes_with` no longer needs to accept a function. - let bytes = remote_store - .load_bytes_with(digest, Ok) - .await? - .ok_or_else(|| { - StoreError::MissingDigest( - "Was not present in either the local or remote store".to_owned(), - digest, - ) - })?; + let bytes = remote_store.load_bytes(digest).await?.ok_or_else(|| { + StoreError::MissingDigest( + "Was not present in either the local or remote store".to_owned(), + digest, + ) + })?; f_remote(bytes.clone())?; let stored_digest = local_store @@ -1062,21 +1056,14 @@ impl Store { return Err("Cannot load Trees from a remote without a remote".to_owned()); }; - let tree_opt = remote - .store - .load_bytes_with(tree_digest, |b| { + match remote.store.load_bytes(tree_digest).await? { + Some(b) => { let tree = Tree::decode(b).map_err(|e| format!("protobuf decode error: {:?}", e))?; - Ok(tree) - }) - .await?; - - let tree = match tree_opt { - Some(t) => t, - None => return Ok(None), - }; - - let trie = DigestTrie::try_from(tree)?; - Ok(Some(trie.into())) + let trie = DigestTrie::try_from(tree)?; + Ok(Some(trie.into())) + } + None => Ok(None), + } } pub async fn lease_all_recursively<'a, Ds: Iterator>( diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 9c7f192da81..fcbfc4afa7a 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -337,14 +337,7 @@ impl ByteStore { .await } - pub async fn load_bytes_with< - T: Send + 'static, - F: Fn(Bytes) -> Result + Send + Sync + Clone + 'static, - >( - &self, - digest: Digest, - f: F, - ) -> Result, String> { + pub async fn load_bytes(&self, digest: Digest) -> Result, String> { let start = Instant::now(); let store = self.clone(); let instance_name = store.instance_name.clone().unwrap_or_default(); @@ -366,8 +359,8 @@ impl ByteStore { let client = self.byte_stream_client.as_ref().clone(); let result_future = retry_call( - (client, request, f), - move |(mut client, request, f)| async move { + (client, request), + move |(mut client, request)| async move { let mut start_opt = Some(Instant::now()); let stream_result = client.read(request).await; @@ -376,7 +369,7 @@ impl ByteStore { Err(status) => { return match status.code() { Code::NotFound => Ok(None), - _ => Err(ByteStoreError::Grpc(status)), + _ => Err(status), } } }; @@ -404,37 +397,23 @@ impl ByteStore { let read_result: Result = read_result_closure.await; - let maybe_bytes = match read_result { - Ok(bytes) => Some(bytes), - Err(status) => { - if status.code() == tonic::Code::NotFound { - None - } else { - return Err(ByteStoreError::Grpc(status)); - } - } - }; - - match maybe_bytes { - Some(b) => f(b).map(Some).map_err(ByteStoreError::Other), - None => Ok(None), + match read_result { + Ok(bytes) => Ok(Some(bytes)), + Err(status) => match status.code() { + Code::NotFound => Ok(None), + _ => Err(status), + }, } }, - |err| match err { - ByteStoreError::Grpc(status) => status_is_retryable(status), - ByteStoreError::Other(_) => false, - }, + status_is_retryable, ); in_workunit!( - "load_bytes_with", + "load_bytes", Level::Trace, desc = Some(workunit_desc), |workunit| async move { - let result = result_future.await.map_err(|err| match err { - ByteStoreError::Grpc(status) => status_to_str(status), 
- ByteStoreError::Other(msg) => msg, - }); + let result = result_future.await.map_err(status_to_str); workunit.record_observation( ObservationMetric::RemoteStoreReadBlobTimeMicros, start.elapsed().as_micros() as u64, diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index a2c9afa5719..f150e079af6 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -336,5 +336,5 @@ pub async fn load_directory_proto_bytes( } async fn load_bytes(store: &ByteStore, digest: Digest) -> Result, String> { - store.load_bytes_with(digest, |b| Ok(b)).await + store.load_bytes(digest).await } From 42be19809f3c7b09c35423d0fed41743478095ce Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Sun, 8 Jan 2023 17:34:24 +1100 Subject: [PATCH 3/5] Make ShardedLmdb::load_bytes non-generic, propagate upwards --- src/rust/engine/cache/src/lib.rs | 5 +- src/rust/engine/fs/fs_util/src/main.rs | 8 +-- src/rust/engine/fs/store/src/lib.rs | 69 ++++++++------------ src/rust/engine/fs/store/src/local.rs | 30 ++++----- src/rust/engine/fs/store/src/remote.rs | 4 +- src/rust/engine/fs/store/src/snapshot_ops.rs | 4 +- src/rust/engine/process_executor/src/main.rs | 12 ++-- src/rust/engine/sharded_lmdb/src/lib.rs | 11 +--- src/rust/engine/src/externs/interface.rs | 2 +- src/rust/engine/src/intrinsics.rs | 4 +- 10 files changed, 57 insertions(+), 92 deletions(-) diff --git a/src/rust/engine/cache/src/lib.rs b/src/rust/engine/cache/src/lib.rs index f45ea2bd56f..9d6bd0973e9 100644 --- a/src/rust/engine/cache/src/lib.rs +++ b/src/rust/engine/cache/src/lib.rs @@ -84,9 +84,6 @@ impl PersistentCache { pub async fn load(&self, key: &CacheKey) -> Result, String> { let fingerprint = Digest::of_bytes(&key.to_bytes()).hash; - self - .store - .load_bytes_with(fingerprint, move |bytes| Ok(Bytes::copy_from_slice(bytes))) - .await + self.store.load_bytes(fingerprint).await } } diff --git a/src/rust/engine/fs/fs_util/src/main.rs b/src/rust/engine/fs/fs_util/src/main.rs index 55802ff3745..32280ac22ae 100644 --- a/src/rust/engine/fs/fs_util/src/main.rs +++ b/src/rust/engine/fs/fs_util/src/main.rs @@ -32,7 +32,6 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; -use bytes::Bytes; use clap::{Arg, Command}; use fs::{ DirectoryDigest, GlobExpansionConjunction, GlobMatching, Permissions, PreparedPathGlobs, @@ -471,7 +470,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { let digest = Digest::new(fingerprint, size_bytes); Ok( store - .load_file_bytes_with(digest, |bytes| io::stdout().write_all(bytes).unwrap()) + .load_file_bytes_with(digest, |bytes| io::stdout().write_all(&bytes).unwrap()) .await?, ) } @@ -685,10 +684,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { .parse::() .expect("size_bytes must be a non-negative number"); let digest = Digest::new(fingerprint, size_bytes); - let bytes = match store - .load_file_bytes_with(digest, Bytes::copy_from_slice) - .await - { + let bytes = match store.load_file_bytes_with(digest, |b| b).await { Err(StoreError::MissingDigest { .. 
}) => store.load_directory(digest).await?.to_bytes(), Err(e) => return Err(e.into()), Ok(bytes) => bytes, diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index a9b3a387dec..674dfc13187 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -503,7 +503,7 @@ impl Store { /// pub async fn load_file_bytes_with< T: Send + 'static, - F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static, + F: Fn(Bytes) -> T + Clone + Send + Sync + 'static, >( &self, digest: Digest, @@ -514,7 +514,7 @@ impl Store { .load_bytes_with( EntryType::File, digest, - move |v: &[u8]| Ok(f(v)), + move |v: Bytes| Ok(f(v)), |_: Bytes| Ok(()), ) .await @@ -677,7 +677,7 @@ impl Store { digest, // Trust that locally stored values were canonical when they were written into the CAS // and only verify in debug mode, as it's slightly expensive. - move |bytes: &[u8]| { + move |bytes: Bytes| { let directory = remexec::Directory::decode(bytes).map_err(|e| { format!( "LMDB corruption: Directory bytes for {:?} were not valid: {:?}", @@ -727,7 +727,7 @@ impl Store { /// async fn load_bytes_with< T: Send + 'static, - FLocal: Fn(&[u8]) -> Result + Clone + Send + Sync + 'static, + FLocal: Fn(Bytes) -> Result + Clone + Send + Sync + 'static, FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static, >( &self, @@ -736,12 +736,8 @@ impl Store { f_local: FLocal, f_remote: FRemote, ) -> Result { - if let Some(bytes_res) = self - .local - .load_bytes_with(entry_type, digest, f_local.clone()) - .await? - { - return Ok(bytes_res?); + if let Some(bytes_res) = self.local.load_bytes(entry_type, digest).await? { + return Ok(f_local(bytes_res)?); } let remote = self.remote.clone().ok_or_else(|| { @@ -751,15 +747,12 @@ impl Store { .download_digest_to_local(self.local.clone(), digest, entry_type, f_remote) .await?; - Ok( - self - .local - .load_bytes_with(entry_type, digest, f_local) - .await? - .ok_or_else(|| { - format!("After downloading {digest:?}, the local store claimed that it was not present.") - })??, - ) + match self.local.load_bytes(entry_type, digest).await? { + Some(bytes_res) => Ok(f_local(bytes_res)?), + None => Err(format!( + "After downloading {digest:?}, the local store claimed that it was not present." + ))?, + } } /// @@ -868,11 +861,7 @@ impl Store { // // See https://github.com/pantsbuild/pants/pull/9793 for an earlier implementation // that used `Executor.block_on`, which avoided the clone but was blocking. 
- let maybe_bytes = local - .load_bytes_with(entry_type, digest, move |bytes| { - Bytes::copy_from_slice(bytes) - }) - .await?; + let maybe_bytes = local.load_bytes(entry_type, digest).await?; match maybe_bytes { Some(bytes) => Ok(remote.store_bytes(bytes).await?), None => Err(StoreError::MissingDigest( @@ -892,19 +881,8 @@ impl Store { digest: Digest, ) -> Result<(), StoreError> { remote - .store_buffered(digest, |mut buffer| async { - let result = local - .load_bytes_with(entry_type, digest, move |bytes| { - buffer.write_all(bytes).map_err(|e| { - format!( - "Failed to write {entry_type:?} {digest:?} to temporary buffer: {err}", - entry_type = entry_type, - digest = digest, - err = e - ) - }) - }) - .await?; + .store_buffered(digest, |mut buffer| async move { + let result = local.load_bytes(entry_type, digest).await?; match result { None => Err(StoreError::MissingDigest( format!( @@ -913,8 +891,15 @@ impl Store { ), digest, )), - Some(Err(err)) => Err(err.into()), - Some(Ok(())) => Ok(()), + Some(bytes) => buffer.write_all(&bytes).map_err(|e| { + format!( + "Failed to write {entry_type:?} {digest:?} to temporary buffer: {err}", + entry_type = entry_type, + digest = digest, + err = e + ) + .into() + }), } }) .await @@ -1400,7 +1385,7 @@ impl Store { e ) })?; - f.write_all(bytes) + f.write_all(&bytes) .map_err(|e| format!("Error writing file {}: {:?}", destination.display(), e))?; Ok(()) }) @@ -1446,7 +1431,7 @@ impl Store { let store = self.clone(); async move { let content = store - .load_file_bytes_with(digest, Bytes::copy_from_slice) + .load_file_bytes_with(digest, |b| b) .await .map_err(|e| e.enrich(&format!("Couldn't find file contents for {:?}", path)))?; Ok(FileContent { @@ -1595,7 +1580,7 @@ impl SnapshotOps for Store { async fn load_file_bytes_with< T: Send + 'static, - F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static, + F: Fn(Bytes) -> T + Clone + Send + Sync + 'static, >( &self, digest: Digest, diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs index 55339eae320..94d7ebedd05 100644 --- a/src/rust/engine/fs/store/src/local.rs +++ b/src/rust/engine/fs/store/src/local.rs @@ -382,39 +382,33 @@ impl ByteStore { /// blocking, this accepts a function that views a slice rather than returning a clone of the /// data. The upshot is that the database is able to provide slices directly into shared memory. /// - pub async fn load_bytes_with T + Send + Sync + 'static>( + pub async fn load_bytes( &self, entry_type: EntryType, digest: Digest, - mut f: F, - ) -> Result, String> { + ) -> Result, String> { let start = Instant::now(); if digest == EMPTY_DIGEST { // Avoid I/O for this case. This allows some client-provided operations (like merging // snapshots) to work without needing to first store the empty snapshot. - return Ok(Some(f(&[]))); + return Ok(Some(Bytes::new())); } let dbs = match entry_type { EntryType::Directory => self.inner.directory_dbs.clone(), EntryType::File => self.inner.file_dbs.clone(), }?; - let res = dbs - .load_bytes_with(digest.hash, move |bytes| { - if bytes.len() == digest.size_bytes { - Ok(f(bytes)) - } else { - Err(format!( - "Got hash collision reading from store - digest {:?} was requested, but retrieved \ + let res = match dbs.load_bytes(digest.hash).await { + Ok(Some(bytes)) if bytes.len() != digest.size_bytes => Err(format!( + "Got hash collision reading from store - digest {:?} was requested, but retrieved \ bytes with that fingerprint had length {}. Congratulations, you may have broken \ sha256! 
Underlying bytes: {:?}", - digest, - bytes.len(), - bytes - )) - } - }) - .await; + digest, + bytes.len(), + bytes + )), + other => other, + }; if let Some(workunit_store_handle) = workunit_store::get_workunit_store_handle() { workunit_store_handle.store.record_observation( diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index fcbfc4afa7a..ac83cde6011 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -125,10 +125,10 @@ impl ByteStore { pub async fn store_buffered( &self, digest: Digest, - mut write_to_buffer: WriteToBuffer, + write_to_buffer: WriteToBuffer, ) -> Result<(), StoreError> where - WriteToBuffer: FnMut(std::fs::File) -> WriteResult, + WriteToBuffer: FnOnce(std::fs::File) -> WriteResult, WriteResult: Future>, { let write_buffer = tempfile::tempfile().map_err(|e| { diff --git a/src/rust/engine/fs/store/src/snapshot_ops.rs b/src/rust/engine/fs/store/src/snapshot_ops.rs index 2fd4c38d829..c407dffeba7 100644 --- a/src/rust/engine/fs/store/src/snapshot_ops.rs +++ b/src/rust/engine/fs/store/src/snapshot_ops.rs @@ -7,7 +7,7 @@ use std::fmt::{Debug, Display}; use std::iter::Iterator; use async_trait::async_trait; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use fs::{ directory, DigestTrie, DirectoryDigest, GlobMatching, PreparedPathGlobs, RelativePath, SymlinkBehavior, EMPTY_DIRECTORY_DIGEST, @@ -178,7 +178,7 @@ pub trait SnapshotOps: Clone + Send + Sync + 'static { async fn load_file_bytes_with< T: Send + 'static, - F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static, + F: Fn(Bytes) -> T + Clone + Send + Sync + 'static, >( &self, digest: Digest, diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 9366a9b258a..7fe7160fc0b 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -373,18 +373,18 @@ async fn main() { .unwrap(); } - let stdout: Vec = store - .load_file_bytes_with(result.stdout_digest, |bytes| bytes.to_vec()) + let stdout = store + .load_file_bytes_with(result.stdout_digest, |b| b) .await .unwrap(); - let stderr: Vec = store - .load_file_bytes_with(result.stderr_digest, |bytes| bytes.to_vec()) + let stderr = store + .load_file_bytes_with(result.stderr_digest, |b| b) .await .unwrap(); - print!("{}", String::from_utf8(stdout).unwrap()); - eprint!("{}", String::from_utf8(stderr).unwrap()); + print!("{}", std::str::from_utf8(&stdout).unwrap()); + eprint!("{}", std::str::from_utf8(&stderr).unwrap()); exit(result.exit_code); } diff --git a/src/rust/engine/sharded_lmdb/src/lib.rs b/src/rust/engine/sharded_lmdb/src/lib.rs index b6ebd708204..50537dd6b23 100644 --- a/src/rust/engine/sharded_lmdb/src/lib.rs +++ b/src/rust/engine/sharded_lmdb/src/lib.rs @@ -639,14 +639,7 @@ impl ShardedLmdb { (now_since_epoch + self.lease_time).as_secs() } - pub async fn load_bytes_with< - T: Send + 'static, - F: FnMut(&[u8]) -> Result + Send + Sync + 'static, - >( - &self, - fingerprint: Fingerprint, - mut f: F, - ) -> Result, String> { + pub async fn load_bytes(&self, fingerprint: Fingerprint) -> Result, String> { let store = self.clone(); let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION); self @@ -657,7 +650,7 @@ impl ShardedLmdb { .begin_ro_txn() .map_err(|err| format!("Failed to begin read transaction: {}", err))?; match ro_txn.get(db, &effective_key) { - Ok(bytes) => f(bytes).map(Some), + Ok(bytes) => Ok(Some(Bytes::copy_from_slice(bytes))), 
Err(lmdb::Error::NotFound) => Ok(None), Err(err) => Err(format!( "Error loading versioned key {:?}: {}", diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index a7c46f937cc..a7c22039453 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -1602,7 +1602,7 @@ fn single_file_digests_to_bytes<'py>( .load_file_bytes_with(py_file_digest.0, |bytes| { let gil = Python::acquire_gil(); let py = gil.python(); - externs::store_bytes(py, bytes) + externs::store_bytes(py, &bytes) }) .await } diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index b867be9618a..52887988d86 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -182,10 +182,10 @@ fn process_request_to_process_result( let store = context.core.store(); let (stdout_bytes, stderr_bytes) = try_join!( store - .load_file_bytes_with(result.stdout_digest, |bytes: &[u8]| bytes.to_owned()) + .load_file_bytes_with(result.stdout_digest, |b| b) .map_err(|e| e.enrich("Bytes from stdout")), store - .load_file_bytes_with(result.stderr_digest, |bytes: &[u8]| bytes.to_owned()) + .load_file_bytes_with(result.stderr_digest, |b| b) .map_err(|e| e.enrich("Bytes from stderr")) )?; From cc43fd15728ac4e7146761871a2f6619904deca6 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 18 Jan 2023 18:50:30 +1100 Subject: [PATCH 4/5] Revert "Make ShardedLmdb::load_bytes non-generic, propagate upwards" This reverts commit 42be19809f3c7b09c35423d0fed41743478095ce. --- src/rust/engine/cache/src/lib.rs | 5 +- src/rust/engine/fs/fs_util/src/main.rs | 8 ++- src/rust/engine/fs/store/src/lib.rs | 69 ++++++++++++-------- src/rust/engine/fs/store/src/local.rs | 30 +++++---- src/rust/engine/fs/store/src/remote.rs | 4 +- src/rust/engine/fs/store/src/snapshot_ops.rs | 4 +- src/rust/engine/process_executor/src/main.rs | 12 ++-- src/rust/engine/sharded_lmdb/src/lib.rs | 11 +++- src/rust/engine/src/externs/interface.rs | 2 +- src/rust/engine/src/intrinsics.rs | 4 +- 10 files changed, 92 insertions(+), 57 deletions(-) diff --git a/src/rust/engine/cache/src/lib.rs b/src/rust/engine/cache/src/lib.rs index 9d6bd0973e9..f45ea2bd56f 100644 --- a/src/rust/engine/cache/src/lib.rs +++ b/src/rust/engine/cache/src/lib.rs @@ -84,6 +84,9 @@ impl PersistentCache { pub async fn load(&self, key: &CacheKey) -> Result, String> { let fingerprint = Digest::of_bytes(&key.to_bytes()).hash; - self.store.load_bytes(fingerprint).await + self + .store + .load_bytes_with(fingerprint, move |bytes| Ok(Bytes::copy_from_slice(bytes))) + .await } } diff --git a/src/rust/engine/fs/fs_util/src/main.rs b/src/rust/engine/fs/fs_util/src/main.rs index 32280ac22ae..55802ff3745 100644 --- a/src/rust/engine/fs/fs_util/src/main.rs +++ b/src/rust/engine/fs/fs_util/src/main.rs @@ -32,6 +32,7 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; +use bytes::Bytes; use clap::{Arg, Command}; use fs::{ DirectoryDigest, GlobExpansionConjunction, GlobMatching, Permissions, PreparedPathGlobs, @@ -470,7 +471,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { let digest = Digest::new(fingerprint, size_bytes); Ok( store - .load_file_bytes_with(digest, |bytes| io::stdout().write_all(&bytes).unwrap()) + .load_file_bytes_with(digest, |bytes| io::stdout().write_all(bytes).unwrap()) .await?, ) } @@ -684,7 +685,10 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { .parse::() .expect("size_bytes must be a 
non-negative number"); let digest = Digest::new(fingerprint, size_bytes); - let bytes = match store.load_file_bytes_with(digest, |b| b).await { + let bytes = match store + .load_file_bytes_with(digest, Bytes::copy_from_slice) + .await + { Err(StoreError::MissingDigest { .. }) => store.load_directory(digest).await?.to_bytes(), Err(e) => return Err(e.into()), Ok(bytes) => bytes, diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 674dfc13187..a9b3a387dec 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -503,7 +503,7 @@ impl Store { /// pub async fn load_file_bytes_with< T: Send + 'static, - F: Fn(Bytes) -> T + Clone + Send + Sync + 'static, + F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static, >( &self, digest: Digest, @@ -514,7 +514,7 @@ impl Store { .load_bytes_with( EntryType::File, digest, - move |v: Bytes| Ok(f(v)), + move |v: &[u8]| Ok(f(v)), |_: Bytes| Ok(()), ) .await @@ -677,7 +677,7 @@ impl Store { digest, // Trust that locally stored values were canonical when they were written into the CAS // and only verify in debug mode, as it's slightly expensive. - move |bytes: Bytes| { + move |bytes: &[u8]| { let directory = remexec::Directory::decode(bytes).map_err(|e| { format!( "LMDB corruption: Directory bytes for {:?} were not valid: {:?}", @@ -727,7 +727,7 @@ impl Store { /// async fn load_bytes_with< T: Send + 'static, - FLocal: Fn(Bytes) -> Result + Clone + Send + Sync + 'static, + FLocal: Fn(&[u8]) -> Result + Clone + Send + Sync + 'static, FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static, >( &self, @@ -736,8 +736,12 @@ impl Store { f_local: FLocal, f_remote: FRemote, ) -> Result { - if let Some(bytes_res) = self.local.load_bytes(entry_type, digest).await? { - return Ok(f_local(bytes_res)?); + if let Some(bytes_res) = self + .local + .load_bytes_with(entry_type, digest, f_local.clone()) + .await? + { + return Ok(bytes_res?); } let remote = self.remote.clone().ok_or_else(|| { @@ -747,12 +751,15 @@ impl Store { .download_digest_to_local(self.local.clone(), digest, entry_type, f_remote) .await?; - match self.local.load_bytes(entry_type, digest).await? { - Some(bytes_res) => Ok(f_local(bytes_res)?), - None => Err(format!( - "After downloading {digest:?}, the local store claimed that it was not present." - ))?, - } + Ok( + self + .local + .load_bytes_with(entry_type, digest, f_local) + .await? + .ok_or_else(|| { + format!("After downloading {digest:?}, the local store claimed that it was not present.") + })??, + ) } /// @@ -861,7 +868,11 @@ impl Store { // // See https://github.com/pantsbuild/pants/pull/9793 for an earlier implementation // that used `Executor.block_on`, which avoided the clone but was blocking. 
- let maybe_bytes = local.load_bytes(entry_type, digest).await?; + let maybe_bytes = local + .load_bytes_with(entry_type, digest, move |bytes| { + Bytes::copy_from_slice(bytes) + }) + .await?; match maybe_bytes { Some(bytes) => Ok(remote.store_bytes(bytes).await?), None => Err(StoreError::MissingDigest( @@ -881,8 +892,19 @@ impl Store { digest: Digest, ) -> Result<(), StoreError> { remote - .store_buffered(digest, |mut buffer| async move { - let result = local.load_bytes(entry_type, digest).await?; + .store_buffered(digest, |mut buffer| async { + let result = local + .load_bytes_with(entry_type, digest, move |bytes| { + buffer.write_all(bytes).map_err(|e| { + format!( + "Failed to write {entry_type:?} {digest:?} to temporary buffer: {err}", + entry_type = entry_type, + digest = digest, + err = e + ) + }) + }) + .await?; match result { None => Err(StoreError::MissingDigest( format!( @@ -891,15 +913,8 @@ impl Store { ), digest, )), - Some(bytes) => buffer.write_all(&bytes).map_err(|e| { - format!( - "Failed to write {entry_type:?} {digest:?} to temporary buffer: {err}", - entry_type = entry_type, - digest = digest, - err = e - ) - .into() - }), + Some(Err(err)) => Err(err.into()), + Some(Ok(())) => Ok(()), } }) .await @@ -1385,7 +1400,7 @@ impl Store { e ) })?; - f.write_all(&bytes) + f.write_all(bytes) .map_err(|e| format!("Error writing file {}: {:?}", destination.display(), e))?; Ok(()) }) @@ -1431,7 +1446,7 @@ impl Store { let store = self.clone(); async move { let content = store - .load_file_bytes_with(digest, |b| b) + .load_file_bytes_with(digest, Bytes::copy_from_slice) .await .map_err(|e| e.enrich(&format!("Couldn't find file contents for {:?}", path)))?; Ok(FileContent { @@ -1580,7 +1595,7 @@ impl SnapshotOps for Store { async fn load_file_bytes_with< T: Send + 'static, - F: Fn(Bytes) -> T + Clone + Send + Sync + 'static, + F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static, >( &self, digest: Digest, diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs index 94d7ebedd05..55339eae320 100644 --- a/src/rust/engine/fs/store/src/local.rs +++ b/src/rust/engine/fs/store/src/local.rs @@ -382,33 +382,39 @@ impl ByteStore { /// blocking, this accepts a function that views a slice rather than returning a clone of the /// data. The upshot is that the database is able to provide slices directly into shared memory. /// - pub async fn load_bytes( + pub async fn load_bytes_with T + Send + Sync + 'static>( &self, entry_type: EntryType, digest: Digest, - ) -> Result, String> { + mut f: F, + ) -> Result, String> { let start = Instant::now(); if digest == EMPTY_DIGEST { // Avoid I/O for this case. This allows some client-provided operations (like merging // snapshots) to work without needing to first store the empty snapshot. - return Ok(Some(Bytes::new())); + return Ok(Some(f(&[]))); } let dbs = match entry_type { EntryType::Directory => self.inner.directory_dbs.clone(), EntryType::File => self.inner.file_dbs.clone(), }?; - let res = match dbs.load_bytes(digest.hash).await { - Ok(Some(bytes)) if bytes.len() != digest.size_bytes => Err(format!( - "Got hash collision reading from store - digest {:?} was requested, but retrieved \ + let res = dbs + .load_bytes_with(digest.hash, move |bytes| { + if bytes.len() == digest.size_bytes { + Ok(f(bytes)) + } else { + Err(format!( + "Got hash collision reading from store - digest {:?} was requested, but retrieved \ bytes with that fingerprint had length {}. Congratulations, you may have broken \ sha256! 
Underlying bytes: {:?}", - digest, - bytes.len(), - bytes - )), - other => other, - }; + digest, + bytes.len(), + bytes + )) + } + }) + .await; if let Some(workunit_store_handle) = workunit_store::get_workunit_store_handle() { workunit_store_handle.store.record_observation( diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index ac83cde6011..fcbfc4afa7a 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -125,10 +125,10 @@ impl ByteStore { pub async fn store_buffered( &self, digest: Digest, - write_to_buffer: WriteToBuffer, + mut write_to_buffer: WriteToBuffer, ) -> Result<(), StoreError> where - WriteToBuffer: FnOnce(std::fs::File) -> WriteResult, + WriteToBuffer: FnMut(std::fs::File) -> WriteResult, WriteResult: Future>, { let write_buffer = tempfile::tempfile().map_err(|e| { diff --git a/src/rust/engine/fs/store/src/snapshot_ops.rs b/src/rust/engine/fs/store/src/snapshot_ops.rs index c407dffeba7..2fd4c38d829 100644 --- a/src/rust/engine/fs/store/src/snapshot_ops.rs +++ b/src/rust/engine/fs/store/src/snapshot_ops.rs @@ -7,7 +7,7 @@ use std::fmt::{Debug, Display}; use std::iter::Iterator; use async_trait::async_trait; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use fs::{ directory, DigestTrie, DirectoryDigest, GlobMatching, PreparedPathGlobs, RelativePath, SymlinkBehavior, EMPTY_DIRECTORY_DIGEST, @@ -178,7 +178,7 @@ pub trait SnapshotOps: Clone + Send + Sync + 'static { async fn load_file_bytes_with< T: Send + 'static, - F: Fn(Bytes) -> T + Clone + Send + Sync + 'static, + F: Fn(&[u8]) -> T + Clone + Send + Sync + 'static, >( &self, digest: Digest, diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 7fe7160fc0b..9366a9b258a 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -373,18 +373,18 @@ async fn main() { .unwrap(); } - let stdout = store - .load_file_bytes_with(result.stdout_digest, |b| b) + let stdout: Vec = store + .load_file_bytes_with(result.stdout_digest, |bytes| bytes.to_vec()) .await .unwrap(); - let stderr = store - .load_file_bytes_with(result.stderr_digest, |b| b) + let stderr: Vec = store + .load_file_bytes_with(result.stderr_digest, |bytes| bytes.to_vec()) .await .unwrap(); - print!("{}", std::str::from_utf8(&stdout).unwrap()); - eprint!("{}", std::str::from_utf8(&stderr).unwrap()); + print!("{}", String::from_utf8(stdout).unwrap()); + eprint!("{}", String::from_utf8(stderr).unwrap()); exit(result.exit_code); } diff --git a/src/rust/engine/sharded_lmdb/src/lib.rs b/src/rust/engine/sharded_lmdb/src/lib.rs index 50537dd6b23..b6ebd708204 100644 --- a/src/rust/engine/sharded_lmdb/src/lib.rs +++ b/src/rust/engine/sharded_lmdb/src/lib.rs @@ -639,7 +639,14 @@ impl ShardedLmdb { (now_since_epoch + self.lease_time).as_secs() } - pub async fn load_bytes(&self, fingerprint: Fingerprint) -> Result, String> { + pub async fn load_bytes_with< + T: Send + 'static, + F: FnMut(&[u8]) -> Result + Send + Sync + 'static, + >( + &self, + fingerprint: Fingerprint, + mut f: F, + ) -> Result, String> { let store = self.clone(); let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION); self @@ -650,7 +657,7 @@ impl ShardedLmdb { .begin_ro_txn() .map_err(|err| format!("Failed to begin read transaction: {}", err))?; match ro_txn.get(db, &effective_key) { - Ok(bytes) => Ok(Some(Bytes::copy_from_slice(bytes))), + Ok(bytes) => f(bytes).map(Some), 
Err(lmdb::Error::NotFound) => Ok(None), Err(err) => Err(format!( "Error loading versioned key {:?}: {}", diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index a7c22039453..a7c46f937cc 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -1602,7 +1602,7 @@ fn single_file_digests_to_bytes<'py>( .load_file_bytes_with(py_file_digest.0, |bytes| { let gil = Python::acquire_gil(); let py = gil.python(); - externs::store_bytes(py, &bytes) + externs::store_bytes(py, bytes) }) .await } diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index 52887988d86..b867be9618a 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -182,10 +182,10 @@ fn process_request_to_process_result( let store = context.core.store(); let (stdout_bytes, stderr_bytes) = try_join!( store - .load_file_bytes_with(result.stdout_digest, |b| b) + .load_file_bytes_with(result.stdout_digest, |bytes: &[u8]| bytes.to_owned()) .map_err(|e| e.enrich("Bytes from stdout")), store - .load_file_bytes_with(result.stderr_digest, |b| b) + .load_file_bytes_with(result.stderr_digest, |bytes: &[u8]| bytes.to_owned()) .map_err(|e| e.enrich("Bytes from stderr")) )?; From d4c2c75052d0546fd5f8f26a4b902e7bd2ecc9e8 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 18 Jan 2023 21:30:38 +1100 Subject: [PATCH 5/5] Make ByteStoreError private --- src/rust/engine/fs/store/src/remote.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index fcbfc4afa7a..8fe294fd8c5 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -49,7 +49,7 @@ impl fmt::Debug for ByteStore { /// Represents an error from accessing a remote bytestore. #[derive(Debug)] -pub enum ByteStoreError { +enum ByteStoreError { /// gRPC error Grpc(Status),
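
Net effect of the series: `remote::ByteStore::load_bytes_with` becomes a non-generic `load_bytes(digest) -> Result<Option<Bytes>, String>` that wraps its gRPC read in `retry_call`/`status_is_retryable` internally, while the revert in PATCH 4/5 keeps the callback-based `load_bytes_with` on the local LMDB-backed store and `ShardedLmdb`. A minimal sketch of the resulting caller-side pattern, mirroring the `RemoteStore::download` hunk in PATCH 2/5 (the helper name and error message below are illustrative, not taken from the patches):

    // Hypothetical helper showing the post-series calling convention for the
    // remote byte store; retryable gRPC statuses are retried inside load_bytes,
    // so the caller only distinguishes "present" (Some) from "missing" (None).
    async fn fetch_remote_bytes(
      remote_store: crate::remote::ByteStore,
      digest: hashing::Digest,
    ) -> Result<bytes::Bytes, String> {
      remote_store
        .load_bytes(digest)
        .await?
        .ok_or_else(|| format!("{digest:?} was not present in the remote store"))
    }

Returning owned `Bytes` suits the remote path, where gRPC responses arrive as owned buffers; the callback form stays on the local store because, per the doc comment in local.rs, it lets LMDB hand callers a slice directly into shared memory without a copy.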