From 3ee3094723b3ac13d034382f7bf8966746537abd Mon Sep 17 00:00:00 2001 From: Tim de Jager Date: Mon, 11 Dec 2023 11:50:58 +0100 Subject: [PATCH 1/4] test: test deadlock in file store --- .../src/index/file_store.rs | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/crates/rattler_installs_packages/src/index/file_store.rs b/crates/rattler_installs_packages/src/index/file_store.rs index 37d67fa6..f9cd0103 100644 --- a/crates/rattler_installs_packages/src/index/file_store.rs +++ b/crates/rattler_installs_packages/src/index/file_store.rs @@ -275,6 +275,7 @@ fn lock(path: &Path, mode: LockMode) -> io::Result { // Lock the file. On unix this is apparently a thin wrapper around flock(2) and it doesn't // properly handle EINTR so we keep retrying when that happens. + retry_interrupted(|| lock.lock_exclusive())?; Ok(lock) @@ -283,6 +284,8 @@ fn lock(path: &Path, mode: LockMode) -> io::Result { #[cfg(test)] mod test { use super::*; + use std::sync::Arc; + use tokio::sync::Notify; #[test] fn test_file_store() { @@ -299,4 +302,33 @@ mod test { .unwrap(); assert_eq!(read_back, hello); } + + #[tokio::test] + async fn test_locking() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().to_path_buf(); + let path2 = dir.path().to_path_buf(); + + let notify = Arc::new(Notify::new()); + let notify2 = notify.clone(); + let notify3 = notify.clone(); + + let one = tokio::spawn(async move { + let lock = lock(&path, LockMode::Lock).unwrap(); + notify2.notify_one(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + }); + + let two = tokio::spawn(async move { + notify3.notified().await; + tokio::task::spawn_blocking(move || lock(&path2, LockMode::IfExists)) + .await + .unwrap() + .unwrap(); + }); + + let (a, b) = tokio::join!(one, two); + a.unwrap(); + b.unwrap(); + } } From 3003150c11c4080f69d8c7a70543e107a0adbb84 Mon Sep 17 00:00:00 2001 From: Tim de Jager Date: Mon, 11 Dec 2023 13:31:25 +0100 Subject: [PATCH 2/4] feat: made all file locking code async --- .../src/index/file_store.rs | 49 +++++++++++-------- .../src/index/http.rs | 2 +- .../src/index/package_database.rs | 21 ++++---- 3 files changed, 40 insertions(+), 32 deletions(-) diff --git a/crates/rattler_installs_packages/src/index/file_store.rs b/crates/rattler_installs_packages/src/index/file_store.rs index f9cd0103..ef9a6e49 100644 --- a/crates/rattler_installs_packages/src/index/file_store.rs +++ b/crates/rattler_installs_packages/src/index/file_store.rs @@ -12,6 +12,7 @@ use std::{ marker::PhantomData, path::{Path, PathBuf}, }; +use tokio::task; /// Types that implement this can be used as keys of the [`FileStore`]. pub trait CacheKey { @@ -88,11 +89,11 @@ impl FileStore { /// Gets readable access to the data with the specified key. If no such entry exists the /// function `f` is called to populate the entry. - pub fn get_or_set(&self, key: &K, f: F) -> io::Result + pub async fn get_or_set(&self, key: &K, f: F) -> io::Result where F: FnOnce(&mut dyn Write) -> io::Result<()>, { - let lock = self.lock(key)?; + let lock = self.lock(key).await?; if let Some(reader) = lock.reader() { // We use `detach_unlocked` here because we are sure that if the file exists it also has // immutable content. @@ -106,8 +107,8 @@ impl FileStore { /// Gets readable access to the data with the specified key. Returns `None` if no such key /// exists in the store. - pub fn get(&self, key: &K) -> Option { - if let Some(lock) = self.lock_if_exists(key) { + pub async fn get(&self, key: &K) -> Option { + if let Some(lock) = self.lock_if_exists(key).await { if let Some(reader) = lock.reader() { return Some(reader.detach_unlocked()); } @@ -116,9 +117,9 @@ impl FileStore { } /// Locks a certain file in the cache for exclusive access. - pub fn lock(&self, key: &K) -> io::Result { + pub async fn lock(&self, key: &K) -> io::Result { let path = self.base.join(key.key()); - let lock = lock(&path, LockMode::Lock)?; + let lock = lock(&path, LockMode::Lock).await?; Ok(FileLock { tmp: self.tmp.clone(), _lock_file: lock, @@ -130,13 +131,16 @@ impl FileStore { /// /// This function exists to ensure that we don't create tons of directories just to check if an /// entry exists or not. - pub fn lock_if_exists(&self, key: &K) -> Option { + pub async fn lock_if_exists(&self, key: &K) -> Option { let path = self.base.join(key.key()); - lock(&path, LockMode::IfExists).ok().map(|lock| FileLock { - tmp: self.tmp.clone(), - _lock_file: lock, - path, - }) + lock(&path, LockMode::IfExists) + .await + .ok() + .map(|lock| FileLock { + tmp: self.tmp.clone(), + _lock_file: lock, + path, + }) } } @@ -252,7 +256,7 @@ enum LockMode { /// Create a `.lock` file for the file at the specified `path`. Only a single process has access to /// the lock-file. -fn lock(path: &Path, mode: LockMode) -> io::Result { +async fn lock(path: &Path, mode: LockMode) -> io::Result { // Determine the path of the lockfile let lock_path = path.with_extension(".lock"); @@ -276,7 +280,12 @@ fn lock(path: &Path, mode: LockMode) -> io::Result { // Lock the file. On unix this is apparently a thin wrapper around flock(2) and it doesn't // properly handle EINTR so we keep retrying when that happens. - retry_interrupted(|| lock.lock_exclusive())?; + let lock = task::spawn_blocking(move || { + retry_interrupted(|| lock.lock_exclusive()).unwrap(); + lock + }) + .await + .unwrap(); Ok(lock) } @@ -287,8 +296,8 @@ mod test { use std::sync::Arc; use tokio::sync::Notify; - #[test] - fn test_file_store() { + #[tokio::test] + async fn test_file_store() { let dir = tempfile::tempdir().unwrap(); let store = FileStore::new(dir.path()).unwrap(); @@ -297,6 +306,7 @@ mod test { let mut read_back = Vec::new(); store .get_or_set(&hello, |w| w.write_all(hello)) + .await .unwrap() .read_to_end(&mut read_back) .unwrap(); @@ -314,17 +324,14 @@ mod test { let notify3 = notify.clone(); let one = tokio::spawn(async move { - let lock = lock(&path, LockMode::Lock).unwrap(); + let lock = lock(&path, LockMode::Lock).await.unwrap(); notify2.notify_one(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; }); let two = tokio::spawn(async move { notify3.notified().await; - tokio::task::spawn_blocking(move || lock(&path2, LockMode::IfExists)) - .await - .unwrap() - .unwrap(); + let lock = lock(&path, LockMode::Lock).await.unwrap(); }); let (a, b) = tokio::join!(one, two); diff --git a/crates/rattler_installs_packages/src/index/http.rs b/crates/rattler_installs_packages/src/index/http.rs index c29a88e1..14188482 100644 --- a/crates/rattler_installs_packages/src/index/http.rs +++ b/crates/rattler_installs_packages/src/index/http.rs @@ -97,7 +97,7 @@ impl Http { Ok(response) } else { let key = key_for_request(&url, method, &headers); - let lock = self.http_cache.lock(&key.as_slice())?; + let lock = self.http_cache.lock(&key.as_slice()).await?; if let Some((old_policy, final_url, old_body)) = lock .reader() diff --git a/crates/rattler_installs_packages/src/index/package_database.rs b/crates/rattler_installs_packages/src/index/package_database.rs index 21f4a656..7e2db33a 100644 --- a/crates/rattler_installs_packages/src/index/package_database.rs +++ b/crates/rattler_installs_packages/src/index/package_database.rs @@ -103,8 +103,8 @@ impl PackageDb { /// Reads the metadata for the given artifact from the cache or return `None` if the metadata /// could not be found in the cache. - fn metadata_from_cache(&self, ai: &ArtifactInfo) -> Option> { - let mut data = self.metadata_cache.get(&ai.hashes.as_ref()?)?; + async fn metadata_from_cache(&self, ai: &ArtifactInfo) -> Option> { + let mut data = self.metadata_cache.get(&ai.hashes.as_ref()?).await?; let mut bytes = Vec::new(); data.read_to_end(&mut bytes).ok()?; Some(bytes) @@ -112,10 +112,11 @@ impl PackageDb { /// Writes the metadata for the given artifact into the cache. If the metadata already exists /// its not overwritten. - fn put_metadata_in_cache(&self, ai: &ArtifactInfo, blob: &[u8]) -> miette::Result<()> { + async fn put_metadata_in_cache(&self, ai: &ArtifactInfo, blob: &[u8]) -> miette::Result<()> { if let Some(hash) = &ai.hashes { self.metadata_cache .get_or_set(&hash, |w| w.write_all(blob)) + .await .into_diagnostic()?; } Ok(()) @@ -140,7 +141,7 @@ impl PackageDb { let metadata = artifact.metadata(); match metadata { Ok((blob, metadata)) => { - self.put_metadata_in_cache(artifact_info, &blob)?; + self.put_metadata_in_cache(artifact_info, &blob).await?; return Ok(Some((artifact_info, metadata))); } Err(err) => { @@ -170,7 +171,7 @@ impl PackageDb { // Save the pep643 metadata in the cache if it is available let metadata = sdist.pep643_metadata(); if let Some((bytes, _)) = metadata { - self.put_metadata_in_cache(artifact_info, &bytes)?; + self.put_metadata_in_cache(artifact_info, &bytes).await?; } } Err(err) => match err.downcast_ref::() { @@ -213,7 +214,7 @@ impl PackageDb { let metadata = artifact.metadata(); match metadata { Ok((blob, metadata)) => { - self.put_metadata_in_cache(artifact_info, &blob)?; + self.put_metadata_in_cache(artifact_info, &blob).await?; return Ok(Some((artifact_info, metadata))); } Err(err) => { @@ -246,7 +247,7 @@ impl PackageDb { let metadata = wheel_builder.get_sdist_metadata(&artifact).await; match metadata { Ok((blob, metadata)) => { - self.put_metadata_in_cache(artifact_info, &blob)?; + self.put_metadata_in_cache(artifact_info, &blob).await?; return Ok(Some((artifact_info, metadata))); } Err(err) => { @@ -272,7 +273,7 @@ impl PackageDb { // Check if we already have information about any of the artifacts cached. // Return if we do for artifact_info in artifacts.iter().copied() { - if let Some(metadata_bytes) = self.metadata_from_cache(artifact_info) { + if let Some(metadata_bytes) = self.metadata_from_cache(artifact_info).await { return Ok(Some(( artifact_info, WheelCoreMetadata::try_from(metadata_bytes.as_slice()).into_diagnostic()?, @@ -329,7 +330,7 @@ impl PackageDb { { match Wheel::read_metadata_bytes(name, &mut reader).await { Ok((blob, metadata)) => { - self.put_metadata_in_cache(artifact_info, &blob)?; + self.put_metadata_in_cache(artifact_info, &blob).await?; return Ok(Some(metadata)); } Err(err) => { @@ -366,7 +367,7 @@ impl PackageDb { .into_diagnostic()?; let metadata = WheelCoreMetadata::try_from(bytes.as_slice()).into_diagnostic()?; - self.put_metadata_in_cache(artifact_info, &bytes)?; + self.put_metadata_in_cache(artifact_info, &bytes).await?; Ok((artifact_info, metadata)) } From 90aef007606d3dcbb012e0999cbe70facbcdbebe Mon Sep 17 00:00:00 2001 From: Tim de Jager Date: Mon, 11 Dec 2023 13:32:22 +0100 Subject: [PATCH 3/4] fix: tests --- crates/rattler_installs_packages/src/index/file_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rattler_installs_packages/src/index/file_store.rs b/crates/rattler_installs_packages/src/index/file_store.rs index ef9a6e49..b048a525 100644 --- a/crates/rattler_installs_packages/src/index/file_store.rs +++ b/crates/rattler_installs_packages/src/index/file_store.rs @@ -331,7 +331,7 @@ mod test { let two = tokio::spawn(async move { notify3.notified().await; - let lock = lock(&path, LockMode::Lock).await.unwrap(); + let lock = lock(&path2, LockMode::Lock).await.unwrap(); }); let (a, b) = tokio::join!(one, two); From 47b59201e228017f8480f23dbede4e1ca4d96ba0 Mon Sep 17 00:00:00 2001 From: Tim de Jager Date: Tue, 12 Dec 2023 10:13:15 +0100 Subject: [PATCH 4/4] test: made the test timeout if deadlocking --- .../src/index/file_store.rs | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/crates/rattler_installs_packages/src/index/file_store.rs b/crates/rattler_installs_packages/src/index/file_store.rs index b048a525..a0a84a19 100644 --- a/crates/rattler_installs_packages/src/index/file_store.rs +++ b/crates/rattler_installs_packages/src/index/file_store.rs @@ -294,7 +294,9 @@ async fn lock(path: &Path, mode: LockMode) -> io::Result { mod test { use super::*; use std::sync::Arc; + use std::time::Duration; use tokio::sync::Notify; + use tokio::time::timeout; #[tokio::test] async fn test_file_store() { @@ -313,8 +315,12 @@ mod test { assert_eq!(read_back, hello); } + /// Test deadlock situation that occurred + /// We want to test that progress can still be made even though a task is holding the lock + /// In the old implementation this would deadlock. #[tokio::test] async fn test_locking() { + // Start with some annoying async rust bookkeeping let dir = tempfile::tempdir().unwrap(); let path = dir.path().to_path_buf(); let path2 = dir.path().to_path_buf(); @@ -323,19 +329,25 @@ mod test { let notify2 = notify.clone(); let notify3 = notify.clone(); + // Use the same lock file for both tasks let one = tokio::spawn(async move { - let lock = lock(&path, LockMode::Lock).await.unwrap(); + let _lock = lock(&path, LockMode::Lock).await.unwrap(); notify2.notify_one(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; }); let two = tokio::spawn(async move { notify3.notified().await; - let lock = lock(&path2, LockMode::Lock).await.unwrap(); + let _lock = lock(&path2, LockMode::Lock).await.unwrap(); }); - let (a, b) = tokio::join!(one, two); - a.unwrap(); - b.unwrap(); + // We expect this to finish in a reasonable amount of time + // so we set a timeout of 2 seconds + let (a, b) = tokio::join!( + timeout(Duration::from_secs(2), one), + timeout(Duration::from_secs(2), two) + ); + a.unwrap().unwrap(); + b.unwrap().unwrap(); } }