diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock
index 571127d618e..91273fe9335 100644
--- a/src/rust/engine/Cargo.lock
+++ b/src/rust/engine/Cargo.lock
@@ -1964,16 +1964,6 @@ version = "2.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
 
-[[package]]
-name = "memmap"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
-dependencies = [
- "libc",
- "winapi",
-]
-
 [[package]]
 name = "memoffset"
 version = "0.6.4"
@@ -3364,7 +3354,6 @@ dependencies = [
  "lmdb-rkv",
  "log",
  "madvise",
- "memmap",
  "mock",
  "num_cpus",
  "parking_lot 0.12.1",
@@ -3379,6 +3368,7 @@ dependencies = [
  "testutil",
  "tokio",
  "tokio-rustls",
+ "tokio-util 0.7.8",
  "tonic",
  "tower-service",
  "tryfuture",
@@ -3541,6 +3531,7 @@ dependencies = [
  "hashing",
  "prost",
  "protos",
+ "tokio",
 ]
 
 [[package]]
diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml
index d75d29aea8f..d5b12c405a6 100644
--- a/src/rust/engine/Cargo.toml
+++ b/src/rust/engine/Cargo.toml
@@ -244,7 +244,6 @@ lmdb-rkv = { git = "https://github.com/pantsbuild/lmdb-rs.git", rev = "6ae7a552a
 log = "0.4.17"
 madvise = "0.1"
 maplit = "1.0.1"
-memmap = "0.7"
 nails = "0.13"
 nix = "0.25"
 notify = { git = "https://github.com/pantsbuild/notify", rev = "276af0f3c5f300bfd534941386ba2f3b3a022df7" }
diff --git a/src/rust/engine/fs/store/Cargo.toml b/src/rust/engine/fs/store/Cargo.toml
index b0f7d9ae095..a0ae34a9358 100644
--- a/src/rust/engine/fs/store/Cargo.toml
+++ b/src/rust/engine/fs/store/Cargo.toml
@@ -25,7 +25,6 @@ itertools = { workspace = true }
 lmdb-rkv = { workspace = true }
 log = { workspace = true }
 madvise = { workspace = true }
-memmap = { workspace = true }
 parking_lot = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
@@ -36,6 +35,7 @@ task_executor = { path = "../../task_executor" }
 tempfile = { workspace = true }
 tokio-rustls = { workspace = true }
 tokio = { workspace = true, features = ["fs"] }
+tokio-util = { workspace = true, features = ["io"] }
 tonic = { workspace = true, features = ["transport", "codegen", "tls", "tls-roots", "prost"] }
 tower-service = { workspace = true }
 tryfuture = { path = "../../tryfuture" }
diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs
index 240e368691a..54935c47232 100644
--- a/src/rust/engine/fs/store/src/lib.rs
+++ b/src/rust/engine/fs/store/src/lib.rs
@@ -806,15 +806,16 @@ impl Store {
     remote_store
       .clone()
       .maybe_upload(digest, async move {
-        // TODO(John Sirois): Consider allowing configuration of when to buffer large blobs
-        // to disk to be independent of the remote store wire chunk size.
-        if digest.size_bytes > remote_store.store.chunk_size_bytes() {
-          Self::store_large_blob_remote(local, remote_store.store, entry_type, digest)
-            .await
-        } else {
-          Self::store_small_blob_remote(local, remote_store.store, entry_type, digest)
-            .await
-        }
+        match local.load_from_fs(digest).await? {
+          Some(path) => {
+            Self::store_fsdb_blob_remote(remote_store.store, digest, path).await?
+          }
+          None => {
+            Self::store_lmdb_blob_remote(local, remote_store.store, entry_type, digest)
+              .await?
+          }
+        };
+        Ok(())
       })
       .await
   }
@@ -837,7 +838,7 @@ impl Store {
       .boxed()
   }
 
-  async fn store_small_blob_remote(
+  async fn store_lmdb_blob_remote(
     local: local::ByteStore,
     remote: remote::ByteStore,
     entry_type: EntryType,
@@ -845,7 +846,9 @@ impl Store {
   ) -> Result<(), StoreError> {
     // We need to copy the bytes into memory so that they may be used safely in an async
     // future. While this unfortunately increases memory consumption, we prioritize
-    // being able to run `remote.store_bytes()` as async.
+    // being able to run `remote.store_bytes()` as async. In addition, this is only used
+    // for blobs in the LMDB store, most of which are small: large blobs end up in the
+    // FSDB store.
     //
     // See https://github.com/pantsbuild/pants/pull/9793 for an earlier implementation
     // that used `Executor.block_on`, which avoided the clone but was blocking.
@@ -863,31 +866,16 @@ impl Store {
     }
   }
 
-  async fn store_large_blob_remote(
-    local: local::ByteStore,
+  async fn store_fsdb_blob_remote(
     remote: remote::ByteStore,
-    entry_type: EntryType,
     digest: Digest,
+    path: PathBuf,
   ) -> Result<(), StoreError> {
-    remote
-      .store_buffered(digest, |mut buffer| async {
-        let result = local
-          .load_bytes_with(entry_type, digest, move |bytes| {
-            buffer.write_all(bytes).map_err(|e| {
-              format!("Failed to write {entry_type:?} {digest:?} to temporary buffer: {e}")
-            })
-          })
-          .await?;
-        match result {
-          None => Err(StoreError::MissingDigest(
-            format!("Failed to upload {entry_type:?}: Not found in local store",),
-            digest,
-          )),
-          Some(Err(err)) => Err(err.into()),
-          Some(Ok(())) => Ok(()),
-        }
-      })
+    let file = tokio::fs::File::open(&path)
       .await
+      .map_err(|e| format!("failed to read {digest:?} from {path:?}: {e}"))?;
+    remote.store_file(digest, file).await?;
+    Ok(())
   }
 
   ///
diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs
index 431140a9f04..ce0dc5882e3 100644
--- a/src/rust/engine/fs/store/src/remote.rs
+++ b/src/rust/engine/fs/store/src/remote.rs
@@ -2,7 +2,6 @@
 // Licensed under the Apache License, Version 2.0 (see LICENSE).
 use std::collections::{BTreeMap, HashSet};
 use std::fmt;
-use std::ops::Range;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
@@ -14,33 +13,36 @@ use hashing::Digest;
 use log::Level;
 use protos::gen::build::bazel::remote::execution::v2 as remexec;
 use remexec::ServerCapabilities;
+use tokio::fs::File;
 use tokio::io::{AsyncSeekExt, AsyncWrite};
 use workunit_store::{in_workunit, ObservationMetric};
 
-use crate::StoreError;
-
 mod reapi;
#[cfg(test)]
 mod reapi_tests;
 
-pub type ByteSource = Arc<(dyn Fn(Range<usize>) -> Bytes + Send + Sync + 'static)>;
-
 #[async_trait]
 pub trait ByteStoreProvider: Sync + Send + 'static {
-  async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String>;
+  /// Store the bytes readable from `file` into the remote store
+  async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>;
+
+  /// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the
+  /// bytes are already in memory
+  async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>;
 
+  /// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns
+  /// true when found, false when not.
   async fn load(
     &self,
     digest: Digest,
     destination: &mut dyn LoadDestination,
   ) -> Result<bool, String>;
 
+  /// Return any digests from `digests` that are not (currently) available in the remote store.
   async fn list_missing_digests(
     &self,
     digests: &mut (dyn Iterator<Item = Digest> + Send),
   ) -> Result<HashSet<Digest>, String>;
-
-  fn chunk_size_bytes(&self) -> usize;
 }
 
 // TODO: Consider providing `impl Default`, similar to `super::LocalOptions`.
@@ -110,74 +112,40 @@ impl ByteStore {
     Ok(ByteStore::new(instance_name, provider))
   }
 
-  pub(crate) fn chunk_size_bytes(&self) -> usize {
-    self.provider.chunk_size_bytes()
-  }
-
-  pub async fn store_buffered<WriteToBuffer, WriteResult>(
-    &self,
-    digest: Digest,
-    write_to_buffer: WriteToBuffer,
-  ) -> Result<(), StoreError>
-  where
-    WriteToBuffer: FnOnce(std::fs::File) -> WriteResult,
-    WriteResult: Future<Output = Result<(), StoreError>>,
-  {
-    let write_buffer = tempfile::tempfile().map_err(|e| {
-      format!("Failed to create a temporary blob upload buffer for {digest:?}: {e}")
-    })?;
-    let read_buffer = write_buffer.try_clone().map_err(|e| {
-      format!("Failed to create a read handle for the temporary upload buffer for {digest:?}: {e}")
-    })?;
-    write_to_buffer(write_buffer).await?;
-
-    // Unsafety: Mmap presents an immutable slice of bytes, but the underlying file that is mapped
-    // could be mutated by another process. We guard against this by creating an anonymous
-    // temporary file and ensuring it is written to and closed via the only other handle to it in
-    // the code just above.
-    let mmap = Arc::new(unsafe {
-      let mapping = memmap::Mmap::map(&read_buffer).map_err(|e| {
-        format!("Failed to memory map the temporary file buffer for {digest:?}: {e}")
-      })?;
-      if let Err(err) = madvise::madvise(
-        mapping.as_ptr(),
-        mapping.len(),
-        madvise::AccessPattern::Sequential,
-      ) {
-        log::warn!(
-          "Failed to madvise(MADV_SEQUENTIAL) for the memory map of the temporary file buffer for \
-           {digest:?}. Continuing with possible reduced performance: {err}",
-          digest = digest,
-          err = err
-        )
-      }
-      Ok(mapping) as Result<memmap::Mmap, String>
-    }?);
-
+  /// Store the bytes readable from `file` into the remote store
+  pub async fn store_file(&self, digest: Digest, file: File) -> Result<(), String> {
     self
-      .store_bytes_source(
-        digest,
-        Arc::new(move |range| Bytes::copy_from_slice(&mmap[range])),
-      )
-      .await?;
-
-    Ok(())
+      .store_tracking("store", digest, || self.provider.store_file(digest, file))
+      .await
   }
 
+  /// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the
+  /// bytes are already in memory
   pub async fn store_bytes(&self, bytes: Bytes) -> Result<(), String> {
     let digest = Digest::of_bytes(&bytes);
     self
-      .store_bytes_source(digest, Arc::new(move |range| bytes.slice(range)))
+      .store_tracking("store_bytes", digest, || {
+        self.provider.store_bytes(digest, bytes)
+      })
       .await
   }
 
-  async fn store_bytes_source(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> {
+  async fn store_tracking<DoStore, Fut>(
+    &self,
+    workunit: &'static str,
+    digest: Digest,
+    do_store: DoStore,
+  ) -> Result<(), String>
+  where
+    DoStore: FnOnce() -> Fut + Send,
+    Fut: Future<Output = Result<(), String>> + Send,
+  {
     in_workunit!(
-      "store_bytes",
+      workunit,
       Level::Trace,
       desc = Some(format!("Storing {digest:?}")),
       |workunit| async move {
-        let result = self.provider.store_bytes(digest, bytes).await;
+        let result = do_store().await;
 
         if result.is_ok() {
           workunit.record_observation(
diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs
index 7395cbd671b..d78a368ca27 100755
--- a/src/rust/engine/fs/store/src/remote/reapi.rs
+++ b/src/rust/engine/fs/store/src/remote/reapi.rs
@@ -1,14 +1,15 @@
 // Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
 // Licensed under the Apache License, Version 2.0 (see LICENSE).
-use std::cmp::min;
 use std::collections::HashSet;
 use std::convert::TryInto;
 use std::fmt;
+use std::io::Cursor;
 use std::sync::Arc;
 use std::time::Instant;
 
 use async_oncecell::OnceCell;
 use async_trait::async_trait;
+use bytes::Bytes;
 use futures::{FutureExt, StreamExt};
 use grpc_util::retry::{retry_call, status_is_retryable};
 use grpc_util::{
@@ -22,14 +23,15 @@ use remexec::{
   content_addressable_storage_client::ContentAddressableStorageClient, BatchUpdateBlobsRequest,
   ServerCapabilities,
 };
-use tokio::io::AsyncWriteExt;
+use tokio::fs::File;
+use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt};
 use tokio::sync::Mutex;
 use tonic::{Code, Request, Status};
 use workunit_store::{Metric, ObservationMetric};
 
 use crate::RemoteOptions;
 
-use super::{ByteSource, ByteStoreProvider, LoadDestination};
+use super::{ByteStoreProvider, LoadDestination};
 
 pub struct Provider {
   instance_name: Option<String>,
@@ -52,6 +54,15 @@ enum ByteStoreError {
   Other(String),
 }
 
+impl ByteStoreError {
+  fn is_retryable(&self) -> bool {
+    match self {
+      ByteStoreError::Grpc(status) => status_is_retryable(status),
+      ByteStoreError::Other(_) => false,
+    }
+  }
+}
+
 impl fmt::Display for ByteStoreError {
   fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     match self {
@@ -103,16 +114,12 @@ impl Provider {
     })
   }
 
-  async fn store_bytes_source_batch(
-    &self,
-    digest: Digest,
-    bytes: ByteSource,
-  ) -> Result<(), ByteStoreError> {
+  async fn store_bytes_batch(&self, digest: Digest, bytes: Bytes) -> Result<(), ByteStoreError> {
     let request = BatchUpdateBlobsRequest {
       instance_name: self.instance_name.clone().unwrap_or_default(),
       requests: vec![remexec::batch_update_blobs_request::Request {
         digest: Some(digest.into()),
-        data: bytes(0..digest.size_bytes),
+        data: bytes,
         compressor: remexec::compressor::Value::Identity as i32,
       }],
     };
@@ -125,10 +132,10 @@ impl Provider {
     Ok(())
   }
 
-  async fn store_bytes_source_stream(
+  async fn store_source_stream(
     &self,
     digest: Digest,
-    bytes: ByteSource,
+    source: Arc<Mutex<dyn AsyncRead + Send + Unpin>>,
   ) -> Result<(), ByteStoreError> {
     let len = digest.size_bytes;
     let instance_name = self.instance_name.clone().unwrap_or_default();
@@ -143,40 +150,80 @@ impl Provider {
 
     let mut client = self.byte_stream_client.as_ref().clone();
 
-    let chunk_size_bytes = self.chunk_size_bytes;
+    // we have to communicate the (first) error reading the underlying reader out of band
+    let error_occurred = Arc::new(parking_lot::Mutex::new(None));
+    let error_occurred_stream = error_occurred.clone();
 
-    let stream = futures::stream::unfold((0, false), move |(offset, has_sent_any)| {
-      if offset >= len && has_sent_any {
-        futures::future::ready(None)
-      } else {
-        let next_offset = min(offset + chunk_size_bytes, len);
-        let req = protos::gen::google::bytestream::WriteRequest {
+    let chunk_size_bytes = self.chunk_size_bytes;
+    let stream = async_stream::stream! {
+      if len == 0 {
+        // if the reader is empty, the ReaderStream gives no elements, but we have to write at least
+        // one.
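+        // (the ByteStream protocol only treats a write as complete once it sees a request with
+        // `finish_write: true`, hence this single empty chunk.)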
+        yield protos::gen::google::bytestream::WriteRequest {
           resource_name: resource_name.clone(),
-          write_offset: offset as i64,
-          finish_write: next_offset == len,
-          // TODO(tonic): Explore using the unreleased `Bytes` support in Prost from:
-          // https://github.com/danburkert/prost/pull/341
-          data: bytes(offset..next_offset),
+          write_offset: 0,
+          finish_write: true,
+          data: Bytes::new(),
         };
-        futures::future::ready(Some((req, (next_offset, true))))
+        return;
+      }
+
+      // Read the source in appropriately sized chunks.
+      // NB. it is possible that this doesn't fill each chunk fully (i.e. may not send
+      // `chunk_size_bytes` in each request). For the usual sources, this should be unlikely.
+      let mut source = source.lock().await;
+      let reader_stream = tokio_util::io::ReaderStream::with_capacity(&mut *source, chunk_size_bytes);
+      let mut num_seen_bytes = 0;
+
+      for await read_result in reader_stream {
+        match read_result {
+          Ok(data) => {
+            let write_offset = num_seen_bytes as i64;
+            num_seen_bytes += data.len();
+            yield protos::gen::google::bytestream::WriteRequest {
+              resource_name: resource_name.clone(),
+              write_offset,
+              finish_write: num_seen_bytes == len,
+              data,
+            }
+          },
+          Err(err) => {
+            // reading locally hit an error, so store it for re-processing below
+            *error_occurred_stream.lock() = Some(err);
+            // cut off here, no point continuing
+            break;
+          }
+        }
       }
-    });
+    };
 
     // NB: We must box the future to avoid a stack overflow.
     // Explicit type annotation is a workaround for https://github.com/rust-lang/rust/issues/64552
     let future: std::pin::Pin<
       Box<dyn Future<Output = Result<(), ByteStoreError>> + Send>,
-    > = Box::pin(client.write(Request::new(stream)).map(|r| match r {
-      Err(err) => Err(ByteStoreError::Grpc(err)),
-      Ok(response) => {
-        let response = response.into_inner();
-        if response.committed_size == len as i64 {
-          Ok(())
-        } else {
-          Err(ByteStoreError::Other(format!(
-            "Uploading file with digest {:?}: want committed size {} but got {}",
-            digest, len, response.committed_size
-          )))
+    > = Box::pin(client.write(Request::new(stream)).map(move |r| {
+      if let Some(ref read_err) = *error_occurred.lock() {
+        // check if reading `source` locally hit an error: if so, propagate that error (there will
+        // likely be a remote error too, because our write will be too short, but the local error is
+        // the interesting root cause)
+        return Err(ByteStoreError::Other(format!(
+          "Uploading file with digest {:?}: failed to read local source: {}",
+          digest, read_err
+        )));
+      }
+
+      match r {
+        Err(err) => Err(ByteStoreError::Grpc(err)),
+        Ok(response) => {
+          let response = response.into_inner();
+          if response.committed_size == len as i64 {
+            Ok(())
+          } else {
+            Err(ByteStoreError::Other(format!(
+              "Uploading file with digest {:?}: want committed size {} but got {}",
+              digest, len, response.committed_size
+            )))
+          }
         }
       }
     }));
@@ -207,7 +254,7 @@ impl Provider {
 
 #[async_trait]
 impl ByteStoreProvider for Provider {
-  async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> {
+  async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String> {
     let len = digest.size_bytes;
 
     let max_batch_total_size_bytes = {
@@ -226,17 +273,41 @@
 
     retry_call(
       bytes,
-      move |bytes| async move {
+      move |bytes, _| async move {
         if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config {
-          self.store_bytes_source_batch(digest, bytes).await
+          self.store_bytes_batch(digest, bytes).await
         } else {
-          self.store_bytes_source_stream(digest, bytes).await
+          self
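+            // wrap the in-memory bytes in a `Cursor` so they can be streamed through the same
+            // code path as file-backed sources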
+            .store_source_stream(digest, Arc::new(Mutex::new(Cursor::new(bytes))))
+            .await
         }
       },
-      |err| match err {
-        ByteStoreError::Grpc(status) => status_is_retryable(status),
-        ByteStoreError::Other(_) => false,
-      },
+      ByteStoreError::is_retryable,
     )
     .await
     .map_err(|e| e.to_string())
   }
 
+  async fn store_file(&self, digest: Digest, file: File) -> Result<(), String> {
+    let source = Arc::new(Mutex::new(file));
+    retry_call(
+      source,
+      move |source, retry_attempt| async move {
+        if retry_attempt > 0 {
+          // if we're retrying, we need to go back to the start of the source to start the whole
+          // read fresh
+          source.lock().await.rewind().await.map_err(|err| {
+            ByteStoreError::Other(format!(
+              "Uploading file with digest {digest:?}: failed to rewind before retry {retry_attempt}: {err}"
+            ))
+          })?;
+        }
+
+        // A file might be small enough to write via the batch API, but we ignore that possibility
+        // for now, because these are expected to be stored in the FSDB, and thus large
+        self.store_source_stream(digest, source).await
+      },
+      ByteStoreError::is_retryable,
+    )
+    .await
+    .map_err(|e| e.to_string())
+  }
@@ -268,7 +339,7 @@ impl ByteStoreProvider for Provider {
 
     retry_call(
       (client, request, destination),
-      move |(mut client, request, destination)| {
+      move |(mut client, request, destination), retry_attempt| {
         async move {
           let mut start_opt = Some(Instant::now());
           let response = client.read(request).await?;
@@ -290,7 +361,11 @@ impl ByteStoreProvider for Provider {
 
           let mut writer = destination.lock().await;
           let mut hasher = Hasher::new();
-          writer.reset().await?;
+          if retry_attempt > 0 {
+            // if we're retrying, we need to clear out the destination to start the whole write
+            // fresh
+            writer.reset().await?;
+          }
           while let Some(response) = stream.next().await {
             let response = response?;
             writer.write_all(&response.data).await?;
@@ -332,7 +407,7 @@ impl ByteStoreProvider for Provider {
     let client = self.cas_client.as_ref().clone();
     let response = retry_call(
       client,
-      move |mut client| {
+      move |mut client, _| {
         let request = request.clone();
         async move { client.find_missing_blobs(request).await }
       },
@@ -348,8 +423,4 @@ impl ByteStoreProvider for Provider {
       .map(|digest| digest.try_into())
      .collect::<Result<HashSet<_>, _>>()
   }
-
-  fn chunk_size_bytes(&self) -> usize {
-    self.chunk_size_bytes
-  }
 }
diff --git a/src/rust/engine/fs/store/src/remote/reapi_tests.rs b/src/rust/engine/fs/store/src/remote/reapi_tests.rs
index a32f4c3f38c..23fc279709a 100644
--- a/src/rust/engine/fs/store/src/remote/reapi_tests.rs
+++ b/src/rust/engine/fs/store/src/remote/reapi_tests.rs
@@ -1,21 +1,21 @@
 // Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
 // Licensed under the Apache License, Version 2.0 (see LICENSE).
 use std::collections::{BTreeMap, HashSet};
-use std::sync::Arc;
 use std::time::Duration;
 
 use bytes::Bytes;
 use grpc_util::tls;
 use hashing::{Digest, Fingerprint};
 use mock::{RequestType, StubCAS};
+use tempfile::TempDir;
 use testutil::data::TestData;
+use tokio::fs::File;
 
 use crate::remote::{ByteStoreProvider, RemoteOptions};
-use crate::tests::{big_file_bytes, big_file_fingerprint, new_cas};
+use crate::tests::{big_file_bytes, big_file_fingerprint, mk_tempfile, new_cas};
 use crate::MEGABYTES;
 
 use super::reapi::Provider;
-use super::ByteSource;
 
 fn remote_options(
   cas_address: String,
@@ -46,10 +46,6 @@ async fn new_provider(cas: &StubCAS) -> Provider {
     .unwrap()
 }
 
-fn byte_source(bytes: Bytes) -> ByteSource {
-  Arc::new(move |r| bytes.slice(r))
-}
-
 async fn load_test(chunk_size: usize) {
   let testdata = TestData::roland();
   let cas = new_cas(chunk_size);
@@ -172,6 +168,134 @@ fn assert_cas_store(
   }
 }
 
+#[tokio::test]
+async fn store_file_one_chunk() {
+  let testdata = TestData::roland();
+  let cas = StubCAS::empty();
+  let provider = new_provider(&cas).await;
+
+  provider
+    .store_file(
+      testdata.digest(),
+      mk_tempfile(Some(&testdata.bytes())).await,
+    )
+    .await
+    .unwrap();
+
+  assert_cas_store(&cas, testdata.fingerprint(), testdata.bytes(), 1, 1024)
+}
+
+#[tokio::test]
+async fn store_file_multiple_chunks() {
+  let cas = StubCAS::empty();
+  let chunk_size = 10 * 1024;
+  let provider = Provider::new(remote_options(
+    cas.address(),
+    chunk_size,
+    0, // disable batch API, force streaming API
+  ))
+  .await
+  .unwrap();
+
+  let all_the_henries = big_file_bytes();
+  let fingerprint = big_file_fingerprint();
+  let digest = Digest::new(fingerprint, all_the_henries.len());
+
+  provider
+    .store_file(digest, mk_tempfile(Some(&all_the_henries)).await)
+    .await
+    .unwrap();
+
+  assert_cas_store(&cas, fingerprint, all_the_henries, 98, chunk_size)
+}
+
+#[tokio::test]
+async fn store_file_empty_file() {
+  let testdata = TestData::empty();
+  let cas = StubCAS::empty();
+  let provider = new_provider(&cas).await;
+
+  provider
+    .store_file(
+      testdata.digest(),
+      mk_tempfile(Some(&testdata.bytes())).await,
+    )
+    .await
+    .unwrap();
+
+  assert_cas_store(&cas, testdata.fingerprint(), testdata.bytes(), 1, 1024)
+}
+
+#[tokio::test]
+async fn store_file_grpc_error() {
+  let testdata = TestData::roland();
+  let cas = StubCAS::cas_always_errors();
+  let provider = new_provider(&cas).await;
+
+  let error = provider
+    .store_file(
+      testdata.digest(),
+      mk_tempfile(Some(&testdata.bytes())).await,
+    )
+    .await
+    .expect_err("Want err");
+  assert!(
+    error.contains("StubCAS is configured to always fail"),
+    "Bad error message, got: {error}"
+  );
+
+  // retries:
+  assert_eq!(
+    cas.request_counts.lock().get(&RequestType::BSWrite),
+    Some(&3)
+  );
+}
+
+#[tokio::test]
+async fn store_file_connection_error() {
+  let testdata = TestData::roland();
+  let provider = Provider::new(remote_options(
+    "http://doesnotexist.example".to_owned(),
+    10 * MEGABYTES,
+    crate::tests::STORE_BATCH_API_SIZE_LIMIT,
+  ))
+  .await
+  .unwrap();
+
+  let error = provider
+    .store_file(
+      testdata.digest(),
+      mk_tempfile(Some(&testdata.bytes())).await,
+    )
+    .await
+    .expect_err("Want err");
+  assert!(
+    error.contains("Unavailable: \"error trying to connect: dns error"),
+    "Bad error message, got: {error}"
+  );
+}
+
+#[tokio::test]
+async fn store_file_source_read_error_immediately() {
+  let testdata = TestData::roland();
+  let cas = StubCAS::empty();
+  let provider = new_provider(&cas).await;
+
+  let temp_dir = TempDir::new().unwrap();
+  let file_that_is_a_dir = File::open(temp_dir.path()).await.unwrap();
+
+  let error = provider
+    .store_file(testdata.digest(), file_that_is_a_dir)
+    .await
+    .expect_err("Want err");
+  assert!(
+    error.contains("Is a directory"),
+    "Bad error message, got: {error}",
+  )
+}
+
+// TODO: it would also be good to validate the behaviour if the file reads start failing later
+// (e.g. read 10 bytes, and then fail), if that's a thing that is possible.
+
 #[tokio::test]
 async fn store_bytes_one_chunk() {
   let testdata = TestData::roland();
@@ -179,7 +303,7 @@ async fn store_bytes_one_chunk() {
   let provider = new_provider(&cas).await;
 
   provider
-    .store_bytes(testdata.digest(), byte_source(testdata.bytes()))
+    .store_bytes(testdata.digest(), testdata.bytes())
     .await
     .unwrap();
 
@@ -202,7 +326,7 @@ async fn store_bytes_multiple_chunks() {
   let digest = Digest::new(fingerprint, all_the_henries.len());
 
   provider
-    .store_bytes(digest, byte_source(all_the_henries.clone()))
+    .store_bytes(digest, all_the_henries.clone())
     .await
     .unwrap();
 
@@ -216,7 +340,7 @@ async fn store_bytes_empty_file() {
   let provider = new_provider(&cas).await;
 
   provider
-    .store_bytes(testdata.digest(), byte_source(testdata.bytes()))
+    .store_bytes(testdata.digest(), testdata.bytes())
     .await
     .unwrap();
 
@@ -230,7 +354,7 @@ async fn store_bytes_batch_grpc_error() {
   let provider = new_provider(&cas).await;
 
   let error = provider
-    .store_bytes(testdata.digest(), byte_source(testdata.bytes()))
+    .store_bytes(testdata.digest(), testdata.bytes())
     .await
     .expect_err("Want err");
   assert!(
@@ -265,7 +389,7 @@ async fn store_bytes_write_stream_grpc_error() {
   let digest = Digest::new(fingerprint, all_the_henries.len());
 
   let error = provider
-    .store_bytes(digest, byte_source(all_the_henries))
+    .store_bytes(digest, all_the_henries)
     .await
     .expect_err("Want err");
   assert!(
@@ -292,7 +416,7 @@ async fn store_bytes_connection_error() {
     .unwrap();
 
   let error = provider
-    .store_bytes(testdata.digest(), byte_source(testdata.bytes()))
+    .store_bytes(testdata.digest(), testdata.bytes())
     .await
     .expect_err("Want err");
   assert!(
diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs
index 6323d6c9782..34d6d07371e 100644
--- a/src/rust/engine/fs/store/src/remote_tests.rs
+++ b/src/rust/engine/fs/store/src/remote_tests.rs
@@ -1,7 +1,6 @@
 // Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
 // Licensed under the Apache License, Version 2.0 (see LICENSE).
 use std::collections::{BTreeMap, HashMap, HashSet};
-use std::io::Write;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -10,11 +9,12 @@ use grpc_util::tls;
 use hashing::{Digest, Fingerprint};
 use parking_lot::Mutex;
 use testutil::data::TestData;
+use tokio::fs::File;
 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
 use workunit_store::WorkunitStore;
 
-use crate::remote::{ByteSource, ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions};
-use crate::tests::new_cas;
+use crate::remote::{ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions};
+use crate::tests::{mk_tempfile, new_cas};
 use crate::MEGABYTES;
 
 #[tokio::test]
@@ -105,7 +105,7 @@ async fn load_file_existing() {
   let _ = WorkunitStore::setup_for_tests();
   let store = new_byte_store(&testdata);
 
-  let file = mk_tempfile().await;
+  let file = mk_tempfile(None).await;
 
   let file = store
     .load_file(testdata.digest(), file)
@@ -121,7 +121,7 @@ async fn load_file_missing() {
   let _ = WorkunitStore::setup_for_tests();
   let (store, _) = empty_byte_store();
 
-  let file = mk_tempfile().await;
+  let file = mk_tempfile(None).await;
 
   let result = store.load_file(TestData::roland().digest(), file).await;
   assert!(result.unwrap().is_none());
@@ -132,7 +132,7 @@ async fn load_file_provider_error() {
   let _ = WorkunitStore::setup_for_tests();
   let store = byte_store_always_error_provider();
 
-  let file = mk_tempfile().await;
+  let file = mk_tempfile(None).await;
 
   assert_error(store.load_file(TestData::roland().digest(), file).await);
 }
@@ -157,19 +157,17 @@ async fn store_bytes_provider_error() {
 }
 
 #[tokio::test]
-async fn store_buffered() {
+async fn store_file() {
   let _ = WorkunitStore::setup_for_tests();
-
   let testdata = TestData::roland();
-  let bytes = testdata.bytes();
   let (store, provider) = empty_byte_store();
 
   assert_eq!(
     store
-      .store_buffered(testdata.digest(), move |mut file| async move {
-        file.write_all(&bytes).unwrap();
-        Ok(())
-      })
+      .store_file(
+        testdata.digest(),
+        mk_tempfile(Some(&testdata.bytes())).await
+      )
      .await,
    Ok(())
  );
@@ -179,21 +177,17 @@
 }
 
 #[tokio::test]
-async fn store_buffered_provider_error() {
+async fn store_file_provider_error() {
   let _ = WorkunitStore::setup_for_tests();
-
   let testdata = TestData::roland();
-  let bytes = testdata.bytes();
-
   let store = byte_store_always_error_provider();
 
   assert_error(
     store
-      .store_buffered(testdata.digest(), move |mut file| async move {
-        file.write_all(&bytes).unwrap();
-        Ok(())
-      })
-      .await
-      .map_err(|e| e.to_string()),
+      .store_file(
+        testdata.digest(),
+        mk_tempfile(Some(&testdata.bytes())).await,
+      )
+      .await,
   );
 }
 
@@ -239,8 +233,7 @@ async fn list_missing_digests_provider_error() {
 
 #[tokio::test]
 async fn file_as_load_destination_reset() {
-  let mut file = mk_tempfile().await;
-  file.write_all(b"initial").await.unwrap();
+  let mut file = mk_tempfile(Some(b"initial")).await;
 
   file.reset().await.unwrap();
   assert_file_contents(file, "").await;
@@ -268,14 +261,6 @@ fn byte_store_always_error_provider() -> ByteStore {
   ByteStore::new(None, AlwaysErrorProvider::new())
 }
 
-async fn mk_tempfile() -> tokio::fs::File {
-  let file = tokio::task::spawn_blocking(tempfile::tempfile)
-    .await
-    .unwrap()
-    .unwrap();
-  tokio::fs::File::from_std(file)
-}
-
 async fn assert_file_contents(mut file: tokio::fs::File, expected: &str) {
   file.rewind().await.unwrap();
 
@@ -317,11 +302,16 @@ impl TestProvider {
 
 #[async_trait::async_trait]
 impl ByteStoreProvider for TestProvider {
-  async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> {
-    self
-      .blobs
-      .lock()
-      .insert(digest.hash, bytes(0..digest.size_bytes));
+  async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String> {
+    self.blobs.lock().insert(digest.hash, bytes);
+    Ok(())
+  }
+
+  async fn store_file(&self, digest: Digest, mut file: File) -> Result<(), String> {
+    // just pull it all into memory
+    let mut bytes = Vec::new();
+    file.read_to_end(&mut bytes).await.unwrap();
+    self.blobs.lock().insert(digest.hash, Bytes::from(bytes));
     Ok(())
   }
 
@@ -347,10 +337,6 @@ impl ByteStoreProvider for TestProvider {
     let blobs = self.blobs.lock();
     Ok(digests.filter(|d| !blobs.contains_key(&d.hash)).collect())
   }
-
-  fn chunk_size_bytes(&self) -> usize {
-    1234
-  }
 }
 
 struct AlwaysErrorProvider;
@@ -361,7 +347,11 @@ impl AlwaysErrorProvider {
   }
 }
 
 #[async_trait::async_trait]
 impl ByteStoreProvider for AlwaysErrorProvider {
-  async fn store_bytes(&self, _: Digest, _: ByteSource) -> Result<(), String> {
+  async fn store_bytes(&self, _: Digest, _: Bytes) -> Result<(), String> {
+    Err("AlwaysErrorProvider always fails".to_owned())
+  }
+
+  async fn store_file(&self, _: Digest, _: File) -> Result<(), String> {
     Err("AlwaysErrorProvider always fails".to_owned())
   }
 
@@ -375,8 +365,4 @@ impl ByteStoreProvider for AlwaysErrorProvider {
   ) -> Result<HashSet<Digest>, String> {
     Err("AlwaysErrorProvider always fails".to_owned())
   }
-
-  fn chunk_size_bytes(&self) -> usize {
-    unreachable!("shouldn't call this")
-  }
 }
diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs
index 693b0723938..b9b8f146c6f 100644
--- a/src/rust/engine/fs/store/src/tests.rs
+++ b/src/rust/engine/fs/store/src/tests.rs
@@ -19,6 +19,7 @@ use grpc_util::tls;
 use hashing::{Digest, Fingerprint};
 use mock::{RequestType, StubCAS};
 use protos::gen::build::bazel::remote::execution::v2 as remexec;
+use tokio::io::{AsyncSeekExt, AsyncWriteExt};
 use workunit_store::WorkunitStore;
 
 use crate::{
@@ -69,6 +70,21 @@ pub async fn load_file_bytes(store: &Store, digest: Digest) -> Result<Bytes, String> {
 
+pub async fn mk_tempfile(contents: Option<&[u8]>) -> tokio::fs::File {
+  let file = tokio::task::spawn_blocking(tempfile::tempfile)
+    .await
+    .unwrap()
+    .unwrap();
+  let mut file = tokio::fs::File::from_std(file);
+
+  if let Some(contents) = contents {
+    file.write_all(contents).await.unwrap();
+    file.rewind().await.unwrap();
+  }
+
+  file
+}
+
 ///
 /// Create a StubCas with a file and a directory inside.
 ///
diff --git a/src/rust/engine/grpc_util/src/retry.rs b/src/rust/engine/grpc_util/src/retry.rs
index e17cc661225..9c2aec454fa 100644
--- a/src/rust/engine/grpc_util/src/retry.rs
+++ b/src/rust/engine/grpc_util/src/retry.rs
@@ -24,7 +24,7 @@ pub fn status_is_retryable(status: &Status) -> bool {
 pub async fn retry_call<C, T, E, F, G, Fut>(client: C, mut f: F, is_retryable: G) -> Result<T, E>
 where
   C: Clone,
-  F: FnMut(C) -> Fut,
+  F: FnMut(C, u32) -> Fut,
   G: Fn(&E) -> bool,
   Fut: Future<Output = Result<T, E>>,
 {
@@ -43,7 +43,7 @@ where
     }
 
     let client2 = client.clone();
-    let result_fut = f(client2);
+    let result_fut = f(client2, num_retries);
     let last_error = match result_fut.await {
       Ok(r) => return Ok(r),
       Err(err) => {
@@ -97,21 +97,31 @@ mod tests {
 
   #[tokio::test]
   async fn retry_call_works_as_expected() {
+    // several retryable errors
     let client = MockClient::new(vec![
       Err(MockError(true, "first")),
       Err(MockError(true, "second")),
       Ok(3_isize),
       Ok(4_isize),
     ]);
+    let mut expected_attempt = 0;
     let result = retry_call(
       client.clone(),
-      |client| async move { client.next().await },
+      |client, attempt| {
+        // check `attempt` is being passed through as expected: starting with 0 for the first
+        // call, and incrementing for each one after
+        assert_eq!(attempt, expected_attempt);
+        expected_attempt += 1;
+
+        async move { client.next().await }
+      },
       |err| err.0,
     )
     .await;
     assert_eq!(result, Ok(3_isize));
     assert_eq!(client.values.lock().len(), 1);
 
+    // a non-retryable error
     let client = MockClient::new(vec![
       Err(MockError(true, "first")),
       Err(MockError(false, "second")),
@@ -120,13 +130,14 @@
     ]);
     let result = retry_call(
       client.clone(),
-      |client| async move { client.next().await },
+      |client, _| async move { client.next().await },
       |err| err.0,
     )
     .await;
     assert_eq!(result, Err(MockError(false, "second")));
     assert_eq!(client.values.lock().len(), 2);
 
+    // retryable errors, but too many
     let client = MockClient::new(vec![
       Err(MockError(true, "first")),
       Err(MockError(true, "second")),
@@ -135,7 +146,7 @@
     ]);
     let result = retry_call(
       client.clone(),
-      |client| async move { client.next().await },
+      |client, _| async move { client.next().await },
       |err| err.0,
     )
     .await;
diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs b/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs
index f55c859d6d2..23b8ced179b 100644
--- a/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs
+++ b/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs
@@ -68,7 +68,7 @@ impl ActionCacheProvider for Provider {
     let client = self.action_cache_client.as_ref().clone();
     retry_call(
       client,
-      move |mut client| {
+      move |mut client, _| {
         let update_action_cache_request = remexec::UpdateActionResultRequest {
           instance_name: self.instance_name.clone().unwrap_or_else(|| "".to_owned()),
           action_digest: Some(action_digest.into()),
@@ -98,7 +98,7 @@ impl ActionCacheProvider for Provider {
     let client = self.action_cache_client.as_ref().clone();
     let response = retry_call(
       client,
-      move |mut client| {
+      move |mut client, _| {
         let request = remexec::GetActionResultRequest {
           action_digest: Some(action_digest.into()),
           instance_name: self.instance_name.clone().unwrap_or_default(),
diff --git a/src/rust/engine/testutil/Cargo.toml b/src/rust/engine/testutil/Cargo.toml
index 064cd85d48d..a20038c64af 100644
--- a/src/rust/engine/testutil/Cargo.toml
+++ b/src/rust/engine/testutil/Cargo.toml
@@ -13,3 +13,4 @@ grpc_util = { path = "../grpc_util" }
 hashing = { path = "../hashing" }
 prost = { workspace = true }
 protos = { path = "../protos" }
+tokio = { workspace = true }