Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unnecessary uses of DashMap and Arc #3413

Merged
merged 3 commits into the base branch from the source branch (branch names lost in page extraction)
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crates/uv-build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::fmt::{Display, Formatter};
use std::io;
use std::path::{Path, PathBuf};
use std::process::{ExitStatus, Output};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::{env, iter};

use fs_err as fs;
Expand Down Expand Up @@ -329,13 +329,13 @@ impl Pep517Backend {
}
}

/// Uses an [`Arc`] internally, clone freely.
/// Uses an [`Rc`] internally, clone freely.
#[derive(Debug, Default, Clone)]
pub struct SourceBuildContext {
/// An in-memory resolution of the default backend's requirements for PEP 517 builds.
default_resolution: Arc<Mutex<Option<Resolution>>>,
default_resolution: Rc<Mutex<Option<Resolution>>>,
/// An in-memory resolution of the build requirements for `--legacy-setup-py` builds.
setup_py_resolution: Arc<Mutex<Option<Resolution>>>,
setup_py_resolution: Rc<Mutex<Option<Resolution>>>,
}

/// Holds the state through a series of PEP 517 frontend to backend calls or a single setup.py
Expand Down
32 changes: 18 additions & 14 deletions crates/uv-client/src/cached_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
/// `CachedClient::get_cacheable`. If your types fit into the
/// `rkyvutil::OwnedArchive` mold, then an implementation of `Cacheable` is
/// already provided for that type.
pub trait Cacheable: Sized + Send {
pub trait Cacheable: Sized {
/// This associated type permits customizing what the "output" type of
/// deserialization is. It can be identical to `Self`.
///
Expand All @@ -54,7 +54,7 @@ pub struct SerdeCacheable<T> {
inner: T,
}

impl<T: Send + Serialize + DeserializeOwned> Cacheable for SerdeCacheable<T> {
impl<T: Serialize + DeserializeOwned> Cacheable for SerdeCacheable<T> {
type Target = T;

fn from_aligned_bytes(bytes: AlignedVec) -> Result<T, Error> {
Expand All @@ -75,7 +75,7 @@ impl<T: Send + Serialize + DeserializeOwned> Cacheable for SerdeCacheable<T> {
/// All `OwnedArchive` values are cacheable.
impl<A> Cacheable for OwnedArchive<A>
where
A: rkyv::Archive + rkyv::Serialize<crate::rkyvutil::Serializer<4096>> + Send,
A: rkyv::Archive + rkyv::Serialize<crate::rkyvutil::Serializer<4096>>,
A::Archived: for<'a> rkyv::CheckBytes<rkyv::validation::validators::DefaultValidator<'a>>
+ rkyv::Deserialize<A, rkyv::de::deserializers::SharedDeserializeMap>,
{
Expand Down Expand Up @@ -179,7 +179,7 @@ impl CachedClient {
/// allowed to make subsequent requests, e.g. through the uncached client.
#[instrument(skip_all)]
pub async fn get_serde<
Payload: Serialize + DeserializeOwned + Send + 'static,
Payload: Serialize + DeserializeOwned + 'static,
CallBackError,
Callback,
CallbackReturn,
Expand All @@ -191,8 +191,8 @@ impl CachedClient {
response_callback: Callback,
) -> Result<Payload, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn + Send,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
{
let payload = self
.get_cacheable(req, cache_entry, cache_control, move |resp| async {
Expand Down Expand Up @@ -225,11 +225,15 @@ impl CachedClient {
) -> Result<Payload::Target, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
{
let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let cached_response = match Self::read_cache(cache_entry).await {
Some(cached) => self.send_cached(req, cache_control, cached).boxed().await?,
Some(cached) => {
self.send_cached(req, cache_control, cached)
.boxed_local()
.await?
}
None => {
debug!("No cache entry for: {}", req.url());
let (response, cache_policy) = self.fresh_request(req).await?;
Expand Down Expand Up @@ -301,7 +305,7 @@ impl CachedClient {

/// Make a request without checking whether the cache is fresh.
pub async fn skip_cache<
Payload: Serialize + DeserializeOwned + Send + 'static,
Payload: Serialize + DeserializeOwned + 'static,
CallBackError,
Callback,
CallbackReturn,
Expand All @@ -312,8 +316,8 @@ impl CachedClient {
response_callback: Callback,
) -> Result<Payload, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn + Send,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
{
let (response, cache_policy) = self.fresh_request(req).await?;

Expand All @@ -335,7 +339,7 @@ impl CachedClient {
) -> Result<Payload::Target, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
{
let _ = fs_err::tokio::remove_file(&cache_entry.path()).await;
let (response, cache_policy) = self.fresh_request(req).await?;
Expand All @@ -352,11 +356,11 @@ impl CachedClient {
) -> Result<Payload::Target, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
{
let new_cache = info_span!("new_cache", file = %cache_entry.path().display());
let data = response_callback(response)
.boxed()
.boxed_local()
.await
.map_err(|err| CachedClientError::Callback(err))?;
let Some(cache_policy) = cache_policy else {
Expand Down
2 changes: 1 addition & 1 deletion crates/uv-client/src/flat_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl<'a> FlatIndexClient<'a> {
.collect();
Ok::<Vec<File>, CachedClientError<Error>>(files)
}
.boxed()
.boxed_local()
.instrument(info_span!("parse_flat_index_html", url = % url))
};
let response = self
Expand Down
4 changes: 2 additions & 2 deletions crates/uv-client/src/registry_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl RegistryClient {
};
OwnedArchive::from_unarchived(&unarchived)
}
.boxed()
.boxed_local()
.instrument(info_span!("parse_simple_api", package = %package_name))
};
let result = self
Expand Down Expand Up @@ -534,7 +534,7 @@ impl RegistryClient {
})?;
Ok::<Metadata23, CachedClientError<Error>>(metadata)
}
.boxed()
.boxed_local()
.instrument(info_span!("read_metadata_range_request", wheel = %filename))
};

Expand Down
2 changes: 1 addition & 1 deletion crates/uv-dispatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl<'a> BuildContext for BuildDispatch<'a> {
build_kind,
self.build_extra_env_vars.clone(),
)
.boxed()
.boxed_local()
.await?;
Ok(builder)
}
Expand Down
15 changes: 8 additions & 7 deletions crates/uv-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::io;
use std::path::Path;
use std::rc::Rc;
use std::sync::Arc;

use futures::{FutureExt, TryStreamExt};
Expand Down Expand Up @@ -41,20 +42,20 @@ use crate::{ArchiveMetadata, Error, LocalWheel, Reporter, SourceDistributionBuil
///
/// This struct also has the task of acquiring locks around source dist builds in general and git
/// operation especially.
pub struct DistributionDatabase<'a, Context: BuildContext + Send + Sync> {
pub struct DistributionDatabase<'a, Context: BuildContext> {
client: &'a RegistryClient,
build_context: &'a Context,
builder: SourceDistributionBuilder<'a, Context>,
locks: Arc<Locks>,
locks: Rc<Locks>,
}

impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> {
impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
pub fn new(client: &'a RegistryClient, build_context: &'a Context) -> Self {
Self {
client,
build_context,
builder: SourceDistributionBuilder::new(client, build_context),
locks: Arc::new(Locks::default()),
locks: Rc::new(Locks::default()),
}
}

Expand Down Expand Up @@ -307,7 +308,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
let built_wheel = self
.builder
.download_and_build(&BuildableSource::Dist(dist), tags, hashes)
.boxed()
.boxed_local()
.await?;

// If the wheel was unzipped previously, respect it. Source distributions are
Expand Down Expand Up @@ -360,7 +361,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
return Ok(ArchiveMetadata { metadata, hashes });
}

match self.client.wheel_metadata(dist).boxed().await {
match self.client.wheel_metadata(dist).boxed_local().await {
Ok(metadata) => Ok(ArchiveMetadata::from(metadata)),
Err(err) if err.is_http_streaming_unsupported() => {
warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})");
Expand Down Expand Up @@ -404,7 +405,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
let metadata = self
.builder
.download_and_build_metadata(source, hashes)
.boxed()
.boxed_local()
.await?;
Ok(metadata)
}
Expand Down
8 changes: 4 additions & 4 deletions crates/uv-distribution/src/locks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::rc::Rc;

use rustc_hash::FxHashMap;
use tokio::sync::Mutex;
Expand All @@ -7,14 +7,14 @@ use distribution_types::{Identifier, ResourceId};

/// A set of locks used to prevent concurrent access to the same resource.
#[derive(Debug, Default)]
pub(crate) struct Locks(Mutex<FxHashMap<ResourceId, Arc<Mutex<()>>>>);
pub(crate) struct Locks(Mutex<FxHashMap<ResourceId, Rc<Mutex<()>>>>);

impl Locks {
/// Acquire a lock on the given resource.
pub(crate) async fn acquire(&self, dist: &impl Identifier) -> Arc<Mutex<()>> {
pub(crate) async fn acquire(&self, dist: &impl Identifier) -> Rc<Mutex<()>> {
let mut map = self.0.lock().await;
map.entry(dist.resource_id())
.or_insert_with(|| Arc::new(Mutex::new(())))
.or_insert_with(|| Rc::new(Mutex::new(())))
.clone()
}
}
Loading
Loading