From 2a93fd42345743aba3616e3d8c1cb5c1c9a8af48 Mon Sep 17 00:00:00 2001
From: Stu Hood
Date: Mon, 23 Oct 2023 21:13:48 -0700
Subject: [PATCH] Add best-effort limits on async file opens to reduce file
 handle counts (#20055)

As described in #19765, `2.17.x` uses more file handles than previous
versions. Based on the location of the reported error, I suspect that this is
due to the move from using the LMDB store for all files to using the
filesystem-based store for large files (#18153). In particular: rather than
digesting files inside of `spawn_blocking` while capturing them into the LMDB
store (which imposed the tokio runtime's
[limit on blocking threads](https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.max_blocking_threads)),
`fn store` moved to digesting them using tokio's async file APIs, which impose
no such limit.

This change adds a semaphore around (some) file opens to provide a best-effort
limit on the number of files opened for capture. It additionally fixes (in the
first commit) an extraneous file handle that was kept open during capture.

Fixes #19765.
---
 src/rust/engine/fs/store/src/local.rs | 71 +++++++++++++++++++++++----
 1 file changed, 62 insertions(+), 9 deletions(-)

diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs
index f8ebcb27afe..69bc8fa7b91 100644
--- a/src/rust/engine/fs/store/src/local.rs
+++ b/src/rust/engine/fs/store/src/local.rs
@@ -22,6 +22,7 @@ use task_executor::Executor;
 use tempfile::Builder;
 use tokio::fs::hard_link;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
+use tokio::sync::{Semaphore, SemaphorePermit};
 use workunit_store::ObservationMetric;
 
 /// How big a file must be to be stored as a file on disk.
@@ -58,6 +59,7 @@ trait UnderlyingByteStore {
     initial_lease: bool,
     src_is_immutable: bool,
     expected_digest: Digest,
+    file_source: &FileSource,
     src: PathBuf,
   ) -> Result<(), String>;
 
@@ -113,13 +115,18 @@ impl UnderlyingByteStore for ShardedLmdb {
     initial_lease: bool,
     src_is_immutable: bool,
     expected_digest: Digest,
+    _file_source: &FileSource,
     src: PathBuf,
   ) -> Result<(), String> {
     self.store(
       initial_lease,
       src_is_immutable,
       expected_digest,
-      move || std::fs::File::open(&src),
+      move || {
+        // NB: This file access is bounded by the number of blocking threads on the runtime, and
+        // so we don't bother to acquire against the file handle limit in this case.
+        std::fs::File::open(&src)
+      },
     )
     .await
   }
@@ -396,11 +403,13 @@ impl UnderlyingByteStore for ShardedFSDB {
     _initial_lease: bool,
     src_is_immutable: bool,
     expected_digest: Digest,
+    file_source: &FileSource,
     src: PathBuf,
   ) -> Result<(), String> {
     let mut attempts = 0;
     loop {
-      let reader = tokio::fs::File::open(src.clone())
+      let (reader, _permit) = file_source
+        .open_readonly(&src)
         .await
         .map_err(|e| format!("Failed to open {src:?}: {e}"))?;
 
@@ -519,6 +528,29 @@ impl UnderlyingByteStore for ShardedFSDB {
   }
 }
 
+/// A best-effort limit on the number of concurrent attempts to open files.
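+///
+/// NB: The permit returned by `open_readonly` is held alongside the opened `File` and
+/// dropped with it, so this bounds the number of concurrently open handles on these
+/// codepaths, not just the number of concurrent `open` calls.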
+#[derive(Debug)]
+struct FileSource {
+  open_files: Semaphore,
+}
+
+impl FileSource {
+  async fn open_readonly(
+    &self,
+    path: &Path,
+  ) -> Result<(tokio::fs::File, SemaphorePermit), String> {
+    let permit = self
+      .open_files
+      .acquire()
+      .await
+      .map_err(|e| format!("Failed to acquire permit to open file: {e}"))?;
+    let file = tokio::fs::File::open(path)
+      .await
+      .map_err(|e| e.to_string())?;
+    Ok((file, permit))
+  }
+}
+
 #[derive(Debug, Clone)]
 pub struct ByteStore {
   inner: Arc<InnerStore>,
@@ -532,6 +564,7 @@ struct InnerStore {
   file_lmdb: Result<Arc<ShardedLmdb>, String>,
   directory_lmdb: Result<Arc<ShardedLmdb>, String>,
   file_fsdb: ShardedFSDB,
+  file_source: FileSource,
 }
 
 impl ByteStore {
@@ -582,6 +615,13 @@
         dest_initializer: Arc::new(Mutex::default()),
         hardlinkable_destinations: Arc::new(Mutex::default()),
       },
+      // NB: This is much larger than the number of cores on modern machines, but still small
+      // enough to be a "reasonable" number of open files to set in `ulimit`. This is a
+      // best-effort limit, because it does not (and cannot) cover all of the places where
+      // we open files.
+      file_source: FileSource {
+        open_files: Semaphore::new(1024),
+      },
     }),
   })
 }
@@ -790,17 +830,28 @@ impl ByteStore {
     src_is_immutable: bool,
     src: PathBuf,
   ) -> Result<Digest, String> {
-    let mut file = tokio::fs::File::open(src.clone())
-      .await
-      .map_err(|e| format!("Failed to open {src:?}: {e}"))?;
-    let digest = async_copy_and_hash(&mut file, &mut tokio::io::sink())
-      .await
-      .map_err(|e| format!("Failed to hash {src:?}: {e}"))?;
+    let digest = {
+      let (mut file, _permit) = self
+        .inner
+        .file_source
+        .open_readonly(&src)
+        .await
+        .map_err(|e| format!("Failed to open {src:?}: {e}"))?;
+      async_copy_and_hash(&mut file, &mut tokio::io::sink())
+        .await
+        .map_err(|e| format!("Failed to hash {src:?}: {e}"))?
+    };
 
     if ByteStore::should_use_fsdb(entry_type, digest.size_bytes) {
       self.inner
         .file_fsdb
-        .store(initial_lease, src_is_immutable, digest, src)
+        .store(
+          initial_lease,
+          src_is_immutable,
+          digest,
+          &self.inner.file_source,
+          src,
+        )
         .await?;
     } else {
       let dbs = match entry_type {
@@ -809,6 +860,8 @@
       };
       let _ = dbs
         .store(initial_lease, src_is_immutable, digest, move || {
+          // NB: This file access is bounded by the number of blocking threads on the runtime, and
+          // so we don't bother to acquire against the file handle limit in this case.
          std::fs::File::open(&src)
         })
         .await;
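
Note on the pattern above: the following is a minimal, self-contained sketch
(not part of the patch) of how the `FileSource` semaphore bounds open handles.
A permit is acquired before each open and dropped together with the `File`, so
slow readers also hold a slot. The `main` function, the example path
`/etc/hosts`, and the limit of 16 are illustrative assumptions; the patch
itself uses a limit of 1024 inside the pants `ByteStore`.

// sketch.rs -- illustrative only, not part of the patch. Assumes the `tokio`
// crate with the "fs", "sync", "io-util", "rt-multi-thread" and "macros"
// features enabled.
use std::path::Path;

use tokio::io::AsyncReadExt;
use tokio::sync::{Semaphore, SemaphorePermit};

// Mirrors the patch's `FileSource`: a best-effort bound on concurrently open
// file handles.
struct FileSource {
  open_files: Semaphore,
}

impl FileSource {
  // Acquires a semaphore permit, then opens the file. The permit is returned
  // alongside the `File` so that the slot is released only when both drop.
  async fn open_readonly<'a>(
    &'a self,
    path: &Path,
  ) -> Result<(tokio::fs::File, SemaphorePermit<'a>), String> {
    let permit = self
      .open_files
      .acquire()
      .await
      .map_err(|e| format!("Failed to acquire permit to open file: {e}"))?;
    let file = tokio::fs::File::open(path)
      .await
      .map_err(|e| e.to_string())?;
    Ok((file, permit))
  }
}

#[tokio::main]
async fn main() -> Result<(), String> {
  // A small limit for demonstration; the patch uses 1024.
  let source = FileSource {
    open_files: Semaphore::new(16),
  };

  // Hypothetical input path, purely for illustration.
  let (mut file, _permit) = source.open_readonly(Path::new("/etc/hosts")).await?;
  let mut buf = Vec::new();
  file
    .read_to_end(&mut buf)
    .await
    .map_err(|e| e.to_string())?;
  println!("read {} bytes", buf.len());
  // `file` and `_permit` drop here, releasing the semaphore slot.
  Ok(())
}

Returning the permit alongside the file, rather than releasing it as soon as
the open succeeds, is what makes the semaphore a limit on concurrently open
handles instead of merely a throttle on `open` calls.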