diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
index 5f6ad3d2b8fa..1d737bfe5350 100644
--- a/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
@@ -551,12 +551,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
 
     async fn delete_objects(&self, stream: &mut (dyn Stream + Send + Unpin)) {
         // azure-sdk-for-rust does not yet support batched deletion of objects. So we issue
-        // object deletion requests in parallel. This means we open many TCP connections in
-        // parallel. Experimentally, we see that we get good performance with about half the
-        // default batch size that we use with S3 which is 1000. This also avoids running out
-        // of file desciptors when the ulimit is low.
-        let batch_size = *OBJECT_DELETION_BATCH_SIZE / 2;
-
+        // object deletion requests in parallel.
         stream
             .map(|key| async move {
                 let op = self.access_stats.begin(ObjectAccessOpType::ObjectDelete);
@@ -591,7 +586,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
                 .unwrap();
                 op.end(0);
             })
-            .buffered(batch_size)
+            .buffer_unordered(*OBJECT_DELETION_BATCH_SIZE)
             .count()
             .await;
     }
diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
index 235683aa7b3c..efdc0c219b10 100644
--- a/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
@@ -61,7 +61,12 @@ tunable! {
     static ref LONG_OPERATION_DURATION: Duration = Duration::from_secs(2);
     static ref XLONG_OPERATION_DURATION: Duration = Duration::from_secs(60);
     static ref PANIC_ON_XLONG_OPERATION: bool = false;
-    pub static ref OBJECT_DELETION_BATCH_SIZE: usize = 1000;
+
+    // The default number of deletes to issue in parallel. Experimentally, we
+    // see good performance with a batch size of 500. Since each in-flight
+    // delete opens its own socket connection, we avoid a larger batch size so
+    // that we do not run out of file descriptors when the ulimit is low.
+    pub static ref OBJECT_DELETION_BATCH_SIZE: usize = 500;
     static ref PER_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
 }
 
diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
index fde63230e636..bf5ce65aef37 100644
--- a/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
@@ -5,7 +5,6 @@ use std::ops::Range;
 use std::pin::Pin;
 use std::time::Instant;
 
-use anyhow::anyhow;
 use anyhow::Context;
 use anyhow::Result;
 use async_stream::try_stream;
@@ -27,8 +26,7 @@ use rusoto_core::RusotoError;
 use rusoto_credential::ChainProvider;
 use rusoto_credential::InstanceMetadataProvider;
 use rusoto_credential::ProfileProvider;
-use rusoto_s3::Delete;
-use rusoto_s3::DeleteObjectsRequest;
+use rusoto_s3::DeleteObjectRequest;
 use rusoto_s3::GetBucketLifecycleConfigurationError;
 use rusoto_s3::GetBucketLifecycleConfigurationOutput;
 use rusoto_s3::GetBucketLifecycleConfigurationRequest;
@@ -36,7 +34,6 @@ use rusoto_s3::GetObjectError;
 use rusoto_s3::GetObjectRequest;
 use rusoto_s3::HeadObjectRequest;
 use rusoto_s3::ListObjectsV2Request;
-use rusoto_s3::ObjectIdentifier;
 use rusoto_s3::PutObjectError;
 use rusoto_s3::PutObjectRequest;
 use rusoto_s3::S3Client;
@@ -425,43 +422,27 @@ impl ObjectAccessTrait for S3ObjectAccess {
 
     // Note: Stream is of raw keys (with prefix)
     async fn delete_objects(&self, stream: &mut (dyn Stream + Send + Unpin)) {
-        // Note: we intentionally issue the delete calls serially because it
-        // doesn't seem to improve performance if we issue them in parallel
-        // (using StreamExt::for_each_concurrent()).
+        // GCP does not support S3's batched multi-delete API. So instead we issue
+        // object deletion requests in parallel. This performs better than the
+        // multi-delete API.
         stream
-            .chunks(*OBJECT_DELETION_BATCH_SIZE)
-            .for_each(|chunk| async move {
-                let msg = format!("delete {} objects including {}", chunk.len(), &chunk[0]);
+            .map(|key| async move {
                 let op = self.access_stats.begin(ObjectAccessOpType::ObjectDelete);
-
+                let msg = format!("delete object {key}");
                 retry(&msg, None, || async {
-                    let req = DeleteObjectsRequest {
+                    let req = DeleteObjectRequest {
                         bucket: self.bucket.clone(),
-                        delete: Delete {
-                            objects: chunk
-                                .iter()
-                                .map(|key| ObjectIdentifier {
-                                    key: key.clone(),
-                                    ..Default::default()
-                                })
-                                .collect(),
-                            quiet: Some(true),
-                        },
+                        key: key.clone(),
                         ..Default::default()
                     };
-                    let output = self.client.delete_objects(req).await?;
-                    match output.errors {
-                        Some(errs) => match errs.get(0) {
-                            Some(e) => Err(OAError::Other(anyhow!("{:?}", e))),
-                            None => Ok(()),
-                        },
-                        None => Ok(()),
-                    }
+                    self.client.delete_object(req).await?;
+                    Ok(())
                 })
                 .await
                 .unwrap();
-                op.end_multiple(0, chunk.len() as u64);
+                op.end(0);
             })
+            .buffer_unordered(*OBJECT_DELETION_BATCH_SIZE)
+            .count()
             .await;
     }
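Note on the shared pattern: both backends now map each key to a delete future and drive those futures with StreamExt::buffer_unordered, which keeps at most OBJECT_DELETION_BATCH_SIZE requests in flight and collects completions in whatever order they finish. Below is a minimal self-contained sketch of that pattern, not code from this change; delete_object, the key names, and the batch size are hypothetical stand-ins for the agent's real request and retry logic. It assumes the futures and tokio crates.

    use std::time::Duration;

    use futures::stream::{self, StreamExt};

    // Hypothetical stand-in for a single-object delete; a real backend would
    // issue an HTTP request (and retry on failure) here.
    async fn delete_object(key: String) {
        tokio::time::sleep(Duration::from_millis(10)).await; // simulated network latency
        println!("deleted {key}");
    }

    #[tokio::main]
    async fn main() {
        // Illustrative value mirroring OBJECT_DELETION_BATCH_SIZE: at most this
        // many deletes (and thus this many connections) are in flight at once.
        let batch_size = 500;

        stream::iter((0..2000).map(|i| format!("object-{i}")))
            // Turn each key into a future that performs one delete.
            .map(delete_object)
            // Poll up to `batch_size` of those futures concurrently, yielding
            // completions as they finish.
            .buffer_unordered(batch_size)
            // Drain the stream so every delete actually runs to completion.
            .count()
            .await;
    }

buffer_unordered fits here better than buffered (which blob.rs used before this change) because the deletes are independent and nothing consumes their results in order, so there is no reason to stall the pipeline behind one slow request.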