Skip to content

Commit

Permalink
DLPX-80977 protect against s3 object expiration (openzfs#507)
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dagnelie <pcd@delphix.com>
  • Loading branch information
pcd1193182 authored Jul 18, 2022
1 parent 9ed4bb6 commit 584d227
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 3 deletions.
6 changes: 6 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,12 @@ impl ObjectAccessTrait for BlobObjectAccess {
fn supports_list_after(&self) -> bool {
false
}

/// Unfortunately, the Azure SDK doesn't support the Management Policy API, which allows us to
/// access retention policies. When that is implemented, we can properly check for retention.
async fn has_retention(&self) -> bool {
false
}
}

// Creation of a BlobObjectAccess object with invalid credentials can cause a crash as the azure sdk
Expand Down
6 changes: 6 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,10 @@ impl ObjectAccess {
pub fn collect_stats(&self) -> HashMap<String, StatMapValue> {
self.as_trait().collect_stats()
}

pub async fn has_retention(&self) -> bool {
self.as_trait().has_retention().await
}
}

enum BucketAccessEnum {
Expand Down Expand Up @@ -645,6 +649,8 @@ pub trait ObjectAccessTrait: Send + Sync {
fn collect_stats(&self) -> HashMap<String, StatMapValue>;

fn supports_list_after(&self) -> bool;

async fn has_retention(&self) -> bool;
}

#[derive(Debug)]
Expand Down
42 changes: 42 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ use rusoto_credential::InstanceMetadataProvider;
use rusoto_credential::ProfileProvider;
use rusoto_s3::Delete;
use rusoto_s3::DeleteObjectsRequest;
use rusoto_s3::GetBucketLifecycleConfigurationError;
use rusoto_s3::GetBucketLifecycleConfigurationOutput;
use rusoto_s3::GetBucketLifecycleConfigurationRequest;
use rusoto_s3::GetObjectError;
use rusoto_s3::GetObjectRequest;
use rusoto_s3::HeadObjectRequest;
Expand Down Expand Up @@ -522,4 +525,43 @@ impl ObjectAccessTrait for S3ObjectAccess {
fn supports_list_after(&self) -> bool {
true
}

async fn has_retention(&self) -> bool {
let client = self.client.clone();
let result: Result<
GetBucketLifecycleConfigurationOutput,
OAError<GetBucketLifecycleConfigurationError>,
> = retry(&format!("has_retention {}", self.bucket), None, || async {
let config = GetBucketLifecycleConfigurationRequest {
bucket: self.bucket.clone(),
expected_bucket_owner: None,
};
Ok(client.get_bucket_lifecycle_configuration(config).await?)
})
.await;
match result {
Ok(output) => output
.rules
.map(|v| {
v.iter().any(|rule| {
rule.status == "Enabled"
&& rule
.expiration
.as_ref()
.map(|expiration| {
expiration.days.is_some() || expiration.date.is_some()
})
.unwrap_or(false)
})
})
.unwrap_or(false),
Err(OAError::RequestError(RequestError::Unknown(response))) => {
if response.status() == StatusCode::NOT_FOUND {
return false;
}
panic!("has_retention error: {:?}", response);
}
e => panic!("has_retention error: {:?}", e),
}
}
}
45 changes: 42 additions & 3 deletions cmd/zfs_object_agent/zettaobject/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;

use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
Expand Down Expand Up @@ -198,6 +199,8 @@ pub struct PoolPhys {
pub destroying_state: Option<PoolDestroyingPhys>,
#[serde(default)]
checkpoint_txg: Option<Txg>,
#[serde(default)]
sentinel_creation: Option<SystemTime>,
}

/// contains a pending_frees_log and matching object_size_log
Expand Down Expand Up @@ -316,7 +319,14 @@ impl PoolPhys {
.with_context(|| format!("Failed to decode contents of {}", Self::key(guid)))?;
debug!("got {:#?}", this);
assert_eq!(this.guid, guid);
Ok(this)
if this.sentinel_creation.is_some() && !this.sentinel_exists(object_access).await {
Err(anyhow!(
"Sentinel object (created {:?}) missing, pool may be corrupted",
this.sentinel_creation.unwrap()
))
} else {
Ok(this)
}
}

pub async fn put(&self, object_access: &ObjectAccess) {
Expand Down Expand Up @@ -349,6 +359,22 @@ impl PoolPhys {
)
.await
}

pub async fn put_sentinel(&self, object_access: &ObjectAccess) {
object_access
.put_object(
format!("zfs/{}/sentinel", self.guid),
Bytes::new(),
ObjectAccessOpType::MetadataPut,
)
.await
}

async fn sentinel_exists(&self, object_access: &ObjectAccess) -> bool {
object_access
.object_exists(format!("zfs/{}/sentinel", self.guid))
.await
}
}

impl UberblockPhys {
Expand Down Expand Up @@ -423,6 +449,7 @@ pub struct PoolState {
pub shared_state: Arc<PoolSharedState>,
resuming: watch_once::Receiver<()>,
heartbeat_guard: Option<HeartbeatGuard>,
sentinel_creation: Option<SystemTime>,
}

/// runtime data for each pair of pending frees + object sizes logs
Expand Down Expand Up @@ -775,10 +802,13 @@ impl Pool {
last_txg: Txg(0),
destroying_state: None,
checkpoint_txg: None,
sentinel_creation: Some(SystemTime::now()),
};
// XXX make sure it doesn't already exist
phys.put_timed(object_access, Some(*CREATE_WAIT_DURATION))
.await
.await?;
phys.put_sentinel(object_access).await;
Ok(())
}

async fn open_from_txg(
Expand Down Expand Up @@ -881,6 +911,7 @@ impl Pool {
object_block_map,
resuming: rx,
heartbeat_guard,
sentinel_creation: pool_phys.sentinel_creation,
}),
};

Expand Down Expand Up @@ -909,7 +940,13 @@ impl Pool {
syncing_txg: Option<Txg>,
rollback: bool,
) -> Result<(Pool, Option<UberblockPhys>, BlockId), PoolOpenError> {
let phys = PoolPhys::get(&object_access, guid).await?;
let mut phys = PoolPhys::get(&object_access, guid).await?;
if phys.sentinel_creation.is_none() && !object_access.readonly() {
if !phys.sentinel_exists(&object_access).await {
phys.put_sentinel(&object_access).await;
}
phys.sentinel_creation = Some(SystemTime::now());
}
if phys.last_txg.0 == 0 {
assert!(!rollback);
let shared_state = Arc::new(PoolSharedState {
Expand Down Expand Up @@ -979,6 +1016,7 @@ impl Pool {
} else {
None
},
sentinel_creation: phys.sentinel_creation,
}),
};

Expand Down Expand Up @@ -1346,6 +1384,7 @@ impl Pool {
last_txg: txg,
destroying_state: None,
checkpoint_txg,
sentinel_creation: state.sentinel_creation,
}
.put(&state.shared_state.object_access)
.await;
Expand Down
4 changes: 4 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ async fn do_test_connectivity(object_access: &ObjectAccess) -> Result<(), String
return Err("unable to delete objects".to_string());
}

if object_access.has_retention().await {
return Err("bucket has retention policy".to_string());
}

Ok(())
}

Expand Down

0 comments on commit 584d227

Please sign in to comment.