Skip to content

Commit

Permalink
DLPX-82101 Parallel object delete to accommodate GCP (openzfs#534)
Browse files Browse the repository at this point in the history
  • Loading branch information
manoj-joseph authored Aug 1, 2022
1 parent 1dd2749 commit 8c0e559
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 39 deletions.
9 changes: 2 additions & 7 deletions cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,12 +551,7 @@ impl ObjectAccessTrait for BlobObjectAccess {

async fn delete_objects(&self, stream: &mut (dyn Stream<Item = String> + Send + Unpin)) {
// azure-sdk-for-rust does not yet support batched deletion of objects. So we issue
// object deletion requests in parallel. This means we open many TCP connections in
// parallel. Experimentally, we see that we get good performance with about half the
// default batch size that we use with S3 which is 1000. This also avoids running out
// of file desciptors when the ulimit is low.
let batch_size = *OBJECT_DELETION_BATCH_SIZE / 2;

// object deletion requests in parallel.
stream
.map(|key| async move {
let op = self.access_stats.begin(ObjectAccessOpType::ObjectDelete);
Expand Down Expand Up @@ -591,7 +586,7 @@ impl ObjectAccessTrait for BlobObjectAccess {
.unwrap();
op.end(0);
})
.buffered(batch_size)
.buffer_unordered(*OBJECT_DELETION_BATCH_SIZE)
.count()
.await;
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ tunable! {
static ref LONG_OPERATION_DURATION: Duration = Duration::from_secs(2);
static ref XLONG_OPERATION_DURATION: Duration = Duration::from_secs(60);
static ref PANIC_ON_XLONG_OPERATION: bool = false;
pub static ref OBJECT_DELETION_BATCH_SIZE: usize = 1000;

// The default number of deletes to issue in parallel. Experimentally, we
// see that we get good performance with a batch size of 500. Since we open
// this many socket connections, we avoid a larger batch size so as not to
// run out of file descriptors when the ulimit is low.
pub static ref OBJECT_DELETION_BATCH_SIZE: usize = 500;
static ref PER_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
}

Expand Down
43 changes: 12 additions & 31 deletions cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::ops::Range;
use std::pin::Pin;
use std::time::Instant;

use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use async_stream::try_stream;
Expand All @@ -27,16 +26,14 @@ use rusoto_core::RusotoError;
use rusoto_credential::ChainProvider;
use rusoto_credential::InstanceMetadataProvider;
use rusoto_credential::ProfileProvider;
use rusoto_s3::Delete;
use rusoto_s3::DeleteObjectsRequest;
use rusoto_s3::DeleteObjectRequest;
use rusoto_s3::GetBucketLifecycleConfigurationError;
use rusoto_s3::GetBucketLifecycleConfigurationOutput;
use rusoto_s3::GetBucketLifecycleConfigurationRequest;
use rusoto_s3::GetObjectError;
use rusoto_s3::GetObjectRequest;
use rusoto_s3::HeadObjectRequest;
use rusoto_s3::ListObjectsV2Request;
use rusoto_s3::ObjectIdentifier;
use rusoto_s3::PutObjectError;
use rusoto_s3::PutObjectRequest;
use rusoto_s3::S3Client;
Expand Down Expand Up @@ -425,43 +422,27 @@ impl ObjectAccessTrait for S3ObjectAccess {

// Note: Stream is of raw keys (with prefix)
async fn delete_objects(&self, stream: &mut (dyn Stream<Item = String> + Send + Unpin)) {
// Note: we intentionally issue the delete calls serially because it
// doesn't seem to improve performance if we issue them in parallel
// (using StreamExt::for_each_concurrent()).
// GCP does not support S3's batched multi-delete API. So instead we issue
// object deletion requests in parallel. This performs better than the multi-delete API.
stream
.chunks(*OBJECT_DELETION_BATCH_SIZE)
.for_each(|chunk| async move {
let msg = format!("delete {} objects including {}", chunk.len(), &chunk[0]);
.map(|key| async move {
let op = self.access_stats.begin(ObjectAccessOpType::ObjectDelete);

let msg = format!("delete object {key}");
retry(&msg, None, || async {
let req = DeleteObjectsRequest {
let req = DeleteObjectRequest {
bucket: self.bucket.clone(),
delete: Delete {
objects: chunk
.iter()
.map(|key| ObjectIdentifier {
key: key.clone(),
..Default::default()
})
.collect(),
quiet: Some(true),
},
key: key.clone(),
..Default::default()
};
let output = self.client.delete_objects(req).await?;
match output.errors {
Some(errs) => match errs.get(0) {
Some(e) => Err(OAError::Other(anyhow!("{:?}", e))),
None => Ok(()),
},
None => Ok(()),
}
self.client.delete_object(req).await?;
Ok(())
})
.await
.unwrap();
op.end_multiple(0, chunk.len() as u64);
op.end(0);
})
.buffer_unordered(*OBJECT_DELETION_BATCH_SIZE)
.count()
.await;
}

Expand Down

0 comments on commit 8c0e559

Please sign in to comment.