From 9e4da772a9e80137c40992d62b95258fa8e183e9 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 30 Aug 2023 14:36:08 +1000 Subject: [PATCH 01/19] Define ByteStoreProvider::store --- src/rust/engine/fs/store/src/remote.rs | 5 ++++- src/rust/engine/fs/store/src/remote/reapi.rs | 6 +++++- src/rust/engine/fs/store/src/remote_tests.rs | 12 +++++++++++- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 431140a9f04..fd013ace144 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -14,7 +14,7 @@ use hashing::Digest; use log::Level; use protos::gen::build::bazel::remote::execution::v2 as remexec; use remexec::ServerCapabilities; -use tokio::io::{AsyncSeekExt, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWrite}; use workunit_store::{in_workunit, ObservationMetric}; use crate::StoreError; @@ -24,9 +24,12 @@ mod reapi; mod reapi_tests; pub type ByteSource = Arc<(dyn Fn(Range) -> Bytes + Send + Sync + 'static)>; +pub type StoreSource = Box; #[async_trait] pub trait ByteStoreProvider: Sync + Send + 'static { + async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String>; + async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String>; async fn load( diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index de66e2f3849..9f2fdd37849 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -29,7 +29,7 @@ use workunit_store::{Metric, ObservationMetric}; use crate::RemoteOptions; -use super::{ByteSource, ByteStoreProvider, LoadDestination}; +use super::{ByteSource, ByteStoreProvider, LoadDestination, StoreSource}; pub struct Provider { instance_name: Option, @@ -232,6 +232,10 @@ impl ByteStoreProvider for Provider { result.map_err(|e| e.to_string()) } + async fn store(&self, _digest: Digest, _source: StoreSource) -> Result<(), String> { + unimplemented!() + } + async fn load( &self, digest: Digest, diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 6323d6c9782..d749bcedfee 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -13,7 +13,9 @@ use testutil::data::TestData; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use workunit_store::WorkunitStore; -use crate::remote::{ByteSource, ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions}; +use crate::remote::{ + ByteSource, ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions, StoreSource, +}; use crate::tests::new_cas; use crate::MEGABYTES; @@ -325,6 +327,10 @@ impl ByteStoreProvider for TestProvider { Ok(()) } + async fn store(&self, _digest: Digest, _file: StoreSource) -> Result<(), String> { + unimplemented!("shouldn't call this (yet)") + } + async fn load( &self, digest: Digest, @@ -365,6 +371,10 @@ impl ByteStoreProvider for AlwaysErrorProvider { Err("AlwaysErrorProvider always fails".to_owned()) } + async fn store(&self, _: Digest, _: StoreSource) -> Result<(), String> { + Err("AlwaysErrorProvider always fails".to_owned()) + } + async fn load(&self, _: Digest, _: &mut dyn LoadDestination) -> Result { Err("AlwaysErrorProvider always fails".to_owned()) } From d97b832f50e3fafea14cda1c28e7097e0bec1ead Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 30 Aug 2023 14:46:04 +1000 Subject: [PATCH 02/19] 
Implement ByteStore::store --- src/rust/engine/fs/store/src/remote.rs | 37 +++++++++++++++---- src/rust/engine/fs/store/src/remote_tests.rs | 39 ++++++++++++++++++-- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index fd013ace144..008bbeda1d2 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -17,7 +17,7 @@ use remexec::ServerCapabilities; use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWrite}; use workunit_store::{in_workunit, ObservationMetric}; -use crate::StoreError; +use super::StoreError; mod reapi; #[cfg(test)] @@ -117,6 +117,13 @@ impl ByteStore { self.provider.chunk_size_bytes() } + #[allow(dead_code)] + pub async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String> { + self + .store_tracking(digest, || self.provider.store(digest, source)) + .await + } + pub async fn store_buffered( &self, digest: Digest, @@ -158,10 +165,12 @@ impl ByteStore { }?); self - .store_bytes_source( - digest, - Arc::new(move |range| Bytes::copy_from_slice(&mmap[range])), - ) + .store_tracking(digest, || { + self.provider.store_bytes( + digest, + Arc::new(move |range| Bytes::copy_from_slice(&mmap[range])), + ) + }) .await?; Ok(()) @@ -170,17 +179,29 @@ impl ByteStore { pub async fn store_bytes(&self, bytes: Bytes) -> Result<(), String> { let digest = Digest::of_bytes(&bytes); self - .store_bytes_source(digest, Arc::new(move |range| bytes.slice(range))) + .store_tracking(digest, || { + self + .provider + .store_bytes(digest, Arc::new(move |range| bytes.slice(range))) + }) .await } - async fn store_bytes_source(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> { + async fn store_tracking( + &self, + digest: Digest, + do_store: DoStore, + ) -> Result<(), String> + where + DoStore: FnOnce() -> Fut + Send, + Fut: Future> + Send, + { in_workunit!( "store_bytes", Level::Trace, desc = Some(format!("Storing {digest:?}")), |workunit| async move { - let result = self.provider.store_bytes(digest, bytes).await; + let result = do_store().await; if result.is_ok() { workunit.record_observation( diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index d749bcedfee..84e17db4b25 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -1,7 +1,7 @@ // Copyright 2022 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
use std::collections::{BTreeMap, HashMap, HashSet}; -use std::io::Write; +use std::io::{Cursor, Write}; use std::sync::Arc; use std::time::Duration; @@ -199,6 +199,35 @@ async fn store_buffered_provider_error() { ); } +#[tokio::test] +async fn store() { + let _ = WorkunitStore::setup_for_tests(); + let testdata = TestData::roland(); + + let (store, provider) = empty_byte_store(); + assert_eq!( + store + .store(testdata.digest(), Box::new(Cursor::new(testdata.bytes()))) + .await, + Ok(()) + ); + + let blobs = provider.blobs.lock(); + assert_eq!(blobs.get(&testdata.fingerprint()), Some(&testdata.bytes())); +} + +#[tokio::test] +async fn store_provider_error() { + let _ = WorkunitStore::setup_for_tests(); + let testdata = TestData::roland(); + let store = byte_store_always_error_provider(); + assert_error( + store + .store(testdata.digest(), Box::new(Cursor::new(testdata.bytes()))) + .await, + ); +} + #[tokio::test] async fn list_missing_digests_none_missing() { let _ = WorkunitStore::setup_for_tests(); @@ -327,8 +356,12 @@ impl ByteStoreProvider for TestProvider { Ok(()) } - async fn store(&self, _digest: Digest, _file: StoreSource) -> Result<(), String> { - unimplemented!("shouldn't call this (yet)") + async fn store(&self, digest: Digest, mut file: StoreSource) -> Result<(), String> { + // just pull it all into memory + let mut bytes = Vec::new(); + tokio::io::copy(&mut file, &mut bytes).await.unwrap(); + self.blobs.lock().insert(digest.hash, Bytes::from(bytes)); + Ok(()) } async fn load( From bf251578b798efdcf41a8f755d34bee86e20c444 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 30 Aug 2023 14:59:41 +1000 Subject: [PATCH 03/19] Have Store call store when appropriate --- src/rust/engine/fs/store/src/lib.rs | 54 ++++++++++---------------- src/rust/engine/fs/store/src/remote.rs | 3 +- 2 files changed, 23 insertions(+), 34 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 240e368691a..b397e828cd1 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -806,15 +806,16 @@ impl Store { remote_store .clone() .maybe_upload(digest, async move { - // TODO(John Sirois): Consider allowing configuration of when to buffer large blobs - // to disk to be independent of the remote store wire chunk size. - if digest.size_bytes > remote_store.store.chunk_size_bytes() { - Self::store_large_blob_remote(local, remote_store.store, entry_type, digest) - .await - } else { - Self::store_small_blob_remote(local, remote_store.store, entry_type, digest) - .await - } + match local.load_from_fs(digest).await? { + Some(path) => { + Self::store_fsdb_blob_remote(remote_store.store, digest, path).await? + } + None => { + Self::store_lmdb_blob_remote(local, remote_store.store, entry_type, digest) + .await? + } + }; + Ok(()) }) .await } @@ -837,15 +838,17 @@ impl Store { .boxed() } - async fn store_small_blob_remote( + async fn store_lmdb_blob_remote( local: local::ByteStore, remote: remote::ByteStore, entry_type: EntryType, digest: Digest, ) -> Result<(), StoreError> { // We need to copy the bytes into memory so that they may be used safely in an async - // future. While this unfortunately increases memory consumption, we prioritize - // being able to run `remote.store_bytes()` as async. + // future. While this unfortunately increases memory consumption, we prioritize being + // able to run `remote.store_bytes()` as async. 
In addition, this is only used for + // blobs in the LMDB store, most of which are small: large blobs end up in the FSDB + // store. // // See https://github.com/pantsbuild/pants/pull/9793 for an earlier implementation // that used `Executor.block_on`, which avoided the clone but was blocking. @@ -863,31 +866,16 @@ impl Store { } } - async fn store_large_blob_remote( - local: local::ByteStore, + async fn store_fsdb_blob_remote( remote: remote::ByteStore, - entry_type: EntryType, digest: Digest, + path: PathBuf, ) -> Result<(), StoreError> { - remote - .store_buffered(digest, |mut buffer| async { - let result = local - .load_bytes_with(entry_type, digest, move |bytes| { - buffer.write_all(bytes).map_err(|e| { - format!("Failed to write {entry_type:?} {digest:?} to temporary buffer: {e}") - }) - }) - .await?; - match result { - None => Err(StoreError::MissingDigest( - format!("Failed to upload {entry_type:?}: Not found in local store",), - digest, - )), - Some(Err(err)) => Err(err.into()), - Some(Ok(())) => Ok(()), - } - }) + let file = tokio::fs::File::open(&path) .await + .map_err(|e| format!("failed to read {digest:?} from {path:?}: {e}"))?; + remote.store(digest, Box::new(file)).await?; + Ok(()) } /// diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 008bbeda1d2..be0167a030d 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -113,17 +113,18 @@ impl ByteStore { Ok(ByteStore::new(instance_name, provider)) } + #[allow(dead_code)] pub(crate) fn chunk_size_bytes(&self) -> usize { self.provider.chunk_size_bytes() } - #[allow(dead_code)] pub async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String> { self .store_tracking(digest, || self.provider.store(digest, source)) .await } + #[allow(dead_code)] pub async fn store_buffered( &self, digest: Digest, From bbefe4b1a3fc4d6b58d0ba67c23cb1a5945a8e5b Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 30 Aug 2023 15:04:00 +1000 Subject: [PATCH 04/19] remove chunk_size_bytes --- src/rust/engine/fs/store/src/remote.rs | 7 ------- src/rust/engine/fs/store/src/remote/reapi.rs | 4 ---- src/rust/engine/fs/store/src/remote_tests.rs | 8 -------- 3 files changed, 19 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index be0167a030d..103030cb5e6 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -42,8 +42,6 @@ pub trait ByteStoreProvider: Sync + Send + 'static { &self, digests: &mut (dyn Iterator + Send), ) -> Result, String>; - - fn chunk_size_bytes(&self) -> usize; } // TODO: Consider providing `impl Default`, similar to `super::LocalOptions`. 
@@ -113,11 +111,6 @@ impl ByteStore { Ok(ByteStore::new(instance_name, provider)) } - #[allow(dead_code)] - pub(crate) fn chunk_size_bytes(&self) -> usize { - self.provider.chunk_size_bytes() - } - pub async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String> { self .store_tracking(digest, || self.provider.store(digest, source)) diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index 9f2fdd37849..0888ec66c28 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -342,8 +342,4 @@ impl ByteStoreProvider for Provider { .map(|digest| digest.try_into()) .collect::, _>>() } - - fn chunk_size_bytes(&self) -> usize { - self.chunk_size_bytes - } } diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 84e17db4b25..48d7829f2fe 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -386,10 +386,6 @@ impl ByteStoreProvider for TestProvider { let blobs = self.blobs.lock(); Ok(digests.filter(|d| !blobs.contains_key(&d.hash)).collect()) } - - fn chunk_size_bytes(&self) -> usize { - 1234 - } } struct AlwaysErrorProvider; @@ -418,8 +414,4 @@ impl ByteStoreProvider for AlwaysErrorProvider { ) -> Result, String> { Err("AlwaysErrorProvider always fails".to_owned()) } - - fn chunk_size_bytes(&self) -> usize { - unreachable!("shouldn't call this") - } } From 0554d9da56e49ad62190c4a8305d64e641e35400 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 30 Aug 2023 15:09:45 +1000 Subject: [PATCH 05/19] remove store_buffered --- src/rust/engine/Cargo.lock | 11 ---- src/rust/engine/Cargo.toml | 1 - src/rust/engine/fs/store/Cargo.toml | 1 - src/rust/engine/fs/store/src/remote.rs | 55 -------------------- src/rust/engine/fs/store/src/remote_tests.rs | 43 +-------------- 5 files changed, 1 insertion(+), 110 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index fac093bd71a..12111ceca9d 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -1964,16 +1964,6 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" -[[package]] -name = "memmap" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "memoffset" version = "0.6.4" @@ -3364,7 +3354,6 @@ dependencies = [ "lmdb-rkv", "log", "madvise", - "memmap", "mock", "num_cpus", "parking_lot 0.12.1", diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index 8b0798585f2..6e71b81eb50 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -244,7 +244,6 @@ lmdb-rkv = { git = "https://github.com/pantsbuild/lmdb-rs.git", rev = "6ae7a552a log = "0.4.17" madvise = "0.1" maplit = "1.0.1" -memmap = "0.7" nails = "0.13" nix = "0.25" notify = { git = "https://github.com/pantsbuild/notify", rev = "276af0f3c5f300bfd534941386ba2f3b3a022df7" } diff --git a/src/rust/engine/fs/store/Cargo.toml b/src/rust/engine/fs/store/Cargo.toml index b0f7d9ae095..b94bb19a7cf 100644 --- a/src/rust/engine/fs/store/Cargo.toml +++ b/src/rust/engine/fs/store/Cargo.toml @@ -25,7 +25,6 @@ itertools = { workspace = true } lmdb-rkv = { workspace = true } log = { workspace = true } madvise = { workspace = 
true } -memmap = { workspace = true } parking_lot = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 103030cb5e6..185f3b4d584 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -17,8 +17,6 @@ use remexec::ServerCapabilities; use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWrite}; use workunit_store::{in_workunit, ObservationMetric}; -use super::StoreError; - mod reapi; #[cfg(test)] mod reapi_tests; @@ -117,59 +115,6 @@ impl ByteStore { .await } - #[allow(dead_code)] - pub async fn store_buffered( - &self, - digest: Digest, - write_to_buffer: WriteToBuffer, - ) -> Result<(), StoreError> - where - WriteToBuffer: FnOnce(std::fs::File) -> WriteResult, - WriteResult: Future>, - { - let write_buffer = tempfile::tempfile().map_err(|e| { - format!("Failed to create a temporary blob upload buffer for {digest:?}: {e}") - })?; - let read_buffer = write_buffer.try_clone().map_err(|e| { - format!("Failed to create a read handle for the temporary upload buffer for {digest:?}: {e}") - })?; - write_to_buffer(write_buffer).await?; - - // Unsafety: Mmap presents an immutable slice of bytes, but the underlying file that is mapped - // could be mutated by another process. We guard against this by creating an anonymous - // temporary file and ensuring it is written to and closed via the only other handle to it in - // the code just above. - let mmap = Arc::new(unsafe { - let mapping = memmap::Mmap::map(&read_buffer).map_err(|e| { - format!("Failed to memory map the temporary file buffer for {digest:?}: {e}") - })?; - if let Err(err) = madvise::madvise( - mapping.as_ptr(), - mapping.len(), - madvise::AccessPattern::Sequential, - ) { - log::warn!( - "Failed to madvise(MADV_SEQUENTIAL) for the memory map of the temporary file buffer for \ - {digest:?}. Continuing with possible reduced performance: {err}", - digest = digest, - err = err - ) - } - Ok(mapping) as Result - }?); - - self - .store_tracking(digest, || { - self.provider.store_bytes( - digest, - Arc::new(move |range| Bytes::copy_from_slice(&mmap[range])), - ) - }) - .await?; - - Ok(()) - } - pub async fn store_bytes(&self, bytes: Bytes) -> Result<(), String> { let digest = Digest::of_bytes(&bytes); self diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 48d7829f2fe..99160856bac 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -1,7 +1,7 @@ // Copyright 2022 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
use std::collections::{BTreeMap, HashMap, HashSet}; -use std::io::{Cursor, Write}; +use std::io::Cursor; use std::sync::Arc; use std::time::Duration; @@ -158,47 +158,6 @@ async fn store_bytes_provider_error() { assert_error(store.store_bytes(TestData::roland().bytes()).await) } -#[tokio::test] -async fn store_buffered() { - let _ = WorkunitStore::setup_for_tests(); - - let testdata = TestData::roland(); - let bytes = testdata.bytes(); - - let (store, provider) = empty_byte_store(); - assert_eq!( - store - .store_buffered(testdata.digest(), move |mut file| async move { - file.write_all(&bytes).unwrap(); - Ok(()) - }) - .await, - Ok(()) - ); - - let blobs = provider.blobs.lock(); - assert_eq!(blobs.get(&testdata.fingerprint()), Some(&testdata.bytes())); -} - -#[tokio::test] -async fn store_buffered_provider_error() { - let _ = WorkunitStore::setup_for_tests(); - - let testdata = TestData::roland(); - let bytes = testdata.bytes(); - - let store = byte_store_always_error_provider(); - assert_error( - store - .store_buffered(testdata.digest(), move |mut file| async move { - file.write_all(&bytes).unwrap(); - Ok(()) - }) - .await - .map_err(|e| e.to_string()), - ); -} - #[tokio::test] async fn store() { let _ = WorkunitStore::setup_for_tests(); From 3d67be5c9d4caa091e1a411dc9956249088c8f5e Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 30 Aug 2023 16:36:31 +1000 Subject: [PATCH 06/19] Replace ByteSource with Bytes, for store_bytes --- src/rust/engine/Cargo.lock | 1 + src/rust/engine/fs/store/Cargo.toml | 1 + src/rust/engine/fs/store/src/remote.rs | 10 +- src/rust/engine/fs/store/src/remote/reapi.rs | 118 ++++++++++++------ .../engine/fs/store/src/remote/reapi_tests.rs | 16 +-- src/rust/engine/fs/store/src/remote_tests.rs | 13 +- 6 files changed, 91 insertions(+), 68 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 12111ceca9d..a79425ab506 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -3368,6 +3368,7 @@ dependencies = [ "testutil", "tokio", "tokio-rustls", + "tokio-util 0.7.8", "tonic", "tower-service", "tryfuture", diff --git a/src/rust/engine/fs/store/Cargo.toml b/src/rust/engine/fs/store/Cargo.toml index b94bb19a7cf..a0ae34a9358 100644 --- a/src/rust/engine/fs/store/Cargo.toml +++ b/src/rust/engine/fs/store/Cargo.toml @@ -35,6 +35,7 @@ task_executor = { path = "../../task_executor" } tempfile = { workspace = true } tokio-rustls = { workspace = true } tokio = { workspace = true, features = ["fs"] } +tokio-util = { workspace = true, features = ["io"] } tonic = { workspace = true, features = ["transport", "codegen", "tls", "tls-roots", "prost"] } tower-service = { workspace = true } tryfuture = { path = "../../tryfuture" } diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 185f3b4d584..9f711fc0035 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -2,7 +2,6 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). 
use std::collections::{BTreeMap, HashSet}; use std::fmt; -use std::ops::Range; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -21,14 +20,13 @@ mod reapi; #[cfg(test)] mod reapi_tests; -pub type ByteSource = Arc<(dyn Fn(Range) -> Bytes + Send + Sync + 'static)>; pub type StoreSource = Box; #[async_trait] pub trait ByteStoreProvider: Sync + Send + 'static { async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String>; - async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String>; + async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>; async fn load( &self, @@ -118,11 +116,7 @@ impl ByteStore { pub async fn store_bytes(&self, bytes: Bytes) -> Result<(), String> { let digest = Digest::of_bytes(&bytes); self - .store_tracking(digest, || { - self - .provider - .store_bytes(digest, Arc::new(move |range| bytes.slice(range))) - }) + .store_tracking(digest, || self.provider.store_bytes(digest, bytes)) .await } diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index 0888ec66c28..2abcfe9e7f0 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -1,14 +1,15 @@ // Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use std::cmp::min; use std::collections::HashSet; use std::convert::TryInto; use std::fmt; +use std::io::Cursor; use std::sync::Arc; use std::time::Instant; use async_oncecell::OnceCell; use async_trait::async_trait; +use bytes::Bytes; use futures::{FutureExt, StreamExt}; use grpc_util::retry::{retry_call, status_is_retryable}; use grpc_util::{ @@ -29,7 +30,7 @@ use workunit_store::{Metric, ObservationMetric}; use crate::RemoteOptions; -use super::{ByteSource, ByteStoreProvider, LoadDestination, StoreSource}; +use super::{ByteStoreProvider, LoadDestination, StoreSource}; pub struct Provider { instance_name: Option, @@ -103,16 +104,12 @@ impl Provider { }) } - async fn store_bytes_source_batch( - &self, - digest: Digest, - bytes: ByteSource, - ) -> Result<(), ByteStoreError> { + async fn store_bytes_batch(&self, digest: Digest, bytes: Bytes) -> Result<(), ByteStoreError> { let request = BatchUpdateBlobsRequest { instance_name: self.instance_name.clone().unwrap_or_default(), requests: vec![remexec::batch_update_blobs_request::Request { digest: Some(digest.into()), - data: bytes(0..digest.size_bytes), + data: bytes, compressor: remexec::compressor::Value::Identity as i32, }], }; @@ -125,10 +122,10 @@ impl Provider { Ok(()) } - async fn store_bytes_source_stream( + async fn store_source_stream( &self, digest: Digest, - bytes: ByteSource, + source: StoreSource, ) -> Result<(), ByteStoreError> { let len = digest.size_bytes; let instance_name = self.instance_name.clone().unwrap_or_default(); @@ -143,40 +140,79 @@ impl Provider { let mut client = self.byte_stream_client.as_ref().clone(); - let chunk_size_bytes = self.chunk_size_bytes; + // we have to communicate the (first) error reading the underlying reader out of band + let error_occurred = Arc::new(parking_lot::Mutex::new(None)); + let error_occurred_stream = error_occurred.clone(); - let stream = futures::stream::unfold((0, false), move |(offset, has_sent_any)| { - if offset >= len && has_sent_any { - futures::future::ready(None) - } else { - let next_offset = min(offset + chunk_size_bytes, len); - let req = protos::gen::google::bytestream::WriteRequest { + let stream = if 
len == 0 { + // if the reader is empty, the ReaderStream gives no elements, but we have to write at least + // one. + futures::stream::once(futures::future::ready( + protos::gen::google::bytestream::WriteRequest { resource_name: resource_name.clone(), - write_offset: offset as i64, - finish_write: next_offset == len, - // TODO(tonic): Explore using the unreleased `Bytes` support in Prost from: - // https://github.com/danburkert/prost/pull/341 - data: bytes(offset..next_offset), - }; - futures::future::ready(Some((req, (next_offset, true)))) - } - }); + write_offset: 0, + finish_write: true, + data: Bytes::new(), + }, + )) + .boxed() + } else { + // Read the source in appropriately sized chunks. + // NB. it is possible that this doesn't fill each chunk fully (i.e. may not send + // `chunk_size_bytes` in each request). For the usual sources, this should be unlikely. + tokio_util::io::ReaderStream::with_capacity(source, self.chunk_size_bytes) + .scan(0, move |num_seen_bytes, read_result| { + futures::future::ready(match read_result { + Ok(data) => { + let write_offset = *num_seen_bytes as i64; + *num_seen_bytes += data.len(); + + Some(protos::gen::google::bytestream::WriteRequest { + resource_name: resource_name.clone(), + write_offset, + finish_write: *num_seen_bytes == len, + data, + }) + } + Err(err) => { + // reading locally hit an error, so store it for re-processing below + *error_occurred_stream.lock() = Some(err); + // cut off here, no point continuing + None + } + }) + }) + .boxed() + }; // NB: We must box the future to avoid a stack overflow. // Explicit type annotation is a workaround for https://github.com/rust-lang/rust/issues/64552 let future: std::pin::Pin< Box> + Send>, - > = Box::pin(client.write(Request::new(stream)).map(|r| match r { - Err(err) => Err(ByteStoreError::Grpc(err)), - Ok(response) => { - let response = response.into_inner(); - if response.committed_size == len as i64 { - Ok(()) - } else { - Err(ByteStoreError::Other(format!( - "Uploading file with digest {:?}: want committed size {} but got {}", - digest, len, response.committed_size - ))) + > = Box::pin(client.write(Request::new(stream)).map(move |r| { + if let Some(ref read_err) = *error_occurred.lock() { + // check if reading `source` locally hit an error: if so, propagate that error (there will + // likely be a remote error too, because our write will be too short, but the local error is + // the interesting root cause) + return Err(ByteStoreError::Other(format!( + "Uploading file with digest {:?}: failed to read local source: {}", + digest, read_err + ))); + } + + match r { + Err(err) => Err(ByteStoreError::Grpc(err)), + Ok(response) => { + let response = response.into_inner(); + + if response.committed_size == len as i64 { + Ok(()) + } else { + Err(ByteStoreError::Other(format!( + "Uploading file with digest {:?}: want committed size {} but got {}", + digest, len, response.committed_size + ))) + } } } })); @@ -207,7 +243,7 @@ impl Provider { #[async_trait] impl ByteStoreProvider for Provider { - async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> { + async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String> { let len = digest.size_bytes; let max_batch_total_size_bytes = { @@ -225,9 +261,11 @@ impl ByteStoreProvider for Provider { max_batch_total_size_bytes == 0 || len < max_batch_total_size_bytes; let result = if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config { - self.store_bytes_source_batch(digest, bytes).await + 
self.store_bytes_batch(digest, bytes).await } else { - self.store_bytes_source_stream(digest, bytes).await + self + .store_source_stream(digest, Box::new(Cursor::new(bytes))) + .await }; result.map_err(|e| e.to_string()) } diff --git a/src/rust/engine/fs/store/src/remote/reapi_tests.rs b/src/rust/engine/fs/store/src/remote/reapi_tests.rs index 40375601fcf..0424ca7913d 100644 --- a/src/rust/engine/fs/store/src/remote/reapi_tests.rs +++ b/src/rust/engine/fs/store/src/remote/reapi_tests.rs @@ -1,7 +1,6 @@ // Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). use std::collections::{BTreeMap, HashSet}; -use std::sync::Arc; use std::time::Duration; use bytes::Bytes; @@ -15,7 +14,6 @@ use crate::tests::{big_file_bytes, big_file_fingerprint, new_cas}; use crate::MEGABYTES; use super::reapi::Provider; -use super::ByteSource; fn remote_options( cas_address: String, @@ -46,10 +44,6 @@ async fn new_provider(cas: &StubCAS) -> Provider { .unwrap() } -fn byte_source(bytes: Bytes) -> ByteSource { - Arc::new(move |r| bytes.slice(r)) -} - async fn load_test(chunk_size: usize) { let testdata = TestData::roland(); let cas = new_cas(chunk_size); @@ -174,7 +168,7 @@ async fn store_bytes_one_chunk() { let provider = new_provider(&cas).await; provider - .store_bytes(testdata.digest(), byte_source(testdata.bytes())) + .store_bytes(testdata.digest(), testdata.bytes()) .await .unwrap(); @@ -197,7 +191,7 @@ async fn store_bytes_multiple_chunks() { let digest = Digest::new(fingerprint, all_the_henries.len()); provider - .store_bytes(digest, byte_source(all_the_henries.clone())) + .store_bytes(digest, all_the_henries.clone()) .await .unwrap(); @@ -211,7 +205,7 @@ async fn store_bytes_empty_file() { let provider = new_provider(&cas).await; provider - .store_bytes(testdata.digest(), byte_source(testdata.bytes())) + .store_bytes(testdata.digest(), testdata.bytes()) .await .unwrap(); @@ -225,7 +219,7 @@ async fn store_bytes_grpc_error() { let provider = new_provider(&cas).await; let error = provider - .store_bytes(testdata.digest(), byte_source(testdata.bytes())) + .store_bytes(testdata.digest(), testdata.bytes()) .await .expect_err("Want err"); assert!( @@ -246,7 +240,7 @@ async fn store_bytes_connection_error() { .unwrap(); let error = provider - .store_bytes(testdata.digest(), byte_source(testdata.bytes())) + .store_bytes(testdata.digest(), testdata.bytes()) .await .expect_err("Want err"); assert!( diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 99160856bac..c12f85b7e54 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -13,9 +13,7 @@ use testutil::data::TestData; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use workunit_store::WorkunitStore; -use crate::remote::{ - ByteSource, ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions, StoreSource, -}; +use crate::remote::{ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions, StoreSource}; use crate::tests::new_cas; use crate::MEGABYTES; @@ -307,11 +305,8 @@ impl TestProvider { #[async_trait::async_trait] impl ByteStoreProvider for TestProvider { - async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> { - self - .blobs - .lock() - .insert(digest.hash, bytes(0..digest.size_bytes)); + async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String> { + self.blobs.lock().insert(digest.hash, bytes); 
Ok(()) } @@ -355,7 +350,7 @@ impl AlwaysErrorProvider { } #[async_trait::async_trait] impl ByteStoreProvider for AlwaysErrorProvider { - async fn store_bytes(&self, _: Digest, _: ByteSource) -> Result<(), String> { + async fn store_bytes(&self, _: Digest, _: Bytes) -> Result<(), String> { Err("AlwaysErrorProvider always fails".to_owned()) } From 80acaaa5e310762701293b14cc28b06eb3d5c48e Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 30 Aug 2023 16:47:28 +1000 Subject: [PATCH 07/19] Implement and test reapi store --- src/rust/engine/Cargo.lock | 1 + src/rust/engine/fs/store/src/remote/reapi.rs | 9 +- .../engine/fs/store/src/remote/reapi_tests.rs | 128 ++++++++++++++++++ src/rust/engine/testutil/Cargo.toml | 1 + src/rust/engine/testutil/src/lib.rs | 1 + src/rust/engine/testutil/src/stub_io.rs | 36 +++++ 6 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 src/rust/engine/testutil/src/stub_io.rs diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index a79425ab506..49639bb72d4 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -3531,6 +3531,7 @@ dependencies = [ "hashing", "prost", "protos", + "tokio", ] [[package]] diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index 2abcfe9e7f0..f0c2d17e2c2 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -270,8 +270,13 @@ impl ByteStoreProvider for Provider { result.map_err(|e| e.to_string()) } - async fn store(&self, _digest: Digest, _source: StoreSource) -> Result<(), String> { - unimplemented!() + async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String> { + // an arbitrary source (e.g. file) might be small enough to write via the batch API, but we + // ignore that possibility for now + self + .store_source_stream(digest, source) + .await + .map_err(|e| e.to_string()) } async fn load( diff --git a/src/rust/engine/fs/store/src/remote/reapi_tests.rs b/src/rust/engine/fs/store/src/remote/reapi_tests.rs index 0424ca7913d..bf12581042c 100644 --- a/src/rust/engine/fs/store/src/remote/reapi_tests.rs +++ b/src/rust/engine/fs/store/src/remote/reapi_tests.rs @@ -8,12 +8,14 @@ use grpc_util::tls; use hashing::{Digest, Fingerprint}; use mock::StubCAS; use testutil::data::TestData; +use testutil::stub_io::EventuallyFailingReader; use crate::remote::{ByteStoreProvider, RemoteOptions}; use crate::tests::{big_file_bytes, big_file_fingerprint, new_cas}; use crate::MEGABYTES; use super::reapi::Provider; +use super::StoreSource; fn remote_options( cas_address: String, @@ -161,6 +163,132 @@ fn assert_cas_store( } } +fn store_source(data: Bytes) -> StoreSource { + Box::new(std::io::Cursor::new(data)) +} + +#[tokio::test] +async fn store_one_chunk() { + let testdata = TestData::roland(); + let cas = StubCAS::empty(); + let provider = new_provider(&cas).await; + + provider + .store(testdata.digest(), store_source(testdata.bytes())) + .await + .unwrap(); + + assert_cas_store(&cas, testdata.fingerprint(), testdata.bytes(), 1, 1024) +} +#[tokio::test] +async fn store_multiple_chunks() { + let cas = StubCAS::empty(); + let chunk_size = 10 * 1024; + let provider = Provider::new(remote_options( + cas.address(), + chunk_size, + 0, // disable batch API, force streaming API + )) + .await + .unwrap(); + + let all_the_henries = big_file_bytes(); + let fingerprint = big_file_fingerprint(); + let digest = Digest::new(fingerprint, all_the_henries.len()); + + provider + .store(digest, 
store_source(all_the_henries.clone())) + .await + .unwrap(); + + assert_cas_store(&cas, fingerprint, all_the_henries, 98, chunk_size) +} + +#[tokio::test] +async fn store_empty_file() { + let testdata = TestData::empty(); + let cas = StubCAS::empty(); + let provider = new_provider(&cas).await; + + provider + .store(testdata.digest(), store_source(testdata.bytes())) + .await + .unwrap(); + + assert_cas_store(&cas, testdata.fingerprint(), testdata.bytes(), 1, 1024) +} + +#[tokio::test] +async fn store_grpc_error() { + let testdata = TestData::roland(); + let cas = StubCAS::cas_always_errors(); + let provider = new_provider(&cas).await; + + let error = provider + .store(testdata.digest(), store_source(testdata.bytes())) + .await + .expect_err("Want err"); + assert!( + error.contains("StubCAS is configured to always fail"), + "Bad error message, got: {error}" + ); +} + +#[tokio::test] +async fn store_connection_error() { + let testdata = TestData::roland(); + let provider = Provider::new(remote_options( + "http://doesnotexist.example".to_owned(), + 10 * MEGABYTES, + crate::tests::STORE_BATCH_API_SIZE_LIMIT, + )) + .await + .unwrap(); + + let error = provider + .store(testdata.digest(), store_source(testdata.bytes())) + .await + .expect_err("Want err"); + assert!( + error.contains("Unavailable: \"error trying to connect: dns error"), + "Bad error message, got: {error}" + ); +} + +#[tokio::test] +async fn store_source_read_error_immediately() { + let testdata = TestData::roland(); + let cas = StubCAS::empty(); + let provider = new_provider(&cas).await; + + let source = EventuallyFailingReader::new(0); + let error = provider + .store(testdata.digest(), Box::new(source)) + .await + .expect_err("Want err"); + assert!( + error.contains("EventuallyFailingReader hit its limit"), + "Bad error message, got: {error}", + ) +} + +#[tokio::test] +async fn store_source_read_error_later() { + let testdata = TestData::roland(); + let cas = StubCAS::empty(); + let provider = new_provider(&cas).await; + + let source = EventuallyFailingReader::new(5); + let error = provider + .store(testdata.digest(), Box::new(source)) + .await + .expect_err("Want err"); + assert!( + error.contains("EventuallyFailingReader hit its limit"), + "Bad error message, got: {error}", + ) +} + #[tokio::test] async fn store_bytes_one_chunk() { let testdata = TestData::roland(); diff --git a/src/rust/engine/testutil/Cargo.toml b/src/rust/engine/testutil/Cargo.toml index 064cd85d48d..a20038c64af 100644 --- a/src/rust/engine/testutil/Cargo.toml +++ b/src/rust/engine/testutil/Cargo.toml @@ -13,3 +13,4 @@ grpc_util = { path = "../grpc_util" } hashing = { path = "../hashing" } prost = { workspace = true } protos = { path = "../protos" } +tokio = { workspace = true } diff --git a/src/rust/engine/testutil/src/lib.rs b/src/rust/engine/testutil/src/lib.rs index 929e0d62f34..153b39c2741 100644 --- a/src/rust/engine/testutil/src/lib.rs +++ b/src/rust/engine/testutil/src/lib.rs @@ -35,6 +35,7 @@ use fs::RelativePath; pub mod data; pub mod file; pub mod path; +pub mod stub_io; pub fn owned_string_vec(args: &[&str]) -> Vec { args.iter().map(<&str>::to_string).collect() diff --git a/src/rust/engine/testutil/src/stub_io.rs b/src/rust/engine/testutil/src/stub_io.rs new file mode 100644 index 00000000000..3a78ea9ec31 --- /dev/null +++ b/src/rust/engine/testutil/src/stub_io.rs @@ -0,0 +1,36 @@ +use std::io::{Error, ErrorKind}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, ReadBuf}; + +/// A reader that does 1-byte reads for 
a while and then starts (consistently) failing +pub struct EventuallyFailingReader { + reads_before_failure: usize, +} +impl EventuallyFailingReader { + pub fn new(reads_before_failure: usize) -> EventuallyFailingReader { + EventuallyFailingReader { + reads_before_failure, + } + } +} + +impl AsyncRead for EventuallyFailingReader { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context, + buf: &mut ReadBuf, + ) -> Poll> { + let self_ref = self.get_mut(); + if self_ref.reads_before_failure == 0 { + Poll::Ready(Err(Error::new( + ErrorKind::Other, + "EventuallyFailingReader hit its limit", + ))) + } else { + self_ref.reads_before_failure -= 1; + buf.put_slice(&[0]); + Poll::Ready(Ok(())) + } + } +} From 708d6a88f89a2c6d35d9083817c5d5bfa3d6b8b9 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Wed, 30 Aug 2023 20:57:15 +1000 Subject: [PATCH 08/19] Docs and minor tweaks --- src/rust/engine/fs/store/src/remote.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 9f711fc0035..88bef103717 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -24,16 +24,22 @@ pub type StoreSource = Box; #[async_trait] pub trait ByteStoreProvider: Sync + Send + 'static { + /// Store the bytes readable from `source` into the remote store async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String>; + /// Store the bytes in `bytes` into the remote store, as an optimisation of `store` when the bytes + /// are already in memory async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>; + /// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns + /// true when found, false when not. async fn load( &self, digest: Digest, destination: &mut dyn LoadDestination, ) -> Result; + /// Return any digests from `digests` that are not (currently) available in the remote store. 
async fn list_missing_digests( &self, digests: &mut (dyn Iterator + Send), @@ -107,21 +113,27 @@ impl ByteStore { Ok(ByteStore::new(instance_name, provider)) } + /// Store the bytes readable from `source` into the remote store pub async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String> { self - .store_tracking(digest, || self.provider.store(digest, source)) + .store_tracking("store", digest, || self.provider.store(digest, source)) .await } + /// Store the bytes in `bytes` into the remote store, as an optimisation of `store` when the bytes + /// are already in memory pub async fn store_bytes(&self, bytes: Bytes) -> Result<(), String> { let digest = Digest::of_bytes(&bytes); self - .store_tracking(digest, || self.provider.store_bytes(digest, bytes)) + .store_tracking("store_bytes", digest, || { + self.provider.store_bytes(digest, bytes) + }) .await } async fn store_tracking( &self, + workunit: &'static str, digest: Digest, do_store: DoStore, ) -> Result<(), String> @@ -130,7 +142,7 @@ impl ByteStore { Fut: Future> + Send, { in_workunit!( - "store_bytes", + workunit, Level::Trace, desc = Some(format!("Storing {digest:?}")), |workunit| async move { From 704d75f04a19ad64307189a22ca97015f057d92e Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Thu, 31 Aug 2023 09:13:26 +1000 Subject: [PATCH 09/19] preamble --- src/rust/engine/testutil/src/stub_io.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/rust/engine/testutil/src/stub_io.rs b/src/rust/engine/testutil/src/stub_io.rs index 3a78ea9ec31..382dde3d570 100644 --- a/src/rust/engine/testutil/src/stub_io.rs +++ b/src/rust/engine/testutil/src/stub_io.rs @@ -1,3 +1,5 @@ +// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). use std::io::{Error, ErrorKind}; use std::pin::Pin; use std::task::{Context, Poll}; From cf5aa4685a001a743161025d09aacecd212f1902 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Fri, 8 Sep 2023 16:08:28 +1000 Subject: [PATCH 10/19] Rewrite to use async_stream --- src/rust/engine/fs/store/src/remote/reapi.rs | 52 +++++++++---------- .../engine/fs/store/src/remote/reapi_tests.rs | 2 +- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index 62bb2bf0669..2342b39bde4 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -144,45 +144,45 @@ impl Provider { let error_occurred = Arc::new(parking_lot::Mutex::new(None)); let error_occurred_stream = error_occurred.clone(); - let stream = if len == 0 { - // if the reader is empty, the ReaderStream gives no elements, but we have to write at least - // one. - futures::stream::once(futures::future::ready( - protos::gen::google::bytestream::WriteRequest { + let chunk_size_bytes = self.chunk_size_bytes; + let stream = async_stream::stream! { + if len == 0 { + // if the reader is empty, the ReaderStream gives no elements, but we have to write at least + // one. + yield protos::gen::google::bytestream::WriteRequest { resource_name: resource_name.clone(), write_offset: 0, finish_write: true, data: Bytes::new(), - }, - )) - .boxed() - } else { - // Read the source in appropriately sized chunks. - // NB. it is possible that this doesn't fill each chunk fully (i.e. may not send - // `chunk_size_bytes` in each request). For the usual sources, this should be unlikely. 
- tokio_util::io::ReaderStream::with_capacity(source, self.chunk_size_bytes) - .scan(0, move |num_seen_bytes, read_result| { - futures::future::ready(match read_result { + }; + } else { + // Read the source in appropriately sized chunks. + // NB. it is possible that this doesn't fill each chunk fully (i.e. may not send + // `chunk_size_bytes` in each request). For the usual sources, this should be unlikely. + let reader_stream = tokio_util::io::ReaderStream::with_capacity(source, chunk_size_bytes); + let mut num_seen_bytes = 0; + + for await read_result in reader_stream { + match read_result { Ok(data) => { - let write_offset = *num_seen_bytes as i64; - *num_seen_bytes += data.len(); - - Some(protos::gen::google::bytestream::WriteRequest { + let write_offset = num_seen_bytes as i64; + num_seen_bytes += data.len(); + yield protos::gen::google::bytestream::WriteRequest { resource_name: resource_name.clone(), write_offset, - finish_write: *num_seen_bytes == len, + finish_write: num_seen_bytes == len, data, - }) - } + } + }, Err(err) => { // reading locally hit an error, so store it for re-processing below *error_occurred_stream.lock() = Some(err); // cut off here, no point continuing - None + break; } - }) - }) - .boxed() + } + } + } }; // NB: We must box the future to avoid a stack overflow. diff --git a/src/rust/engine/fs/store/src/remote/reapi_tests.rs b/src/rust/engine/fs/store/src/remote/reapi_tests.rs index f8e175b693f..90b655b63a9 100644 --- a/src/rust/engine/fs/store/src/remote/reapi_tests.rs +++ b/src/rust/engine/fs/store/src/remote/reapi_tests.rs @@ -387,7 +387,7 @@ async fn store_bytes_write_stream_grpc_error() { let digest = Digest::new(fingerprint, all_the_henries.len()); let error = provider - .store_bytes(digest, byte_source(all_the_henries)) + .store_bytes(digest, all_the_henries) .await .expect_err("Want err"); assert!( From 3935016c771b699c483f3359cb1a51bb8879d121 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Sat, 9 Sep 2023 21:12:25 +1000 Subject: [PATCH 11/19] Implement seek for EventuallyFailingReader --- .../engine/fs/store/src/remote/reapi_tests.rs | 4 +- src/rust/engine/testutil/src/stub_io.rs | 67 +++++++++++++++---- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote/reapi_tests.rs b/src/rust/engine/fs/store/src/remote/reapi_tests.rs index 90b655b63a9..0ffe1c384b7 100644 --- a/src/rust/engine/fs/store/src/remote/reapi_tests.rs +++ b/src/rust/engine/fs/store/src/remote/reapi_tests.rs @@ -266,7 +266,7 @@ async fn store_source_read_error_immediately() { let cas = StubCAS::empty(); let provider = new_provider(&cas).await; - let source = EventuallyFailingReader::new(0); + let source = EventuallyFailingReader::new(0, 10); let error = provider .store(testdata.digest(), Box::new(source)) .await @@ -283,7 +283,7 @@ async fn store_source_read_error_later() { let cas = StubCAS::empty(); let provider = new_provider(&cas).await; - let source = EventuallyFailingReader::new(5); + let source = EventuallyFailingReader::new(5, testdata.len()); let error = provider .store(testdata.digest(), Box::new(source)) .await diff --git a/src/rust/engine/testutil/src/stub_io.rs b/src/rust/engine/testutil/src/stub_io.rs index 382dde3d570..e52d984f959 100644 --- a/src/rust/engine/testutil/src/stub_io.rs +++ b/src/rust/engine/testutil/src/stub_io.rs @@ -1,18 +1,36 @@ // Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
-use std::io::{Error, ErrorKind}; +use std::io::{Error, ErrorKind, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, ReadBuf}; +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; -/// A reader that does 1-byte reads for a while and then starts (consistently) failing +/// A seekable reader that does 1-byte reads for a while and then starts (consistently) failing. +/// +/// Failure is based on the number of individual operations, e.g. individual seek and read calls. pub struct EventuallyFailingReader { - reads_before_failure: usize, + operations_before_failure: usize, + position: u64, + end: u64, } impl EventuallyFailingReader { - pub fn new(reads_before_failure: usize) -> EventuallyFailingReader { + pub fn new(operations_before_failure: usize, size_bytes: usize) -> EventuallyFailingReader { EventuallyFailingReader { - reads_before_failure, + operations_before_failure, + position: 0, + end: size_bytes as u64, + } + } + + fn record_operation_and_check_error(&mut self) -> Result<(), Error> { + if self.operations_before_failure == 0 { + Err(Error::new( + ErrorKind::Other, + "EventuallyFailingReader hit its limit", + )) + } else { + self.operations_before_failure -= 1; + Ok(()) } } } @@ -24,15 +42,36 @@ impl AsyncRead for EventuallyFailingReader { buf: &mut ReadBuf, ) -> Poll> { let self_ref = self.get_mut(); - if self_ref.reads_before_failure == 0 { - Poll::Ready(Err(Error::new( - ErrorKind::Other, - "EventuallyFailingReader hit its limit", - ))) - } else { - self_ref.reads_before_failure -= 1; + let result = self_ref.record_operation_and_check_error(); + if result.is_ok() && self_ref.position < self_ref.end { buf.put_slice(&[0]); - Poll::Ready(Ok(())) + self_ref.position += 1; } + Poll::Ready(result) + } +} + +impl AsyncSeek for EventuallyFailingReader { + fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<(), Error> { + let self_ref = self.get_mut(); + let result = self_ref.record_operation_and_check_error(); + if result.is_ok() { + let end = i64::try_from(self_ref.end).expect("end too large"); + let position_from_start: i64 = match position { + SeekFrom::Start(offset) => offset.try_into().expect("offset too large"), + SeekFrom::End(offset) => end + offset, + SeekFrom::Current(offset) => { + i64::try_from(self_ref.position).expect("position too large") + offset + } + }; + self_ref.position = position_from_start.clamp(0, end) as u64; + } + + result + } + + fn poll_complete(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + // the "operation" is recorded as part of the corresponding `poll_complete` + Poll::Ready(Ok(self.get_mut().position)) } } From 3a481f567db17e0be3bfdd9d078d6b6ed70e4710 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Sat, 9 Sep 2023 21:21:43 +1000 Subject: [PATCH 12/19] Pass attempts in from retry_call --- src/rust/engine/fs/store/src/remote/reapi.rs | 12 ++++++++---- src/rust/engine/grpc_util/src/retry.rs | 18 +++++++++++++----- .../remote/src/remote_cache/reapi.rs | 4 ++-- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index 2342b39bde4..d5f36d06c0e 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -262,7 +262,7 @@ impl ByteStoreProvider for Provider { retry_call( bytes, - move |bytes| async move { + move |bytes, _| async move { if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config { self.store_bytes_batch(digest, bytes).await } 
else { @@ -315,7 +315,7 @@ impl ByteStoreProvider for Provider { retry_call( (client, request, destination), - move |(mut client, request, destination)| { + move |(mut client, request, destination), retry_attempt| { async move { let mut start_opt = Some(Instant::now()); let response = client.read(request).await?; @@ -337,7 +337,11 @@ impl ByteStoreProvider for Provider { let mut writer = destination.lock().await; let mut hasher = Hasher::new(); - writer.reset().await?; + if retry_attempt > 0 { + // if we're retrying, we need to clear out the destination to start the whole write + // fresh + writer.reset().await?; + } while let Some(response) = stream.next().await { let response = response?; writer.write_all(&response.data).await?; @@ -379,7 +383,7 @@ impl ByteStoreProvider for Provider { let client = self.cas_client.as_ref().clone(); let response = retry_call( client, - move |mut client| { + move |mut client, _| { let request = request.clone(); async move { client.find_missing_blobs(request).await } }, diff --git a/src/rust/engine/grpc_util/src/retry.rs b/src/rust/engine/grpc_util/src/retry.rs index e17cc661225..8cdc0b07a26 100644 --- a/src/rust/engine/grpc_util/src/retry.rs +++ b/src/rust/engine/grpc_util/src/retry.rs @@ -24,7 +24,7 @@ pub fn status_is_retryable(status: &Status) -> bool { pub async fn retry_call(client: C, mut f: F, is_retryable: G) -> Result where C: Clone, - F: FnMut(C) -> Fut, + F: FnMut(C, u32) -> Fut, G: Fn(&E) -> bool, Fut: Future>, { @@ -43,7 +43,7 @@ where } let client2 = client.clone(); - let result_fut = f(client2); + let result_fut = f(client2, num_retries); let last_error = match result_fut.await { Ok(r) => return Ok(r), Err(err) => { @@ -97,21 +97,28 @@ mod tests { #[tokio::test] async fn retry_call_works_as_expected() { + // several retryable errors let client = MockClient::new(vec![ Err(MockError(true, "first")), Err(MockError(true, "second")), Ok(3_isize), Ok(4_isize), ]); + let mut expected_attempt = 0; let result = retry_call( client.clone(), - |client| async move { client.next().await }, + |client, attempt| async move { + assert_eq!(attempt, expected_attempt); + expected_attempt += 1; + client.next().await + }, |err| err.0, ) .await; assert_eq!(result, Ok(3_isize)); assert_eq!(client.values.lock().len(), 1); + // a non retryable error let client = MockClient::new(vec![ Err(MockError(true, "first")), Err(MockError(false, "second")), @@ -120,13 +127,14 @@ mod tests { ]); let result = retry_call( client.clone(), - |client| async move { client.next().await }, + |client, _| async move { client.next().await }, |err| err.0, ) .await; assert_eq!(result, Err(MockError(false, "second"))); assert_eq!(client.values.lock().len(), 2); + // retryable errors, but too many let client = MockClient::new(vec![ Err(MockError(true, "first")), Err(MockError(true, "second")), @@ -135,7 +143,7 @@ mod tests { ]); let result = retry_call( client.clone(), - |client| async move { client.next().await }, + |client, _| async move { client.next().await }, |err| err.0, ) .await; diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs b/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs index f55c859d6d2..23b8ced179b 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs +++ b/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs @@ -68,7 +68,7 @@ impl ActionCacheProvider for Provider { let client = self.action_cache_client.as_ref().clone(); retry_call( client, - move |mut client| { + move |mut client, _| { let 
update_action_cache_request = remexec::UpdateActionResultRequest { instance_name: self.instance_name.clone().unwrap_or_else(|| "".to_owned()), action_digest: Some(action_digest.into()), @@ -98,7 +98,7 @@ impl ActionCacheProvider for Provider { let client = self.action_cache_client.as_ref().clone(); let response = retry_call( client, - move |mut client| { + move |mut client, _| { let request = remexec::GetActionResultRequest { action_digest: Some(action_digest.into()), instance_name: self.instance_name.clone().unwrap_or_default(), From 4a49edf034f22382135119b1a879b0ac3418ce90 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Sun, 10 Sep 2023 07:39:04 +1000 Subject: [PATCH 13/19] Add retry to .store() --- src/rust/engine/fs/store/src/lib.rs | 4 +- src/rust/engine/fs/store/src/remote.rs | 12 ++++- src/rust/engine/fs/store/src/remote/reapi.rs | 48 ++++++++++++++----- .../engine/fs/store/src/remote/reapi_tests.rs | 35 ++++++++++++-- src/rust/engine/fs/store/src/remote_tests.rs | 12 +++-- 5 files changed, 87 insertions(+), 24 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index b397e828cd1..8d4a9aa38b1 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -874,7 +874,9 @@ impl Store { let file = tokio::fs::File::open(&path) .await .map_err(|e| format!("failed to read {digest:?} from {path:?}: {e}"))?; - remote.store(digest, Box::new(file)).await?; + remote + .store(digest, Arc::new(tokio::sync::Mutex::new(file))) + .await?; Ok(()) } diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 88bef103717..49631d6a5de 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -13,14 +13,18 @@ use hashing::Digest; use log::Level; use protos::gen::build::bazel::remote::execution::v2 as remexec; use remexec::ServerCapabilities; -use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}; +use tokio::sync::Mutex; use workunit_store::{in_workunit, ObservationMetric}; mod reapi; #[cfg(test)] mod reapi_tests; -pub type StoreSource = Box; +// this uses a Arc/Mutex because we need to have mutable access without losing ownership, due to +// retries and REAPI's ByteStreamClient::write requiring 'static (via tonic::IntoStreamingRequest), +// thus outlawing &mut +pub type StoreSource = Arc>; #[async_trait] pub trait ByteStoreProvider: Sync + Send + 'static { @@ -96,6 +100,10 @@ impl LoadDestination for Vec { } } +/// Blanket-implemented trait to approximate a `dyn AsyncRead + AsyncSeek + ...` trait object +pub trait StoreSource_: AsyncRead + AsyncSeek + Send + Sync + Unpin + 'static {} +impl StoreSource_ for T {} + impl ByteStore { pub fn new( instance_name: Option, diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index d5f36d06c0e..fc147c9d7fd 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -23,7 +23,7 @@ use remexec::{ content_addressable_storage_client::ContentAddressableStorageClient, BatchUpdateBlobsRequest, ServerCapabilities, }; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio::sync::Mutex; use tonic::{Code, Request, Status}; use workunit_store::{Metric, ObservationMetric}; @@ -53,6 +53,15 @@ enum ByteStoreError { Other(String), } +impl ByteStoreError { + fn is_retryable(&self) -> bool { + match self { + 
ByteStoreError::Grpc(status) => status_is_retryable(status), + ByteStoreError::Other(_) => false, + } + } +} + impl fmt::Display for ByteStoreError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -159,7 +168,8 @@ impl Provider { // Read the source in appropriately sized chunks. // NB. it is possible that this doesn't fill each chunk fully (i.e. may not send // `chunk_size_bytes` in each request). For the usual sources, this should be unlikely. - let reader_stream = tokio_util::io::ReaderStream::with_capacity(source, chunk_size_bytes); + let mut source = source.lock().await; + let reader_stream = tokio_util::io::ReaderStream::with_capacity(&mut *source, chunk_size_bytes); let mut num_seen_bytes = 0; for await read_result in reader_stream { @@ -267,26 +277,38 @@ impl ByteStoreProvider for Provider { self.store_bytes_batch(digest, bytes).await } else { self - .store_source_stream(digest, Box::new(Cursor::new(bytes))) + .store_source_stream(digest, Arc::new(Mutex::new(Cursor::new(bytes)))) .await } }, - |err| match err { - ByteStoreError::Grpc(status) => status_is_retryable(status), - ByteStoreError::Other(_) => false, - }, + ByteStoreError::is_retryable, ) .await .map_err(|e| e.to_string()) } async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String> { - // an arbitrary source (e.g. file) might be small enough to write via the batch API, but we - // ignore that possibility for now - self - .store_source_stream(digest, source) - .await - .map_err(|e| e.to_string()) + retry_call( + source, + move |source, retry_attempt| async move { + if retry_attempt > 0 { + // if we're retrying, we need to go back to the start of the source to start the whole + // read fresh + source.lock().await.rewind().await.map_err(|err| { + ByteStoreError::Other(format!( + "Uploading file with digest {digest:?}: failed to rewind before retry {retry_attempt}: {err}" + )) + })?; + } + + // an arbitrary source (e.g. file) might be small enough to write via the batch API, but we + // ignore that possibility for now + self.store_source_stream(digest, source).await + }, + ByteStoreError::is_retryable, + ) + .await + .map_err(|e| e.to_string()) } async fn load( diff --git a/src/rust/engine/fs/store/src/remote/reapi_tests.rs b/src/rust/engine/fs/store/src/remote/reapi_tests.rs index 0ffe1c384b7..f545b799dde 100644 --- a/src/rust/engine/fs/store/src/remote/reapi_tests.rs +++ b/src/rust/engine/fs/store/src/remote/reapi_tests.rs @@ -1,6 +1,7 @@ // Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
use std::collections::{BTreeMap, HashSet}; +use std::sync::Arc; use std::time::Duration; use bytes::Bytes; @@ -9,6 +10,7 @@ use hashing::{Digest, Fingerprint}; use mock::{RequestType, StubCAS}; use testutil::data::TestData; use testutil::stub_io::EventuallyFailingReader; +use tokio::sync::Mutex; use crate::remote::{ByteStoreProvider, RemoteOptions}; use crate::tests::{big_file_bytes, big_file_fingerprint, new_cas}; @@ -169,7 +171,7 @@ fn assert_cas_store( } fn store_source(data: Bytes) -> StoreSource { - Box::new(std::io::Cursor::new(data)) + Arc::new(Mutex::new(std::io::Cursor::new(data))) } #[tokio::test] @@ -237,6 +239,12 @@ async fn store_grpc_error() { error.contains("StubCAS is configured to always fail"), "Bad error message, got: {error}" ); + + // retries: + assert_eq!( + cas.request_counts.lock().get(&RequestType::BSWrite), + Some(&3) + ); } #[tokio::test] @@ -268,7 +276,7 @@ async fn store_source_read_error_immediately() { let source = EventuallyFailingReader::new(0, 10); let error = provider - .store(testdata.digest(), Box::new(source)) + .store(testdata.digest(), Arc::new(Mutex::new(source))) .await .expect_err("Want err"); assert!( @@ -285,11 +293,30 @@ async fn store_source_read_error_later() { let source = EventuallyFailingReader::new(5, testdata.len()); let error = provider - .store(testdata.digest(), Box::new(source)) + .store(testdata.digest(), Arc::new(Mutex::new(source))) .await .expect_err("Want err"); assert!( - error.contains("EventuallyFailingReader hit its limit"), + error.contains("failed to read local source") + && error.contains("EventuallyFailingReader hit its limit"), + "Bad error message, got: {error}", + ) +} + +#[tokio::test] +async fn store_source_read_error_later_rewind() { + let testdata = TestData::roland(); + let cas = StubCAS::cas_always_errors(); + let provider = new_provider(&cas).await; + + // fail to rewind after the second attempt + let source = EventuallyFailingReader::new(2 * testdata.len() + 3, testdata.len()); + let error = provider + .store(testdata.digest(), Arc::new(Mutex::new(source))) + .await + .expect_err("Want err"); + assert!( + error.contains("failed to rewind") && error.contains("EventuallyFailingReader hit its limit"), "Bad error message, got: {error}", ) } diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index c12f85b7e54..b5a49b5687e 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -156,6 +156,10 @@ async fn store_bytes_provider_error() { assert_error(store.store_bytes(TestData::roland().bytes()).await) } +fn store_source(data: Bytes) -> StoreSource { + Arc::new(tokio::sync::Mutex::new(Cursor::new(data))) +} + #[tokio::test] async fn store() { let _ = WorkunitStore::setup_for_tests(); @@ -164,7 +168,7 @@ async fn store() { let (store, provider) = empty_byte_store(); assert_eq!( store - .store(testdata.digest(), Box::new(Cursor::new(testdata.bytes()))) + .store(testdata.digest(), store_source(testdata.bytes())) .await, Ok(()) ); @@ -180,7 +184,7 @@ async fn store_provider_error() { let store = byte_store_always_error_provider(); assert_error( store - .store(testdata.digest(), Box::new(Cursor::new(testdata.bytes()))) + .store(testdata.digest(), store_source(testdata.bytes())) .await, ); } @@ -310,10 +314,10 @@ impl ByteStoreProvider for TestProvider { Ok(()) } - async fn store(&self, digest: Digest, mut file: StoreSource) -> Result<(), String> { + async fn store(&self, digest: Digest, file: StoreSource) -> 
Result<(), String> { // just pull it all into memory let mut bytes = Vec::new(); - tokio::io::copy(&mut file, &mut bytes).await.unwrap(); + file.lock().await.read_to_end(&mut bytes).await.unwrap(); self.blobs.lock().insert(digest.hash, Bytes::from(bytes)); Ok(()) } From 972255cf0eec0b5b32880b3fe378ff6ff7f5a115 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Sun, 10 Sep 2023 10:11:02 +1000 Subject: [PATCH 14/19] Rewrite retry_call attempt testing --- src/rust/engine/grpc_util/src/retry.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/rust/engine/grpc_util/src/retry.rs b/src/rust/engine/grpc_util/src/retry.rs index 8cdc0b07a26..9c2aec454fa 100644 --- a/src/rust/engine/grpc_util/src/retry.rs +++ b/src/rust/engine/grpc_util/src/retry.rs @@ -107,10 +107,13 @@ mod tests { let mut expected_attempt = 0; let result = retry_call( client.clone(), - |client, attempt| async move { + |client, attempt| { + // check `attempt` is being passed through as expected: starting with 0 for the first + // call, and incrementing for each one after assert_eq!(attempt, expected_attempt); expected_attempt += 1; - client.next().await + + async move { client.next().await } }, |err| err.0, ) From 2ca968bb31683be2111ac23f966ded2bd388ea36 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Tue, 12 Sep 2023 08:55:49 +1000 Subject: [PATCH 15/19] Review: store -> store_file using tokio::fs::File directly --- src/rust/engine/fs/store/src/lib.rs | 4 +- src/rust/engine/fs/store/src/remote.rs | 23 ++---- src/rust/engine/fs/store/src/remote/reapi.rs | 10 ++- .../engine/fs/store/src/remote/reapi_tests.rs | 77 +++++++------------ src/rust/engine/fs/store/src/remote_tests.rs | 43 +++++------ src/rust/engine/fs/store/src/tests.rs | 16 ++++ 6 files changed, 74 insertions(+), 99 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 8d4a9aa38b1..0a43aeacbdd 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -874,9 +874,7 @@ impl Store { let file = tokio::fs::File::open(&path) .await .map_err(|e| format!("failed to read {digest:?} from {path:?}: {e}"))?; - remote - .store(digest, Arc::new(tokio::sync::Mutex::new(file))) - .await?; + remote.store_file(digest, file).await?; Ok(()) } diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 49631d6a5de..3390b41a330 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -13,23 +13,18 @@ use hashing::Digest; use log::Level; use protos::gen::build::bazel::remote::execution::v2 as remexec; use remexec::ServerCapabilities; -use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite}; -use tokio::sync::Mutex; +use tokio::fs::File; +use tokio::io::{AsyncSeekExt, AsyncWrite}; use workunit_store::{in_workunit, ObservationMetric}; mod reapi; #[cfg(test)] mod reapi_tests; -// this uses a Arc/Mutex because we need to have mutable access without losing ownership, due to -// retries and REAPI's ByteStreamClient::write requiring 'static (via tonic::IntoStreamingRequest), -// thus outlawing &mut -pub type StoreSource = Arc>; - #[async_trait] pub trait ByteStoreProvider: Sync + Send + 'static { - /// Store the bytes readable from `source` into the remote store - async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String>; + /// Store the bytes readable from `file` into the remote store + async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>; /// Store 
the bytes in `bytes` into the remote store, as an optimisation of `store` when the bytes /// are already in memory @@ -100,10 +95,6 @@ impl LoadDestination for Vec { } } -/// Blanket-implemented trait to approximate a `dyn AsyncRead + AsyncSeek + ...` trait object -pub trait StoreSource_: AsyncRead + AsyncSeek + Send + Sync + Unpin + 'static {} -impl StoreSource_ for T {} - impl ByteStore { pub fn new( instance_name: Option, @@ -121,10 +112,10 @@ impl ByteStore { Ok(ByteStore::new(instance_name, provider)) } - /// Store the bytes readable from `source` into the remote store - pub async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String> { + /// Store the bytes readable from `file` into the remote store + pub async fn store_file(&self, digest: Digest, file: File) -> Result<(), String> { self - .store_tracking("store", digest, || self.provider.store(digest, source)) + .store_tracking("store", digest, || self.provider.store_file(digest, file)) .await } diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index fc147c9d7fd..5a7f2b80aa2 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -23,14 +23,15 @@ use remexec::{ content_addressable_storage_client::ContentAddressableStorageClient, BatchUpdateBlobsRequest, ServerCapabilities, }; -use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use tokio::fs::File; +use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWriteExt}; use tokio::sync::Mutex; use tonic::{Code, Request, Status}; use workunit_store::{Metric, ObservationMetric}; use crate::RemoteOptions; -use super::{ByteStoreProvider, LoadDestination, StoreSource}; +use super::{ByteStoreProvider, LoadDestination}; pub struct Provider { instance_name: Option, @@ -134,7 +135,7 @@ impl Provider { async fn store_source_stream( &self, digest: Digest, - source: StoreSource, + source: Arc>, ) -> Result<(), ByteStoreError> { let len = digest.size_bytes; let instance_name = self.instance_name.clone().unwrap_or_default(); @@ -287,7 +288,8 @@ impl ByteStoreProvider for Provider { .map_err(|e| e.to_string()) } - async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String> { + async fn store_file(&self, digest: Digest, file: File) -> Result<(), String> { + let source = Arc::new(Mutex::new(file)); retry_call( source, move |source, retry_attempt| async move { diff --git a/src/rust/engine/fs/store/src/remote/reapi_tests.rs b/src/rust/engine/fs/store/src/remote/reapi_tests.rs index f545b799dde..cd2be2d2734 100644 --- a/src/rust/engine/fs/store/src/remote/reapi_tests.rs +++ b/src/rust/engine/fs/store/src/remote/reapi_tests.rs @@ -1,23 +1,21 @@ // Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
use std::collections::{BTreeMap, HashSet}; -use std::sync::Arc; use std::time::Duration; use bytes::Bytes; use grpc_util::tls; use hashing::{Digest, Fingerprint}; use mock::{RequestType, StubCAS}; +use tempfile::TempDir; use testutil::data::TestData; -use testutil::stub_io::EventuallyFailingReader; -use tokio::sync::Mutex; +use tokio::fs::File; use crate::remote::{ByteStoreProvider, RemoteOptions}; -use crate::tests::{big_file_bytes, big_file_fingerprint, new_cas}; +use crate::tests::{big_file_bytes, big_file_fingerprint, mk_tempfile, new_cas}; use crate::MEGABYTES; use super::reapi::Provider; -use super::StoreSource; fn remote_options( cas_address: String, @@ -170,10 +168,6 @@ fn assert_cas_store( } } -fn store_source(data: Bytes) -> StoreSource { - Arc::new(Mutex::new(std::io::Cursor::new(data))) -} - #[tokio::test] async fn store_one_chunk() { let testdata = TestData::roland(); @@ -181,7 +175,10 @@ async fn store_one_chunk() { let provider = new_provider(&cas).await; provider - .store(testdata.digest(), store_source(testdata.bytes())) + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await, + ) .await .unwrap(); @@ -204,7 +201,7 @@ async fn store_multiple_chunks() { let digest = Digest::new(fingerprint, all_the_henries.len()); provider - .store(digest, store_source(all_the_henries.clone())) + .store_file(digest, mk_tempfile(Some(&all_the_henries)).await) .await .unwrap(); @@ -218,7 +215,10 @@ async fn store_empty_file() { let provider = new_provider(&cas).await; provider - .store(testdata.digest(), store_source(testdata.bytes())) + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await, + ) .await .unwrap(); @@ -232,7 +232,10 @@ async fn store_grpc_error() { let provider = new_provider(&cas).await; let error = provider - .store(testdata.digest(), store_source(testdata.bytes())) + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await, + ) .await .expect_err("Want err"); assert!( @@ -259,7 +262,10 @@ async fn store_connection_error() { .unwrap(); let error = provider - .store(testdata.digest(), store_source(testdata.bytes())) + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await, + ) .await .expect_err("Want err"); assert!( @@ -274,52 +280,21 @@ async fn store_source_read_error_immediately() { let cas = StubCAS::empty(); let provider = new_provider(&cas).await; - let source = EventuallyFailingReader::new(0, 10); - let error = provider - .store(testdata.digest(), Arc::new(Mutex::new(source))) - .await - .expect_err("Want err"); - assert!( - error.contains("EventuallyFailingReader hit its limit"), - "Bad error message, got: {error}", - ) -} - -#[tokio::test] -async fn store_source_read_error_later() { - let testdata = TestData::roland(); - let cas = StubCAS::empty(); - let provider = new_provider(&cas).await; + let temp_dir = TempDir::new().unwrap(); + let file_that_is_a_dir = File::open(temp_dir.path()).await.unwrap(); - let source = EventuallyFailingReader::new(5, testdata.len()); let error = provider - .store(testdata.digest(), Arc::new(Mutex::new(source))) + .store_file(testdata.digest(), file_that_is_a_dir) .await .expect_err("Want err"); assert!( - error.contains("failed to read local source") - && error.contains("EventuallyFailingReader hit its limit"), + error.contains("Is a directory"), "Bad error message, got: {error}", ) } -#[tokio::test] -async fn store_source_read_error_later_rewind() { - let testdata = TestData::roland(); - let cas = StubCAS::cas_always_errors(); - let provider 
= new_provider(&cas).await; - - // fail to rewind after the second attempt - let source = EventuallyFailingReader::new(2 * testdata.len() + 3, testdata.len()); - let error = provider - .store(testdata.digest(), Arc::new(Mutex::new(source))) - .await - .expect_err("Want err"); - assert!( - error.contains("failed to rewind") && error.contains("EventuallyFailingReader hit its limit"), - "Bad error message, got: {error}", - ) -} +// TODO: it would also be good to validate the behaviour if the file reads start failing later +// (e.g. read 10 bytes, and then fail), if that's a thing that is possible. #[tokio::test] async fn store_bytes_one_chunk() { diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index b5a49b5687e..06234e6b37a 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -1,7 +1,6 @@ // Copyright 2022 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). use std::collections::{BTreeMap, HashMap, HashSet}; -use std::io::Cursor; use std::sync::Arc; use std::time::Duration; @@ -10,11 +9,12 @@ use grpc_util::tls; use hashing::{Digest, Fingerprint}; use parking_lot::Mutex; use testutil::data::TestData; +use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use workunit_store::WorkunitStore; -use crate::remote::{ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions, StoreSource}; -use crate::tests::new_cas; +use crate::remote::{ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions}; +use crate::tests::{mk_tempfile, new_cas}; use crate::MEGABYTES; #[tokio::test] @@ -105,7 +105,7 @@ async fn load_file_existing() { let _ = WorkunitStore::setup_for_tests(); let store = new_byte_store(&testdata); - let file = mk_tempfile().await; + let file = mk_tempfile(None).await; let file = store .load_file(testdata.digest(), file) @@ -121,7 +121,7 @@ async fn load_file_missing() { let _ = WorkunitStore::setup_for_tests(); let (store, _) = empty_byte_store(); - let file = mk_tempfile().await; + let file = mk_tempfile(None).await; let result = store.load_file(TestData::roland().digest(), file).await; assert!(result.unwrap().is_none()); @@ -132,7 +132,7 @@ async fn load_file_provider_error() { let _ = WorkunitStore::setup_for_tests(); let store = byte_store_always_error_provider(); - let file = mk_tempfile().await; + let file = mk_tempfile(None).await; assert_error(store.load_file(TestData::roland().digest(), file).await); } @@ -156,10 +156,6 @@ async fn store_bytes_provider_error() { assert_error(store.store_bytes(TestData::roland().bytes()).await) } -fn store_source(data: Bytes) -> StoreSource { - Arc::new(tokio::sync::Mutex::new(Cursor::new(data))) -} - #[tokio::test] async fn store() { let _ = WorkunitStore::setup_for_tests(); @@ -168,7 +164,10 @@ async fn store() { let (store, provider) = empty_byte_store(); assert_eq!( store - .store(testdata.digest(), store_source(testdata.bytes())) + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await + ) .await, Ok(()) ); @@ -184,7 +183,10 @@ async fn store_provider_error() { let store = byte_store_always_error_provider(); assert_error( store - .store(testdata.digest(), store_source(testdata.bytes())) + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await, + ) .await, ); } @@ -231,8 +233,7 @@ async fn list_missing_digests_provider_error() { #[tokio::test] async fn file_as_load_destination_reset() { - let 
mut file = mk_tempfile().await; - file.write_all(b"initial").await.unwrap(); + let mut file = mk_tempfile(Some(b"initial")).await; file.reset().await.unwrap(); assert_file_contents(file, "").await; @@ -260,14 +261,6 @@ fn byte_store_always_error_provider() -> ByteStore { ByteStore::new(None, AlwaysErrorProvider::new()) } -async fn mk_tempfile() -> tokio::fs::File { - let file = tokio::task::spawn_blocking(tempfile::tempfile) - .await - .unwrap() - .unwrap(); - tokio::fs::File::from_std(file) -} - async fn assert_file_contents(mut file: tokio::fs::File, expected: &str) { file.rewind().await.unwrap(); @@ -314,10 +307,10 @@ impl ByteStoreProvider for TestProvider { Ok(()) } - async fn store(&self, digest: Digest, file: StoreSource) -> Result<(), String> { + async fn store_file(&self, digest: Digest, mut file: File) -> Result<(), String> { // just pull it all into memory let mut bytes = Vec::new(); - file.lock().await.read_to_end(&mut bytes).await.unwrap(); + file.read_to_end(&mut bytes).await.unwrap(); self.blobs.lock().insert(digest.hash, Bytes::from(bytes)); Ok(()) } @@ -358,7 +351,7 @@ impl ByteStoreProvider for AlwaysErrorProvider { Err("AlwaysErrorProvider always fails".to_owned()) } - async fn store(&self, _: Digest, _: StoreSource) -> Result<(), String> { + async fn store_file(&self, _: Digest, _: File) -> Result<(), String> { Err("AlwaysErrorProvider always fails".to_owned()) } diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index 693b0723938..b9b8f146c6f 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -19,6 +19,7 @@ use grpc_util::tls; use hashing::{Digest, Fingerprint}; use mock::{RequestType, StubCAS}; use protos::gen::build::bazel::remote::execution::v2 as remexec; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use workunit_store::WorkunitStore; use crate::{ @@ -69,6 +70,21 @@ pub async fn load_file_bytes(store: &Store, digest: Digest) -> Result) -> tokio::fs::File { + let file = tokio::task::spawn_blocking(tempfile::tempfile) + .await + .unwrap() + .unwrap(); + let mut file = tokio::fs::File::from_std(file); + + if let Some(contents) = contents { + file.write_all(contents).await.unwrap(); + file.rewind().await.unwrap(); + } + + file +} + /// /// Create a StubCas with a file and a directory inside. /// From f0813a24a8c9a0882fd7f92bef81a87b1ea28ecb Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Tue, 12 Sep 2023 08:56:11 +1000 Subject: [PATCH 16/19] Remove now-unused EventuallyFailingReader --- src/rust/engine/testutil/src/lib.rs | 1 - src/rust/engine/testutil/src/stub_io.rs | 77 ------------------------- 2 files changed, 78 deletions(-) delete mode 100644 src/rust/engine/testutil/src/stub_io.rs diff --git a/src/rust/engine/testutil/src/lib.rs b/src/rust/engine/testutil/src/lib.rs index 153b39c2741..929e0d62f34 100644 --- a/src/rust/engine/testutil/src/lib.rs +++ b/src/rust/engine/testutil/src/lib.rs @@ -35,7 +35,6 @@ use fs::RelativePath; pub mod data; pub mod file; pub mod path; -pub mod stub_io; pub fn owned_string_vec(args: &[&str]) -> Vec { args.iter().map(<&str>::to_string).collect() diff --git a/src/rust/engine/testutil/src/stub_io.rs b/src/rust/engine/testutil/src/stub_io.rs deleted file mode 100644 index e52d984f959..00000000000 --- a/src/rust/engine/testutil/src/stub_io.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). -// Licensed under the Apache License, Version 2.0 (see LICENSE). 
-use std::io::{Error, ErrorKind, SeekFrom}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; - -/// A seekable reader that does 1-byte reads for a while and then starts (consistently) failing. -/// -/// Failure is based on the number of individual operations, e.g. individual seek and read calls. -pub struct EventuallyFailingReader { - operations_before_failure: usize, - position: u64, - end: u64, -} -impl EventuallyFailingReader { - pub fn new(operations_before_failure: usize, size_bytes: usize) -> EventuallyFailingReader { - EventuallyFailingReader { - operations_before_failure, - position: 0, - end: size_bytes as u64, - } - } - - fn record_operation_and_check_error(&mut self) -> Result<(), Error> { - if self.operations_before_failure == 0 { - Err(Error::new( - ErrorKind::Other, - "EventuallyFailingReader hit its limit", - )) - } else { - self.operations_before_failure -= 1; - Ok(()) - } - } -} - -impl AsyncRead for EventuallyFailingReader { - fn poll_read( - self: Pin<&mut Self>, - _: &mut Context, - buf: &mut ReadBuf, - ) -> Poll> { - let self_ref = self.get_mut(); - let result = self_ref.record_operation_and_check_error(); - if result.is_ok() && self_ref.position < self_ref.end { - buf.put_slice(&[0]); - self_ref.position += 1; - } - Poll::Ready(result) - } -} - -impl AsyncSeek for EventuallyFailingReader { - fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> Result<(), Error> { - let self_ref = self.get_mut(); - let result = self_ref.record_operation_and_check_error(); - if result.is_ok() { - let end = i64::try_from(self_ref.end).expect("end too large"); - let position_from_start: i64 = match position { - SeekFrom::Start(offset) => offset.try_into().expect("offset too large"), - SeekFrom::End(offset) => end + offset, - SeekFrom::Current(offset) => { - i64::try_from(self_ref.position).expect("position too large") + offset - } - }; - self_ref.position = position_from_start.clamp(0, end) as u64; - } - - result - } - - fn poll_complete(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - // the "operation" is recorded as part of the corresponding `poll_complete` - Poll::Ready(Ok(self.get_mut().position)) - } -} From 22fc62181f799593c96f5748ca28a559e39bfc0b Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Tue, 12 Sep 2023 08:57:19 +1000 Subject: [PATCH 17/19] review: early return for empty stream --- src/rust/engine/fs/store/src/remote/reapi.rs | 51 ++++++++++---------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index 5a7f2b80aa2..f7c191fa76c 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -165,32 +165,33 @@ impl Provider { finish_write: true, data: Bytes::new(), }; - } else { - // Read the source in appropriately sized chunks. - // NB. it is possible that this doesn't fill each chunk fully (i.e. may not send - // `chunk_size_bytes` in each request). For the usual sources, this should be unlikely. 
- let mut source = source.lock().await; - let reader_stream = tokio_util::io::ReaderStream::with_capacity(&mut *source, chunk_size_bytes); - let mut num_seen_bytes = 0; - - for await read_result in reader_stream { - match read_result { - Ok(data) => { - let write_offset = num_seen_bytes as i64; - num_seen_bytes += data.len(); - yield protos::gen::google::bytestream::WriteRequest { - resource_name: resource_name.clone(), - write_offset, - finish_write: num_seen_bytes == len, - data, - } - }, - Err(err) => { - // reading locally hit an error, so store it for re-processing below - *error_occurred_stream.lock() = Some(err); - // cut off here, no point continuing - break; + return; + } + + // Read the source in appropriately sized chunks. + // NB. it is possible that this doesn't fill each chunk fully (i.e. may not send + // `chunk_size_bytes` in each request). For the usual sources, this should be unlikely. + let mut source = source.lock().await; + let reader_stream = tokio_util::io::ReaderStream::with_capacity(&mut *source, chunk_size_bytes); + let mut num_seen_bytes = 0; + + for await read_result in reader_stream { + match read_result { + Ok(data) => { + let write_offset = num_seen_bytes as i64; + num_seen_bytes += data.len(); + yield protos::gen::google::bytestream::WriteRequest { + resource_name: resource_name.clone(), + write_offset, + finish_write: num_seen_bytes == len, + data, } + }, + Err(err) => { + // reading locally hit an error, so store it for re-processing below + *error_occurred_stream.lock() = Some(err); + // cut off here, no point continuing + break; } } } From 0363e6bd5f17788b4393a4c1336854254b714574 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Tue, 12 Sep 2023 10:43:26 +1000 Subject: [PATCH 18/19] Missed minor updates for store -> store_file rename --- src/rust/engine/fs/store/src/remote.rs | 8 ++++---- src/rust/engine/fs/store/src/remote/reapi.rs | 4 ++-- src/rust/engine/fs/store/src/remote/reapi_tests.rs | 12 ++++++------ src/rust/engine/fs/store/src/remote_tests.rs | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 3390b41a330..ce0dc5882e3 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -26,8 +26,8 @@ pub trait ByteStoreProvider: Sync + Send + 'static { /// Store the bytes readable from `file` into the remote store async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>; - /// Store the bytes in `bytes` into the remote store, as an optimisation of `store` when the bytes - /// are already in memory + /// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the + /// bytes are already in memory async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>; /// Load the data stored (if any) in the remote store for `digest` into `destination`. 
Returns @@ -119,8 +119,8 @@ impl ByteStore { .await } - /// Store the bytes in `bytes` into the remote store, as an optimisation of `store` when the bytes - /// are already in memory + /// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the + /// bytes are already in memory pub async fn store_bytes(&self, bytes: Bytes) -> Result<(), String> { let digest = Digest::of_bytes(&bytes); self diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index f7c191fa76c..7ea35d74e00 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -304,8 +304,8 @@ impl ByteStoreProvider for Provider { })?; } - // an arbitrary source (e.g. file) might be small enough to write via the batch API, but we - // ignore that possibility for now + // A file might be small enough to write via the batch API, but we ignore that possibility + // for now, because these are expected to be stored in the FSDB, and thus large self.store_source_stream(digest, source).await }, ByteStoreError::is_retryable, diff --git a/src/rust/engine/fs/store/src/remote/reapi_tests.rs b/src/rust/engine/fs/store/src/remote/reapi_tests.rs index cd2be2d2734..23fc279709a 100644 --- a/src/rust/engine/fs/store/src/remote/reapi_tests.rs +++ b/src/rust/engine/fs/store/src/remote/reapi_tests.rs @@ -169,7 +169,7 @@ fn assert_cas_store( } #[tokio::test] -async fn store_one_chunk() { +async fn store_file_one_chunk() { let testdata = TestData::roland(); let cas = StubCAS::empty(); let provider = new_provider(&cas).await; @@ -185,7 +185,7 @@ async fn store_one_chunk() { assert_cas_store(&cas, testdata.fingerprint(), testdata.bytes(), 1, 1024) } #[tokio::test] -async fn store_multiple_chunks() { +async fn store_file_multiple_chunks() { let cas = StubCAS::empty(); let chunk_size = 10 * 1024; let provider = Provider::new(remote_options( @@ -209,7 +209,7 @@ async fn store_multiple_chunks() { } #[tokio::test] -async fn store_empty_file() { +async fn store_file_empty_file() { let testdata = TestData::empty(); let cas = StubCAS::empty(); let provider = new_provider(&cas).await; @@ -226,7 +226,7 @@ async fn store_empty_file() { } #[tokio::test] -async fn store_grpc_error() { +async fn store_file_grpc_error() { let testdata = TestData::roland(); let cas = StubCAS::cas_always_errors(); let provider = new_provider(&cas).await; @@ -251,7 +251,7 @@ async fn store_grpc_error() { } #[tokio::test] -async fn store_connection_error() { +async fn store_file_connection_error() { let testdata = TestData::roland(); let provider = Provider::new(remote_options( "http://doesnotexist.example".to_owned(), @@ -275,7 +275,7 @@ async fn store_connection_error() { } #[tokio::test] -async fn store_source_read_error_immediately() { +async fn store_file_source_read_error_immediately() { let testdata = TestData::roland(); let cas = StubCAS::empty(); let provider = new_provider(&cas).await; diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index b5a49b5687e..06234e6b37a 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -157,7 +157,7 @@ async fn store_bytes_provider_error() { } #[tokio::test] -async fn store() { +async fn store_file() { let _ = WorkunitStore::setup_for_tests(); let testdata = TestData::roland(); @@ -177,7 +177,7 @@ } #[tokio::test] -async fn store_provider_error() { +async fn store_file_provider_error() { 
let _ = WorkunitStore::setup_for_tests(); let testdata = TestData::roland(); let store = byte_store_always_error_provider(); From b775f5188bf11af0429d2a4f437e7b756b3db5f2 Mon Sep 17 00:00:00 2001 From: Huon Wilson Date: Tue, 12 Sep 2023 10:44:11 +1000 Subject: [PATCH 19/19] Reduce spurious changes --- src/rust/engine/fs/store/src/lib.rs | 8 ++++---- src/rust/engine/fs/store/src/remote/reapi.rs | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 0a43aeacbdd..54935c47232 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -845,10 +845,10 @@ impl Store { digest: Digest, ) -> Result<(), StoreError> { // We need to copy the bytes into memory so that they may be used safely in an async - // future. While this unfortunately increases memory consumption, we prioritize being - // able to run `remote.store_bytes()` as async. In addition, this is only used for - // blobs in the LMDB store, most of which are small: large blobs end up in the FSDB - // store. + // future. While this unfortunately increases memory consumption, we prioritize + // being able to run `remote.store_bytes()` as async. In addition, this is only used + // for blobs in the LMDB store, most of which are small: large blobs end up in the + // FSDB store. // // See https://github.com/pantsbuild/pants/pull/9793 for an earlier implementation // that used `Executor.block_on`, which avoided the clone but was blocking. diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/fs/store/src/remote/reapi.rs index 7ea35d74e00..d78a368ca27 100755 --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/fs/store/src/remote/reapi.rs @@ -216,7 +216,6 @@ impl Provider { Err(err) => Err(ByteStoreError::Grpc(err)), Ok(response) => { let response = response.into_inner(); - if response.committed_size == len as i64 { Ok(()) } else {