diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs
index 534b5e499cc..13424be944a 100644
--- a/src/rust/engine/fs/store/src/lib.rs
+++ b/src/rust/engine/fs/store/src/lib.rs
@@ -905,19 +905,10 @@ impl Store {
     }
 
     // Filter out file digests that exist locally.
-    // TODO: Implement a local batch API: see https://github.com/pantsbuild/pants/issues/16400.
-    let local_file_exists = future::try_join_all(
-      file_digests
-        .iter()
-        .map(|file_digest| self.local.exists(EntryType::File, *file_digest))
-        .collect::<Vec<_>>(),
-    )
-    .await?;
-    let missing_locally = local_file_exists
-      .into_iter()
-      .zip(file_digests.into_iter())
-      .filter_map(|(exists, digest)| if exists { None } else { Some(digest) })
-      .collect::<HashSet<_>>();
+    let missing_locally = self
+      .local
+      .get_missing_digests(EntryType::File, file_digests)
+      .await?;
 
     // If there are any digests which don't exist locally, check remotely.
     if missing_locally.is_empty() {
diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs
index 9fca35d9135..f4e57cc426e 100644
--- a/src/rust/engine/fs/store/src/local.rs
+++ b/src/rust/engine/fs/store/src/local.rs
@@ -1,6 +1,6 @@
 use super::{EntryType, ShrinkBehavior};
 
-use std::collections::BinaryHeap;
+use std::collections::{BinaryHeap, HashSet};
 use std::fmt::Debug;
 use std::io::{self, Read};
 use std::path::Path;
@@ -338,18 +338,41 @@ impl ByteStore {
       .await
   }
 
-  pub async fn exists(&self, entry_type: EntryType, digest: Digest) -> Result<bool, String> {
-    if digest == EMPTY_DIGEST {
-      // Avoid I/O for this case. This allows some client-provided operations (like merging
-      // snapshots) to work without needing to first store the empty snapshot.
-      return Ok(true);
-    }
+  ///
+  /// Given a collection of Digests (digests),
+  /// returns the set of digests from that collection not present in the
+  /// underlying LMDB store.
+  ///
+  pub async fn get_missing_digests(
+    &self,
+    entry_type: EntryType,
+    digests: HashSet<Digest>,
+  ) -> Result<HashSet<Digest>, String> {
+    let fingerprints_to_check = digests
+      .iter()
+      .filter_map(|digest| {
+        // Avoid I/O for this case. This allows some client-provided operations (like
+        // merging snapshots) to work without needing to first store the empty snapshot.
+        if *digest == EMPTY_DIGEST {
+          None
+        } else {
+          Some(digest.hash)
+        }
+      })
+      .collect::<Vec<_>>();
+
     let dbs = match entry_type {
       EntryType::Directory => self.inner.directory_dbs.clone(),
       EntryType::File => self.inner.file_dbs.clone(),
     }?;
-
-    dbs.exists(digest.hash).await
+
+    let existing = dbs.exists_batch(fingerprints_to_check).await?;
+
+    let missing = digests
+      .into_iter()
+      .filter(|digest| *digest != EMPTY_DIGEST && !existing.contains(&digest.hash))
+      .collect::<HashSet<_>>();
+    Ok(missing)
   }
 
   ///
diff --git a/src/rust/engine/sharded_lmdb/src/lib.rs b/src/rust/engine/sharded_lmdb/src/lib.rs
index 2a925c33fc7..2157f821287 100644
--- a/src/rust/engine/sharded_lmdb/src/lib.rs
+++ b/src/rust/engine/sharded_lmdb/src/lib.rs
@@ -25,7 +25,7 @@
 // Arc can be more clear than needing to grok Orderings:
 #![allow(clippy::mutex_atomic)]
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt;
 use std::fmt::Debug;
 use std::io::{self, Read};
@@ -327,25 +327,71 @@ impl ShardedLmdb {
       .await
   }
 
+  ///
+  /// Singular form of `Self::exists_batch`. When checking the existence of more than one item,
+  /// prefer `Self::exists_batch`.
+  ///
   pub async fn exists(&self, fingerprint: Fingerprint) -> Result<bool, String> {
+    let missing = self.exists_batch(vec![fingerprint]).await?;
+    Ok(missing.contains(&fingerprint))
+  }
+
+  ///
+  /// Determine which of the given Fingerprints are already present in the store,
+  /// returning them as a set.
+  ///
+  pub async fn exists_batch(
+    &self,
+    fingerprints: Vec<Fingerprint>,
+  ) -> Result<HashSet<Fingerprint>, String> {
     let store = self.clone();
-    let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION);
     self
       .executor
       .spawn_blocking(move || {
-        let fingerprint = effective_key.get_fingerprint();
-        let (env, db, _) = store.get(&fingerprint);
-        let txn = env
-          .begin_ro_txn()
-          .map_err(|err| format!("Failed to begin read transaction: {:?}", err))?;
-        match txn.get(db, &effective_key) {
-          Ok(_) => Ok(true),
-          Err(lmdb::Error::NotFound) => Ok(false),
-          Err(err) => Err(format!(
-            "Error reading from store when checking existence of {}: {}",
-            fingerprint, err
-          )),
+        // Group the items by the Environment that they will be applied to.
+        let mut items_by_env = HashMap::new();
+        let mut exists = HashSet::new();
+
+        for fingerprint in &fingerprints {
+          let effective_key = VersionedFingerprint::new(*fingerprint, ShardedLmdb::SCHEMA_VERSION);
+          let (env_id, _, env, db, _) = store.get_raw(&fingerprint.0);
+
+          let (_, _, batch) = items_by_env
+            .entry(*env_id)
+            .or_insert_with(|| (env.clone(), *db, vec![]));
+          batch.push(effective_key);
+        }
+
+        // Open and commit a Transaction per Environment. Since we never have more than one
+        // Transaction open at a time, we don't have to worry about ordering.
+        for (_, (env, db, batch)) in items_by_env {
+          env
+            .begin_ro_txn()
+            .and_then(|txn| {
+              for effective_key in &batch {
+                let get_res = txn.get(db, &effective_key);
+                match get_res {
+                  Ok(_) => {
+                    exists.insert(effective_key.get_fingerprint());
+                  }
+                  Err(lmdb::Error::NotFound) => (),
+                  Err(err) => return Err(err),
+                };
+              }
+              txn.commit()
+            })
+            .map_err(|e| {
+              format!(
+                "Error checking existence of fingerprints {:?}: {}",
+                batch
+                  .iter()
+                  .map(|key| key.get_fingerprint())
+                  .collect::<Vec<_>>(),
+                e
+              )
+            })?;
         }
+        Ok(exists)
       })
       .await
   }