From a03782a7df9a62b47d730940f4a170a63760f998 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 8 Nov 2022 14:56:21 -0600 Subject: [PATCH 01/10] Combine into a single batched function, but not parallelized --- src/rust/engine/fs/store/src/lib.rs | 75 ++++++++------------ src/rust/engine/fs/store/src/tests.rs | 2 +- src/rust/engine/process_execution/src/lib.rs | 19 ++--- 3 files changed, 37 insertions(+), 59 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 1edfda79e52..dec655847e5 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -951,53 +951,40 @@ impl Store { Ok(missing.is_empty()) } - /// - /// Ensure that a directory is locally loadable, which will download it from the Remote store as - /// a sideeffect (if one is configured). - /// - pub async fn ensure_local_has_recursive_directory( + /// Ensure that the files are locally loadable. This will download them from the remote store as + /// a side effect, if one is configured. + pub async fn ensure_local_has_files( &self, - dir_digest: DirectoryDigest, + mut file_digests: Vec, + directory_digests: Vec, ) -> Result<(), StoreError> { - let mut file_digests = Vec::new(); - self - .load_digest_trie(dir_digest) - .await? - .walk(SymlinkBehavior::Aware, &mut |_, entry| match entry { - directory::Entry::File(f) => file_digests.push(f.digest()), - directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (), - }); - - let _ = future::try_join_all( - file_digests - .into_iter() - .map(|file_digest| self.ensure_local_has_file(file_digest)) - .collect::>(), - ) - .await?; - Ok(()) - } - - /// Ensure that a file is locally loadable, which will download it from the Remote store as - /// a side effect (if one is configured). Called only with the Digest of a File. - pub async fn ensure_local_has_file(&self, file_digest: Digest) -> Result<(), StoreError> { - if let Err(e) = self - .load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(())) - .await - { - log::debug!("Missing file digest from remote store: {:?}", file_digest); - in_workunit!( - "missing_file_counter", - Level::Trace, - |workunit| async move { - workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1); - }, - ) - .await; - Err(e) - } else { - Ok(()) + for dir_digest in directory_digests.into_iter() { + self + .load_digest_trie(dir_digest) + .await? + .walk(SymlinkBehavior::Aware, &mut |_, entry| match entry { + directory::Entry::File(f) => file_digests.push(f.digest()), + directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (), + }); } + for file_digest in file_digests { + if let Err(e) = self + .load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(())) + .await + { + log::debug!("Missing file digest from remote store: {:?}", file_digest); + in_workunit!( + "missing_file_counter", + Level::Trace, + |workunit| async move { + workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1); + }, + ) + .await; + return Err(e); + } + } + Ok(()) } /// Load a REv2 Tree from a remote CAS _without_ persisting the embedded Directory protos in diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index d41317907c7..adf41bea052 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -219,7 +219,7 @@ async fn load_recursive_directory() { .build(); new_store(dir.path(), &cas.address()) - .ensure_local_has_recursive_directory(recursive_testdir_digest.clone()) + .ensure_local_has_files(vec![], vec![recursive_testdir_digest.clone()]) .await .expect("Downloading recursive directory should have succeeded."); diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 4e67cbda56c..d7b84e7f278 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -833,20 +833,11 @@ pub(crate) async fn check_cache_content( match cache_content_behavior { CacheContentBehavior::Fetch => { let response = response.clone(); - let fetch_result = in_workunit!( - "eager_fetch_action_cache", - Level::Trace, - |_workunit| async move { - try_join_all(vec![ - store.ensure_local_has_file(response.stdout_digest).boxed(), - store.ensure_local_has_file(response.stderr_digest).boxed(), - store - .ensure_local_has_recursive_directory(response.output_directory) - .boxed(), - ]) - .await - } - ) + let fetch_result = in_workunit!("eager_fetch_action_cache", Level::Trace, |_workunit| store + .ensure_local_has_files( + vec![response.stdout_digest, response.stderr_digest], + vec![response.output_directory] + )) .await; match fetch_result { Err(StoreError::MissingDigest { .. }) => Ok(false), From 643c3c60216c4f8e4dc002ed2892f6c0ef262519 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Tue, 8 Nov 2022 15:09:56 -0600 Subject: [PATCH 02/10] Only operate on missing digests --- src/rust/engine/fs/store/src/lib.rs | 15 +++++++++++---- src/rust/engine/fs/store/src/tests.rs | 2 +- src/rust/engine/process_execution/src/lib.rs | 6 +++--- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index dec655847e5..70cf36a16d9 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -955,19 +955,26 @@ impl Store { /// a side effect, if one is configured. pub async fn ensure_local_has_files( &self, - mut file_digests: Vec, - directory_digests: Vec, + mut file_digests: HashSet, + directory_digests: HashSet, ) -> Result<(), StoreError> { for dir_digest in directory_digests.into_iter() { self .load_digest_trie(dir_digest) .await? .walk(SymlinkBehavior::Aware, &mut |_, entry| match entry { - directory::Entry::File(f) => file_digests.push(f.digest()), + directory::Entry::File(f) => { + file_digests.insert(f.digest()); + } directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (), }); } - for file_digest in file_digests { + let missing_file_digests = self + .local + .get_missing_digests(EntryType::File, file_digests) + .await?; + + for file_digest in missing_file_digests { if let Err(e) = self .load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(())) .await diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index adf41bea052..26717f1e618 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -219,7 +219,7 @@ async fn load_recursive_directory() { .build(); new_store(dir.path(), &cas.address()) - .ensure_local_has_files(vec![], vec![recursive_testdir_digest.clone()]) + .ensure_local_has_files(vec![], vec![recursive_testdir_digest.clone().collect()]) .await .expect("Downloading recursive directory should have succeeded."); diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index d7b84e7f278..bb058d751ba 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -27,7 +27,7 @@ #[macro_use] extern crate derivative; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::convert::{TryFrom, TryInto}; use std::fmt::{self, Debug, Display}; use std::path::PathBuf; @@ -835,8 +835,8 @@ pub(crate) async fn check_cache_content( let response = response.clone(); let fetch_result = in_workunit!("eager_fetch_action_cache", Level::Trace, |_workunit| store .ensure_local_has_files( - vec![response.stdout_digest, response.stderr_digest], - vec![response.output_directory] + HashSet::from([response.stdout_digest, response.stderr_digest]), + HashSet::from([response.output_directory]) )) .await; match fetch_result { From 2bba68cca07fb5072279390b038606f2d8731060 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Wed, 9 Nov 2022 10:54:38 -0600 Subject: [PATCH 03/10] Parallelize directory digest handling --- src/rust/engine/fs/store/src/lib.rs | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 70cf36a16d9..e7c7ebe0345 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -958,17 +958,23 @@ impl Store { mut file_digests: HashSet, directory_digests: HashSet, ) -> Result<(), StoreError> { - for dir_digest in directory_digests.into_iter() { - self - .load_digest_trie(dir_digest) - .await? - .walk(SymlinkBehavior::Aware, &mut |_, entry| match entry { - directory::Entry::File(f) => { - file_digests.insert(f.digest()); - } - directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (), - }); - } + let file_digests_from_directories = + future::try_join_all(directory_digests.into_iter().map(|dir_digest| async move { + let mut maybe_file_digest = None; + self + .load_digest_trie(dir_digest) + .await? + .walk(SymlinkBehavior::Aware, &mut |_, entry| match entry { + directory::Entry::File(f) => { + maybe_file_digest = Some(f.digest()); + } + directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (), + }); + Ok::<_, StoreError>(maybe_file_digest) + })) + .await?; + file_digests.extend(file_digests_from_directories.into_iter().flatten()); + let missing_file_digests = self .local .get_missing_digests(EntryType::File, file_digests) From e5f6a4df4484e65780b9195166e6a31e1b8b04b0 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Wed, 9 Nov 2022 12:51:00 -0600 Subject: [PATCH 04/10] Only download, don't load the bytes --- src/rust/engine/fs/store/src/lib.rs | 109 +++++++++++++++++----------- 1 file changed, 66 insertions(+), 43 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index e7c7ebe0345..dadba5ecf86 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -229,6 +229,59 @@ impl RemoteStore { .await .map(|&()| ()) } + + /// Download the digest to the local byte store from this remote store. The function `f_remote` + /// can be used to validate/transform the bytes. + async fn download_digest_to_local< + FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static, + >( + &self, + local_store: local::ByteStore, + digest: Digest, + entry_type: EntryType, + f_remote: FRemote, + ) -> Result<(), StoreError> { + let remote_store = self.store.clone(); + self + .maybe_download(digest, async move { + // TODO: Now that we always copy from the remote store to the local store before executing + // the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no + // longer needs to accept a function. + let bytes = retry_call( + remote_store, + |remote_store| async move { remote_store.load_bytes_with(digest, Ok).await }, + |err| match err { + ByteStoreError::Grpc(status) => status_is_retryable(status), + _ => false, + }, + ) + .await + .map_err(|err| match err { + ByteStoreError::Grpc(status) => status_to_str(status), + ByteStoreError::Other(msg) => msg, + })? + .ok_or_else(|| { + StoreError::MissingDigest( + "Was not present in either the local or remote store".to_owned(), + digest, + ) + })?; + + f_remote(bytes.clone())?; + let stored_digest = local_store + .store_bytes(entry_type, None, bytes, true) + .await?; + if digest == stored_digest { + Ok(()) + } else { + Err(StoreError::Unclassified(format!( + "CAS gave wrong digest: expected {:?}, got {:?}", + digest, stored_digest + ))) + } + }) + .await + } } /// @@ -674,9 +727,6 @@ impl Store { f_local: FLocal, f_remote: FRemote, ) -> Result { - let local = self.local.clone(); - let maybe_remote = self.remote.clone(); - if let Some(bytes_res) = self .local .load_bytes_with(entry_type, digest, f_local.clone()) @@ -685,47 +735,11 @@ impl Store { return Ok(bytes_res?); } - let remote = maybe_remote.ok_or_else(|| { + let remote = self.remote.clone().ok_or_else(|| { StoreError::MissingDigest("Was not present in the local store".to_owned(), digest) })?; - let remote_store = remote.store.clone(); - remote - .maybe_download(digest, async move { - // TODO: Now that we always copy from the remote store to the local store before executing - // the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no - // longer needs to accept a function. - let bytes = retry_call( - remote_store, - |remote_store| async move { remote_store.load_bytes_with(digest, Ok).await }, - |err| match err { - ByteStoreError::Grpc(status) => status_is_retryable(status), - _ => false, - }, - ) - .await - .map_err(|err| match err { - ByteStoreError::Grpc(status) => status_to_str(status), - ByteStoreError::Other(msg) => msg, - })? - .ok_or_else(|| { - StoreError::MissingDigest( - "Was not present in either the local or remote store".to_owned(), - digest, - ) - })?; - - f_remote(bytes.clone())?; - let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?; - if digest == stored_digest { - Ok(()) - } else { - Err(StoreError::Unclassified(format!( - "CAS gave wrong digest: expected {:?}, got {:?}", - digest, stored_digest - ))) - } - }) + .download_digest_to_local(self.local.clone(), digest, entry_type, f_remote) .await?; Ok( @@ -979,10 +993,19 @@ impl Store { .local .get_missing_digests(EntryType::File, file_digests) .await?; + if missing_file_digests.is_empty() { + return Ok(()); + } + let remote = self.remote.clone().ok_or_else(|| { + StoreError::MissingDigest( + "Was not present in the local store".to_owned(), + *missing_file_digests.iter().next().unwrap(), + ) + })?; for file_digest in missing_file_digests { - if let Err(e) = self - .load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(())) + if let Err(e) = remote + .download_digest_to_local(self.local.clone(), file_digest, EntryType::File, |_| Ok(())) .await { log::debug!("Missing file digest from remote store: {:?}", file_digest); From 31b05439d96268de725f35df349e00309728abf0 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Wed, 9 Nov 2022 13:02:52 -0600 Subject: [PATCH 05/10] Parallelize --- src/rust/engine/fs/store/src/lib.rs | 42 ++++++++++++++++------------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index dadba5ecf86..45be25de5ee 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -997,29 +997,35 @@ impl Store { return Ok(()); } - let remote = self.remote.clone().ok_or_else(|| { + let remote = &self.remote.clone().ok_or_else(|| { StoreError::MissingDigest( "Was not present in the local store".to_owned(), *missing_file_digests.iter().next().unwrap(), ) })?; - for file_digest in missing_file_digests { - if let Err(e) = remote - .download_digest_to_local(self.local.clone(), file_digest, EntryType::File, |_| Ok(())) - .await - { - log::debug!("Missing file digest from remote store: {:?}", file_digest); - in_workunit!( - "missing_file_counter", - Level::Trace, - |workunit| async move { - workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1); - }, - ) - .await; - return Err(e); - } - } + let _ = future::try_join_all( + missing_file_digests + .into_iter() + .map(|file_digest| async move { + if let Err(e) = remote + .download_digest_to_local(self.local.clone(), file_digest, EntryType::File, |_| Ok(())) + .await + { + log::debug!("Missing file digest from remote store: {:?}", file_digest); + in_workunit!( + "missing_file_counter", + Level::Trace, + |workunit| async move { + workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1); + }, + ) + .await; + return Err(e); + } + Ok(()) + }), + ) + .await?; Ok(()) } From 0e76b700d7f4700bdf52225696aa10fe1d325637 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Wed, 9 Nov 2022 13:06:27 -0600 Subject: [PATCH 06/10] Fix test --- src/rust/engine/fs/store/src/tests.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index 26717f1e618..b459075c96a 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs::File; use std::io::Read; use std::os::unix::fs::PermissionsExt; @@ -219,7 +219,10 @@ async fn load_recursive_directory() { .build(); new_store(dir.path(), &cas.address()) - .ensure_local_has_files(vec![], vec![recursive_testdir_digest.clone().collect()]) + .ensure_local_has_files( + HashSet::new(), + HashSet::from([recursive_testdir_digest.clone().collect()]), + ) .await .expect("Downloading recursive directory should have succeeded."); From 839251f8cff8f1100bb5d512ea83749d4a49fc47 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Wed, 9 Nov 2022 17:15:37 -0600 Subject: [PATCH 07/10] Initial feedback --- src/rust/engine/fs/store/src/lib.rs | 10 +++++----- src/rust/engine/fs/store/src/tests.rs | 2 +- src/rust/engine/process_execution/src/lib.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 45be25de5ee..33d76b38a86 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -231,7 +231,7 @@ impl RemoteStore { } /// Download the digest to the local byte store from this remote store. The function `f_remote` - /// can be used to validate/transform the bytes. + /// can be used to validate the bytes. async fn download_digest_to_local< FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static, >( @@ -244,9 +244,9 @@ impl RemoteStore { let remote_store = self.store.clone(); self .maybe_download(digest, async move { - // TODO: Now that we always copy from the remote store to the local store before executing - // the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no - // longer needs to accept a function. + // TODO(#17065): Now that we always copy from the remote store to the local store before + // executing the caller's logic against the local store, + // `remote::ByteStore::load_bytes_with` no longer needs to accept a function. let bytes = retry_call( remote_store, |remote_store| async move { remote_store.load_bytes_with(digest, Ok).await }, @@ -967,7 +967,7 @@ impl Store { /// Ensure that the files are locally loadable. This will download them from the remote store as /// a side effect, if one is configured. - pub async fn ensure_local_has_files( + pub async fn ensure_downloaded( &self, mut file_digests: HashSet, directory_digests: HashSet, diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index b459075c96a..aa4b259d96d 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -219,7 +219,7 @@ async fn load_recursive_directory() { .build(); new_store(dir.path(), &cas.address()) - .ensure_local_has_files( + .ensure_downloaded( HashSet::new(), HashSet::from([recursive_testdir_digest.clone().collect()]), ) diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index bb058d751ba..249dce9ede7 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -834,7 +834,7 @@ pub(crate) async fn check_cache_content( CacheContentBehavior::Fetch => { let response = response.clone(); let fetch_result = in_workunit!("eager_fetch_action_cache", Level::Trace, |_workunit| store - .ensure_local_has_files( + .ensure_downloaded( HashSet::from([response.stdout_digest, response.stderr_digest]), HashSet::from([response.output_directory]) )) From 965ff984e56bdfd204204c384bdad201280bcb3f Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Wed, 9 Nov 2022 17:16:24 -0600 Subject: [PATCH 08/10] Fix test --- src/rust/engine/fs/store/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index aa4b259d96d..7d8a69b7999 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -221,7 +221,7 @@ async fn load_recursive_directory() { new_store(dir.path(), &cas.address()) .ensure_downloaded( HashSet::new(), - HashSet::from([recursive_testdir_digest.clone().collect()]), + HashSet::from([recursive_testdir_digest.clone()]), ) .await .expect("Downloading recursive directory should have succeeded."); From 6188dcd00ead8e4f2ee026495d8dffb2e6038ad3 Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Thu, 10 Nov 2022 10:36:29 -0600 Subject: [PATCH 09/10] Use `record_digest_trie` --- src/rust/engine/fs/store/src/lib.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 33d76b38a86..c31ba316f06 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -975,15 +975,14 @@ impl Store { let file_digests_from_directories = future::try_join_all(directory_digests.into_iter().map(|dir_digest| async move { let mut maybe_file_digest = None; - self - .load_digest_trie(dir_digest) - .await? - .walk(SymlinkBehavior::Aware, &mut |_, entry| match entry { - directory::Entry::File(f) => { - maybe_file_digest = Some(f.digest()); - } - directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (), - }); + let trie = self.load_digest_trie(dir_digest).await?; + trie.walk(SymlinkBehavior::Aware, &mut |_, entry| match entry { + directory::Entry::File(f) => { + maybe_file_digest = Some(f.digest()); + } + directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (), + }); + self.record_digest_trie(trie, true).await?; Ok::<_, StoreError>(maybe_file_digest) })) .await?; @@ -1026,6 +1025,12 @@ impl Store { }), ) .await?; + // let _ = future::try_join_all( + // directory_digests + // .into_iter() + // .map(self.ensure_directory_digest_persisted), + // ) + // .await?; Ok(()) } From 56c415f4f39a9d323ba9b23f2d8f7ba6da92c1db Mon Sep 17 00:00:00 2001 From: Eric Arellano Date: Thu, 10 Nov 2022 11:13:31 -0600 Subject: [PATCH 10/10] Fix bug! Dir digest may have multiple file entries --- src/rust/engine/fs/store/src/lib.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index c31ba316f06..11419ef0c87 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -974,16 +974,15 @@ impl Store { ) -> Result<(), StoreError> { let file_digests_from_directories = future::try_join_all(directory_digests.into_iter().map(|dir_digest| async move { - let mut maybe_file_digest = None; + let mut file_digests_for_dir = Vec::new(); let trie = self.load_digest_trie(dir_digest).await?; trie.walk(SymlinkBehavior::Aware, &mut |_, entry| match entry { - directory::Entry::File(f) => { - maybe_file_digest = Some(f.digest()); - } + directory::Entry::File(f) => file_digests_for_dir.push(f.digest()), directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (), }); + // Also ensure that the directory trie is persisted to disk, not only its file entries. self.record_digest_trie(trie, true).await?; - Ok::<_, StoreError>(maybe_file_digest) + Ok::<_, StoreError>(file_digests_for_dir) })) .await?; file_digests.extend(file_digests_from_directories.into_iter().flatten()); @@ -1025,12 +1024,6 @@ impl Store { }), ) .await?; - // let _ = future::try_join_all( - // directory_digests - // .into_iter() - // .map(self.ensure_directory_digest_persisted), - // ) - // .await?; Ok(()) }