Merge pull request openzfs#524 from delphix/projects/merge-upstream/master

Merge remote-tracking branch '6.0/stage' into 'master'
Prakash Surya authored Jul 19, 2022
2 parents 9cdcc7e + 625339b commit 5042d88
Showing 11 changed files with 181 additions and 41 deletions.
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions cmd/zfs_object_agent/zettaobject/Cargo.toml
@@ -31,6 +31,7 @@ futures = "0.3.13"
futures-core = "0.3.13"
hostname = "0.3.1"
http = "0.2.4"
itertools = "0.10.3"
lazy_static = "1.4.0"
libc = "0.2"
log = "0.4"
44 changes: 42 additions & 2 deletions cmd/zfs_object_agent/zettaobject/src/object_access/blob.rs
@@ -14,9 +14,11 @@ use anyhow::Context;
use anyhow::Result;
use async_stream::try_stream;
use async_trait::async_trait;
use azure_core::auth::TokenResponse;
use azure_core::prelude::NextMarker;
use azure_core::HttpError;
use azure_identity::ImdsManagedIdentityCredential;
use azure_identity::ManagedIdentityCredentialError;
use azure_identity::TokenCredential;
use azure_storage::clients::AsStorageClient;
use azure_storage::clients::StorageAccountClient;
@@ -36,10 +38,12 @@ use futures::StreamExt;
use http::Response;
use http::StatusCode;
use ini::Ini;
use lazy_static::lazy_static;
use log::*;
use more_asserts::assert_le;
use rusoto_core::ByteStream;
use tokio::io::AsyncReadExt;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use url::Url;
use util::tunable;
@@ -66,6 +70,10 @@ tunable! {
static ref BLOB_CREDENTIALS_BUFFER_DURATION: chrono::Duration = chrono::Duration::minutes(15);
}

lazy_static! {
static ref MANAGED_IDENTITY_TOKEN: Mutex<Option<TokenResponse>> = Default::default();
}

/// MaybeFrom is basically just TryFrom that restricts the Err type to be the
/// From type. This allows us to consume the from value on success, and return it
/// on failure. This could probably also be done using TryFrom in combination with
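As an aside, a rough sketch of the MaybeFrom shape described in the comment above (an assumption for illustration; the trait's actual definition lies outside this hunk):

trait MaybeFrom<F>: Sized {
    /// Consume `from` on success; hand the original value back on failure.
    fn maybe_from(from: F) -> Result<Self, F>;
}

// Hypothetical implementor, purely for illustration.
struct Endpoint(String);

impl MaybeFrom<String> for Endpoint {
    fn maybe_from(from: String) -> Result<Self, String> {
        if from.starts_with("https://") {
            Ok(Endpoint(from))
        } else {
            Err(from) // conversion failed: the caller gets its String back untouched
        }
    }
}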
@@ -542,6 +550,13 @@ impl ObjectAccessTrait for BlobObjectAccess {
}

async fn delete_objects(&self, stream: &mut (dyn Stream<Item = String> + Send + Unpin)) {
// azure-sdk-for-rust does not yet support batched deletion of objects. So we issue
// object deletion requests in parallel. This means we open many TCP connections in
// parallel. Experimentally, we see that we get good performance with about half the
// default batch size that we use with S3 which is 1000. This also avoids running out
// of file descriptors when the ulimit is low.
let batch_size = *OBJECT_DELETION_BATCH_SIZE / 2;

stream
.map(|key| async move {
let op = self.access_stats.begin(ObjectAccessOpType::ObjectDelete);
@@ -576,7 +591,7 @@
.unwrap();
op.end(0);
})
.buffered(*OBJECT_DELETION_BATCH_SIZE)
.buffered(batch_size)
.count()
.await;
}
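For context, a minimal standalone sketch (not the agent's code) of how `StreamExt::buffered` bounds the number of in-flight futures, which is the mechanism the comment above relies on; the per-object delete here is a stand-in closure:

use futures::stream::{self, StreamExt};

async fn delete_all(keys: Vec<String>, batch_size: usize) {
    stream::iter(keys)
        .map(|key| async move {
            // Stand-in for the real per-blob deletion request.
            println!("deleting {key}");
        })
        // At most `batch_size` deletions are in flight at any moment, so the
        // number of open connections (and file descriptors) stays bounded.
        .buffered(batch_size)
        .count() // drive the stream to completion
        .await;
}

fn main() {
    futures::executor::block_on(delete_all(
        (0..10).map(|i| format!("object-{i}")).collect(),
        4,
    ));
}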
@@ -668,6 +683,12 @@
fn supports_list_after(&self) -> bool {
false
}

/// Unfortunately, the Azure SDK doesn't support the Management Policy API, which allows us to
/// access retention policies. When that is implemented, we can properly check for retention.
async fn has_retention(&self) -> bool {
false
}
}

// Creation of a BlobObjectAccess object with invalid credentials can cause a crash as the azure sdk
@@ -699,6 +720,24 @@ async fn get_azure_storage_client_with_managed_key_profile(
get_azure_storage_client_with_managed_key(endpoint, azure_account).await
}

async fn get_cached_bearer_token(
creds: ImdsManagedIdentityCredential,
) -> Result<TokenResponse, ManagedIdentityCredentialError> {
let mut maybe_managed_token = MANAGED_IDENTITY_TOKEN.lock().await;
if let Some(managed_token) = maybe_managed_token.clone() {
// Cached credential has not expired
if managed_token.expires_on > Utc::now() + *BLOB_CREDENTIALS_BUFFER_DURATION {
return Ok(managed_token);
}
}

info!("Fetching managed identity credential");
let bearer_token = creds.get_token("https://storage.azure.com/").await?;
maybe_managed_token.replace(bearer_token.clone());

Ok(bearer_token)
}

async fn get_azure_storage_client_with_managed_key(
endpoint: Option<String>,
azure_account: &str,
@@ -720,7 +759,8 @@ async fn get_azure_storage_client_with_managed_key(
// See: https://github.com/Azure/azure-sdk-for-rust/pull/673
let creds = ImdsManagedIdentityCredential::default();

let bearer_token = creds.get_token("https://storage.azure.com/").await?;
let bearer_token = get_cached_bearer_token(creds).await?;

let expires_on = bearer_token.expires_on;
let client = StorageAccountClient::new_bearer_token(
http_client.clone(),
6 changes: 6 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/object_access/mod.rs
@@ -549,6 +549,10 @@ impl ObjectAccess {
pub fn collect_stats(&self) -> HashMap<String, StatMapValue> {
self.as_trait().collect_stats()
}

pub async fn has_retention(&self) -> bool {
self.as_trait().has_retention().await
}
}

enum BucketAccessEnum {
@@ -645,6 +649,8 @@ pub trait ObjectAccessTrait: Send + Sync {
fn collect_stats(&self) -> HashMap<String, StatMapValue>;

fn supports_list_after(&self) -> bool;

async fn has_retention(&self) -> bool;
}

#[derive(Debug)]
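A hypothetical caller-side sketch of the new `has_retention` method (the helper and the warning text are illustrative, not part of this commit):

async fn warn_if_retention(object_access: &ObjectAccess) {
    // Returns true when the bucket has a lifecycle/retention rule that could
    // remove objects underneath the agent.
    if object_access.has_retention().await {
        log::warn!("bucket has an object expiration policy; stored objects may be deleted automatically");
    }
}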
42 changes: 42 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/object_access/s3.rs
@@ -29,6 +29,9 @@ use rusoto_credential::InstanceMetadataProvider;
use rusoto_credential::ProfileProvider;
use rusoto_s3::Delete;
use rusoto_s3::DeleteObjectsRequest;
use rusoto_s3::GetBucketLifecycleConfigurationError;
use rusoto_s3::GetBucketLifecycleConfigurationOutput;
use rusoto_s3::GetBucketLifecycleConfigurationRequest;
use rusoto_s3::GetObjectError;
use rusoto_s3::GetObjectRequest;
use rusoto_s3::HeadObjectRequest;
@@ -522,4 +525,43 @@ impl ObjectAccessTrait for S3ObjectAccess {
fn supports_list_after(&self) -> bool {
true
}

async fn has_retention(&self) -> bool {
let client = self.client.clone();
let result: Result<
GetBucketLifecycleConfigurationOutput,
OAError<GetBucketLifecycleConfigurationError>,
> = retry(&format!("has_retention {}", self.bucket), None, || async {
let config = GetBucketLifecycleConfigurationRequest {
bucket: self.bucket.clone(),
expected_bucket_owner: None,
};
Ok(client.get_bucket_lifecycle_configuration(config).await?)
})
.await;
match result {
Ok(output) => output
.rules
.map(|v| {
v.iter().any(|rule| {
rule.status == "Enabled"
&& rule
.expiration
.as_ref()
.map(|expiration| {
expiration.days.is_some() || expiration.date.is_some()
})
.unwrap_or(false)
})
})
.unwrap_or(false),
Err(OAError::RequestError(RequestError::Unknown(response))) => {
if response.status() == StatusCode::NOT_FOUND {
return false;
}
panic!("has_retention error: {:?}", response);
}
e => panic!("has_retention error: {:?}", e),
}
}
}
19 changes: 12 additions & 7 deletions cmd/zfs_object_agent/zettaobject/src/object_based_log.rs
@@ -9,6 +9,7 @@ use futures::future::join_all;
use futures::stream;
use futures::stream::StreamExt;
use futures_core::Stream;
use itertools::Either;
use log::*;
use serde::de::DeserializeOwned;
use serde::Deserialize;
@@ -252,7 +253,7 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {
}

/// Iterates the on-disk state; panics if there are pending changes.
pub fn iterate(&self) -> impl Stream<Item = T> {
pub fn iterate(&self) -> impl Stream<Item = Result<T>> {
assert_eq!(self.num_flushed_chunks, self.num_chunks);
assert!(self.pending_entries.is_empty());
assert!(self.pending_flushes.is_empty());
@@ -265,7 +266,7 @@
fn iter_impl(
&self,
first_chunk_opt: Option<ObjectBasedLogRemainder>,
) -> (impl Stream<Item = T>, ObjectBasedLogRemainder) {
) -> (impl Stream<Item = Result<T>>, ObjectBasedLogRemainder) {
let first_chunk = match first_chunk_opt {
Some(remainder) => remainder.chunk,
None => 0,
@@ -277,9 +278,8 @@
let shared_state = shared_state.clone();
let name = name.clone();
async move {
ObjectBasedLogChunk::get(&shared_state.object_access, &name, generation, chunk)
ObjectBasedLogChunk::<T>::get(&shared_state.object_access, &name, generation, chunk)
.await
.unwrap()
}
});
(
Expand All @@ -288,7 +288,12 @@ impl<T: ObjectBasedLogEntry> ObjectBasedLog<T> {
// connections we'd run into the open file descriptor limit.
stream::iter(futures)
.buffered(*OBJECT_LOG_ITERATE_QUEUE_DEPTH)
.flat_map(|chunk| stream::iter(chunk.entries.into_iter())),
.flat_map(|chunk| {
stream::iter(match chunk {
Ok(c) => Either::Left(c.entries.into_iter().map(|e| Ok(e))),
Err(e) => Either::Right(vec![Err(e)].into_iter()),
})
}),
ObjectBasedLogRemainder {
chunk: self.num_flushed_chunks,
},
@@ -298,7 +303,7 @@
/// Iterates the on-disk state; pending changes (including pending_entries
/// and pending_flushes) will not be visited. Returns token for iterating
/// the remainder (entries after those visited here).
pub fn iter_most(&self) -> (impl Stream<Item = T>, ObjectBasedLogRemainder) {
pub fn iter_most(&self) -> (impl Stream<Item = Result<T>>, ObjectBasedLogRemainder) {
self.iter_impl(None)
}

@@ -312,6 +317,6 @@
self.flush(txg).await;
// XXX It would be faster if we kept all the "remainder" entries in RAM
// until we iter the remainder and transfer it to the new generation.
self.iter_impl(Some(first_chunk)).0
self.iter_impl(Some(first_chunk)).0.map(|r| r.unwrap())
}
}
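For reference, a self-contained sketch of the `itertools::Either` pattern used in `iter_impl` above — flattening a stream of per-chunk `Result`s into a stream of per-entry `Result`s without boxing (toy types, not the agent's):

use futures::executor::block_on;
use futures::stream::{self, StreamExt};
use itertools::Either;

fn main() {
    let chunks: Vec<Result<Vec<u32>, String>> =
        vec![Ok(vec![1, 2]), Err("fetch failed".to_string()), Ok(vec![3])];
    let entries: Vec<Result<u32, String>> = block_on(
        stream::iter(chunks)
            .flat_map(|chunk| {
                // Both match arms are iterators with the same Item type, so
                // Either gives flat_map a single concrete stream type to return.
                stream::iter(match chunk {
                    Ok(c) => Either::Left(c.into_iter().map(Ok::<u32, String>)),
                    Err(e) => Either::Right(std::iter::once(Err::<u32, String>(e))),
                })
            })
            .collect(),
    );
    assert_eq!(entries.len(), 4); // one error entry plus three data entries
}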
17 changes: 8 additions & 9 deletions cmd/zfs_object_agent/zettaobject/src/object_block_map.rs
@@ -4,7 +4,7 @@ use std::sync::RwLock;
use std::time::Instant;

use futures::future;
use futures::StreamExt;
use futures::TryStreamExt;
use log::*;
use more_asserts::*;
use serde::Deserialize;
@@ -41,14 +41,14 @@ impl ObjectBlockMap {
pub async fn load(
storage_object_log: &ObjectBasedLog<StorageObjectLogEntry>,
next_block: BlockId,
) -> Self {
) -> Result<Self, anyhow::Error> {
let begin = Instant::now();
let mut num_alloc_entries: u64 = 0;
let mut num_free_entries: u64 = 0;
let mut map: BTreeSet<ObjectId> = BTreeSet::new();
storage_object_log
.iterate()
.for_each(|ent| {
.try_for_each(|ent| {
match ent {
StorageObjectLogEntry::Alloc { object } => {
let inserted = with_alloctag(Self::MAP_TAG, || map.insert(object));
@@ -62,20 +62,19 @@
}
}

future::ready(())
future::ready(Ok(()))
})
.await;
.await?;
info!(
"loaded mapping from {} objects with {} allocs and {} frees in {}ms",
storage_object_log.num_chunks,
num_alloc_entries,
num_free_entries,
begin.elapsed().as_millis()
begin.elapsed().as_millis(),
);

ObjectBlockMap {
Ok(ObjectBlockMap {
state: RwLock::new(ObjectBlockMapState { map, next_block }),
}
})
}

pub fn insert(&self, object: ObjectId, next_block: BlockId) {