diff --git a/cmd/zfs_object_agent/client/src/main.rs b/cmd/zfs_object_agent/client/src/main.rs
index 58b1745a4a57..a22b69200ead 100644
--- a/cmd/zfs_object_agent/client/src/main.rs
+++ b/cmd/zfs_object_agent/client/src/main.rs
@@ -4,7 +4,6 @@ use clap::Arg;
use clap::SubCommand;
use client::Client;
use futures::stream::StreamExt;
-use lazy_static::lazy_static;
use nvpair::*;
use rand::prelude::*;
use rusoto_core::ByteStream;
@@ -15,7 +14,6 @@ use rusoto_credential::ProfileProvider;
use rusoto_credential::ProvideAwsCredentials;
use rusoto_s3::*;
use std::collections::BTreeSet;
-use std::env;
use std::error::Error;
use std::fs;
use std::fs::File;
@@ -35,13 +33,6 @@ const BUCKET_NAME: &str = "cloudburst-data-2";
const POOL_NAME: &str = "testpool";
const POOL_GUID: u64 = 1234;
-lazy_static! {
- static ref AWS_PREFIX: String = match env::var("AWS_PREFIX") {
- Ok(val) => format!("{}/", val),
- Err(_) => "".to_string(),
- };
-}
-
async fn do_rusoto_provider<P>(credentials_provider: P, file: &str)
where
P: ProvideAwsCredentials + Send + Sync + 'static,
@@ -323,34 +314,21 @@ async fn print_super(
);
}
Err(_e) => {
- /*
- * XXX Pool::get_config() only works for pools under the AWS_PREFIX because it assumes the
- * path to the "super" object.
- */
- if AWS_PREFIX.len() == 0 && !pool_key.starts_with("zfs/") {
- println!("\t(pool inside an alt AWS_PREFIX)");
- } else {
- println!("\t-unknown format-");
- };
+ println!("\t-unknown format-");
}
}
}
}
-fn strip_prefix(prefix: &str) -> &str {
- if prefix.starts_with(AWS_PREFIX.as_str()) {
- &prefix[AWS_PREFIX.len()..]
- } else {
- prefix
- }
-}
-
async fn find_old_pools(object_access: &ObjectAccess, min_age: Duration) -> Vec<String> {
- let pool_keys: Vec<String> = object_access.list_prefixes("zfs/").collect().await;
+ let pool_keys: Vec<String> = object_access
+ .list_prefixes("zfs/".to_string())
+ .collect()
+ .await;
let mut vec = Vec::new();
for pool_key in pool_keys {
match object_access
- .head_object(strip_prefix(&format!("{}super", pool_key)))
+ .head_object(format!("{}super", pool_key))
.await
{
Some(output) => {
@@ -358,7 +336,7 @@ async fn find_old_pools(object_access: &ObjectAccess, min_age: Duration) -> Vec<
DateTime::parse_from_rfc2822(output.last_modified.as_ref().unwrap()).unwrap();
print_super(object_access, &pool_key, &mod_time).await;
if has_expired(&mod_time, min_age) {
- vec.push(strip_prefix(&pool_key).to_string());
+ vec.push(pool_key);
} else {
println!(
"Skipping pool as it is not {} days old.",
@@ -382,7 +360,7 @@ async fn do_list_pools(
// Lookup all objects in the pool.
if list_all_objects {
object_access
- .list_objects(&pool_key, None, false)
+ .list_objects(pool_key, None, false)
.for_each(|object| async move { println!(" {}", object) })
.await;
}
@@ -396,7 +374,7 @@ async fn do_destroy_old_pools(
) -> Result<(), Box<dyn Error>> {
for pool_keys in find_old_pools(object_access, min_age).await {
object_access
- .delete_objects(object_access.list_objects(&pool_keys, None, false))
+ .delete_objects(object_access.list_objects(pool_keys, None, false))
.await;
}
Ok(())
@@ -430,8 +408,8 @@ async fn do_test_connectivity(object_access: &ObjectAccess) {
let file = format!("test/test_connectivity_{}", num);
let content = "test connectivity to S3".as_bytes().to_vec();
- object_access.put_object(&file, content).await;
- object_access.delete_object(&file).await;
+ object_access.put_object(file.clone(), content).await;
+ object_access.delete_object(file).await;
}
async fn test_connectivity(object_access: &ObjectAccess) -> Result<(), Box<dyn Error>> {
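
Note: the hunks above replace borrowed `&str` keys with owned `String` keys throughout the client. A minimal self-contained sketch of the ownership pattern being adopted (toy `Store` type standing in for the agent's `ObjectAccess`; only the signature shape is taken from this patch): an async fn that takes `String` can hold the key across `.await` points without borrowing from its caller, and call sites clone only when the key is reused, as in `do_test_connectivity` above.

    // Toy sketch; assumes a tokio dependency with the "macros" feature.
    struct Store;

    impl Store {
        async fn put_object(&self, key: String, data: Vec<u8>) {
            // The future owns `key` outright; no lifetime ties it to the caller.
            println!("put {} ({} bytes)", key, data.len());
        }
    }

    #[tokio::main]
    async fn main() {
        let store = Store;
        let key = String::from("test/test_connectivity_42");
        store.put_object(key.clone(), b"first".to_vec()).await; // key reused below: clone
        store.put_object(key, b"second".to_vec()).await;        // last use: move, no clone
    }
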
diff --git a/cmd/zfs_object_agent/object_perf/src/main.rs b/cmd/zfs_object_agent/object_perf/src/main.rs
index c719d9444aa5..be53ac3772e0 100644
--- a/cmd/zfs_object_agent/object_perf/src/main.rs
+++ b/cmd/zfs_object_agent/object_perf/src/main.rs
@@ -136,12 +136,12 @@ async fn main() {
println!("Using prefix: '{}'", key_prefix);
match matches.subcommand() {
("write", Some(_matches)) => {
- s3perf::write_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration)
+ s3perf::write_test(&object_access, key_prefix, objsize_bytes, qdepth, duration)
.await
.unwrap();
}
("read", Some(_matches)) => {
- s3perf::read_test(&object_access, &key_prefix, objsize_bytes, qdepth, duration)
+ s3perf::read_test(&object_access, key_prefix, objsize_bytes, qdepth, duration)
.await
.unwrap();
}
diff --git a/cmd/zfs_object_agent/object_perf/src/s3perf.rs b/cmd/zfs_object_agent/object_perf/src/s3perf.rs
index a071d8d18d15..f550d178b441 100644
--- a/cmd/zfs_object_agent/object_perf/src/s3perf.rs
+++ b/cmd/zfs_object_agent/object_perf/src/s3perf.rs
@@ -29,16 +29,16 @@ impl Perf {
#[measure(InFlight)]
#[measure(Throughput)]
#[measure(HitCount)]
- async fn put(&self, object_access: &ObjectAccess, key: &str, data: Vec<u8>) {
- object_access.put_object(&key.to_string(), data).await;
+ async fn put(&self, object_access: &ObjectAccess, key: String, data: Vec<u8>) {
+ object_access.put_object(key, data).await;
}
#[measure(type = ResponseTime)]
#[measure(InFlight)]
#[measure(Throughput)]
#[measure(HitCount)]
- async fn get(&self, object_access: &ObjectAccess, key: &str) {
- object_access.get_object(&key.to_string()).await.unwrap();
+ async fn get(&self, object_access: &ObjectAccess, key: String) {
+ object_access.get_object(key).await.unwrap();
}
fn log_metrics(&self, duration: Duration) {
@@ -60,7 +60,7 @@ impl Perf {
duration: Duration,
) {
let num_objects = object_access
- .list_objects(&key_prefix, None, true)
+ .list_objects(key_prefix.clone(), None, true)
.fold(0, |count, _key| async move { count + 1 })
.await;
let mut key_id = 0;
@@ -74,7 +74,7 @@ impl Perf {
my_perf
.get(
&my_object_access,
- &format!("{}{}", my_key_prefix, key_id % num_objects + 1),
+ format!("{}{}", my_key_prefix, key_id % num_objects + 1),
)
.await;
})
@@ -106,7 +106,7 @@ impl Perf {
my_perf
.put(
&my_object_access,
- &format!("{}{}", my_key_prefix, key_id),
+ format!("{}{}", my_key_prefix, key_id),
my_data,
)
.await
@@ -135,7 +135,7 @@ impl Perf {
pub async fn write_test(
object_access: &ObjectAccess,
- key_prefix: &str,
+ key_prefix: String,
objsize: u64,
qdepth: u64,
duration: Duration,
@@ -144,14 +144,8 @@ pub async fn write_test(
let bounds = WriteTestBounds::Time(duration);
perf.log_metrics(Duration::from_secs(1));
- perf.write_objects(
- object_access,
- key_prefix.to_string(),
- objsize,
- qdepth,
- bounds,
- )
- .await;
+ perf.write_objects(object_access, key_prefix.clone(), objsize, qdepth, bounds)
+ .await;
println!("{:#?}", perf.metrics.put);
@@ -164,7 +158,7 @@ pub async fn write_test(
pub async fn read_test(
object_access: &ObjectAccess,
- key_prefix: &str,
+ key_prefix: String,
objsize: u64,
qdepth: u64,
duration: Duration,
@@ -173,16 +167,10 @@ pub async fn read_test(
let bounds = WriteTestBounds::Objects(max(qdepth * 10, 200));
perf.log_metrics(Duration::from_secs(1));
- perf.write_objects(
- object_access,
- key_prefix.to_string(),
- objsize,
- qdepth,
- bounds,
- )
- .await;
+ perf.write_objects(object_access, key_prefix.clone(), objsize, qdepth, bounds)
+ .await;
- perf.read_objects(object_access, key_prefix.to_string(), qdepth, duration)
+ perf.read_objects(object_access, key_prefix.clone(), qdepth, duration)
.await;
println!("{:#?}", perf.metrics.get);
diff --git a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
index e86ffee9e9b6..6a394799a918 100644
--- a/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
+++ b/cmd/zfs_object_agent/zettacache/src/block_based_log.rs
@@ -277,7 +277,7 @@ impl BlockBasedLog {
// XXX handle checksum error here
let (chunk, consumed): (BlockBasedLogChunk, usize) = block_access
.chunk_from_raw(&extent_bytes[total_consumed..])
- .context(format!("{:?} at {:?}", chunk_id, chunk_location))
+ .with_context(|| format!("{:?} at {:?}", chunk_id, chunk_location))
.unwrap();
assert_eq!(chunk.id, chunk_id);
for entry in chunk.entries {
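
Note: this hunk swaps anyhow's eager `context` for the lazy `with_context`: the closure only runs on the error path, so the `format!` allocation is skipped on every successful chunk decode. A standalone illustration of the difference:

    use anyhow::{Context, Result};

    fn parse(input: &str) -> Result<u64> {
        input
            .parse::<u64>()
            // Closure is evaluated only if parse() fails; `.context(format!(...))`
            // would build the message string even on success.
            .with_context(|| format!("failed to parse {:?} as u64", input))
    }

    fn main() -> Result<()> {
        assert_eq!(parse("42")?, 42);    // success: no message formatted
        assert!(parse("oops").is_err()); // failure: context is attached
        Ok(())
    }
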
diff --git a/cmd/zfs_object_agent/zettaobject/src/data_object.rs b/cmd/zfs_object_agent/zettaobject/src/data_object.rs
index bad93b428496..56341153c3a8 100644
--- a/cmd/zfs_object_agent/zettaobject/src/data_object.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/data_object.rs
@@ -99,27 +99,45 @@ impl DataObjectPhys {
pub async fn get(
object_access: &ObjectAccess,
guid: PoolGuid,
- obj: ObjectId,
+ object: ObjectId,
bypass_cache: bool,
) -> Result<Self> {
- let this = Self::get_from_key(object_access, &Self::key(guid, obj), bypass_cache).await?;
+ let buf = match bypass_cache {
+ true => {
+ object_access
+ .get_object_uncached(Self::key(guid, object))
+ .await?
+ }
+ false => object_access.get_object(Self::key(guid, object)).await?,
+ };
+ let begin = Instant::now();
+ let this: DataObjectPhys = bincode::deserialize(&buf)
+ .with_context(|| format!("Failed to decode contents of {}", Self::key(guid, object)))?;
+ trace!(
+ "{:?}: deserialized {} blocks from {} bytes in {}ms",
+ this.object,
+ this.blocks.len(),
+ buf.len(),
+ begin.elapsed().as_millis()
+ );
assert_eq!(this.guid, guid);
- assert_eq!(this.object, obj);
+ assert_eq!(this.object, object);
+ this.verify();
Ok(this)
}
pub async fn get_from_key(
object_access: &ObjectAccess,
- key: &str,
+ key: String,
bypass_cache: bool,
) -> Result<Self> {
let buf = match bypass_cache {
- true => object_access.get_object_uncached(key).await?,
- false => object_access.get_object(key).await?,
+ true => object_access.get_object_uncached(key.clone()).await?,
+ false => object_access.get_object(key.clone()).await?,
};
let begin = Instant::now();
- let this: DataObjectPhys =
- bincode::deserialize(&buf).context(format!("Failed to decode contents of {}", key))?;
+ let this: DataObjectPhys = bincode::deserialize(&buf)
+ .with_context(|| format!("Failed to decode contents of {}", key))?;
trace!(
"{:?}: deserialized {} blocks from {} bytes in {}ms",
this.object,
@@ -143,7 +161,7 @@ impl DataObjectPhys {
);
self.verify();
object_access
- .put_object(&Self::key(self.guid, self.object), contents)
+ .put_object(Self::key(self.guid, self.object), contents)
.await;
}
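
Note: the new `get()` inlines the fetch-then-deserialize sequence rather than delegating to `get_from_key`, timing the bincode decode for the trace log. A self-contained sketch of that deserialize-and-time shape on a toy struct (the real code decodes `DataObjectPhys` and logs through `trace!`):

    use anyhow::{Context, Result};
    use serde::{Deserialize, Serialize};
    use std::time::Instant;

    #[derive(Serialize, Deserialize, Debug)]
    struct Phys {
        guid: u64,
        object: u64,
    }

    fn main() -> Result<()> {
        let buf = bincode::serialize(&Phys { guid: 1234, object: 7 })?;
        let begin = Instant::now();
        let this: Phys = bincode::deserialize(&buf)
            .with_context(|| format!("Failed to decode {} bytes", buf.len()))?;
        println!(
            "deserialized {:?} from {} bytes in {}ms",
            this,
            buf.len(),
            begin.elapsed().as_millis()
        );
        Ok(())
    }
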
diff --git a/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs b/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs
index 44db39374cca..2b6f7a8fd3e5 100644
--- a/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/heartbeat.rs
@@ -35,10 +35,9 @@ impl HeartbeatPhys {
}
pub async fn get(object_access: &ObjectAccess, id: Uuid) -> anyhow::Result<Self> {
- let key = Self::key(id);
- let buf = object_access.get_object_impl(&key, None).await?;
+ let buf = object_access.get_object_impl(Self::key(id), None).await?;
let this: Self = serde_json::from_slice(&buf)
- .context(format!("Failed to decode contents of {}", key))?;
+ .with_context(|| format!("Failed to decode contents of {}", Self::key(id)))?;
debug!("got {:#?}", this);
assert_eq!(this.id, id);
Ok(this)
@@ -53,12 +52,12 @@ impl HeartbeatPhys {
debug!("putting {:#?}", self);
let buf = serde_json::to_vec(&self).unwrap();
object_access
- .put_object_timed(&Self::key(self.id), buf, timeout)
+ .put_object_timed(Self::key(self.id), buf, timeout)
.await
}
pub async fn delete(object_access: &ObjectAccess, id: Uuid) {
- object_access.delete_object(&Self::key(id)).await;
+ object_access.delete_object(Self::key(id)).await;
}
}
diff --git a/cmd/zfs_object_agent/zettaobject/src/object_access.rs b/cmd/zfs_object_agent/zettaobject/src/object_access.rs
index 0f39c5de99fe..1b8a0f689ac0 100644
--- a/cmd/zfs_object_agent/zettaobject/src/object_access.rs
+++ b/cmd/zfs_object_agent/zettaobject/src/object_access.rs
@@ -15,11 +15,11 @@ use rusoto_core::{ByteStream, RusotoError};
use rusoto_credential::{AutoRefreshingProvider, ChainProvider, ProfileProvider};
use rusoto_s3::*;
use std::convert::TryFrom;
+use std::error::Error;
use std::iter;
use std::sync::Arc;
use std::time::Instant;
use std::{collections::HashMap, fmt::Display};
-use std::{env, error::Error};
use tokio::{sync::watch, time::error::Elapsed};
use zettacache::get_tunable;
@@ -34,10 +34,6 @@ lazy_static! {
cache: LruCache::new(100),
reading: HashMap::new(),
});
- static ref PREFIX: String = match env::var("AWS_PREFIX") {
- Ok(val) => format!("{}/", val),
- Err(_) => "".to_string(),
- };
static ref NON_RETRYABLE_ERRORS: Vec<StatusCode> = vec![
StatusCode::BAD_REQUEST,
StatusCode::FORBIDDEN,
@@ -62,20 +58,6 @@ pub struct ObjectAccess {
credentials_profile: Option<String>,
}
-/*
- * For testing, prefix all object keys with this string. In cases where objects are returned from
- * a call like list_objects and then fetched with get, we could end up doubling the prefix. We
- * could either strip the prefix from the beginning of every object we return, or we can only
- * prefix an object if it isn't already prefixed. We do the latter here, for conciseness, but in
- * the future we may want to revisit this decision.
- */
-fn prefixed(key: &str) -> String {
- match key.starts_with(format!("{}zfs", *PREFIX).as_str()) {
- true => key.to_string(),
- false => format!("{}{}", *PREFIX, key),
- }
-}
-
#[derive(Debug)]
#[allow(clippy::upper_case_acronyms)]
pub enum OAError {
@@ -259,12 +241,12 @@ impl ObjectAccess {
self.client
}
- pub async fn get_object_impl(&self, key: &str, timeout: Option<Duration>) -> Result<Vec<u8>> {
- let msg = format!("get {}", prefixed(key));
+ pub async fn get_object_impl(&self, key: String, timeout: Option<Duration>) -> Result<Vec<u8>> {
+ let msg = format!("get {}", key);
let v = retry(&msg, timeout, || async {
let req = GetObjectRequest {
bucket: self.bucket_str.clone(),
- key: prefixed(key),
+ key: key.clone(),
..Default::default()
};
let output = self.client.get_object(req).await?;
@@ -304,8 +286,8 @@ impl ObjectAccess {
Ok(v)
}
- pub async fn get_object_uncached(&self, key: &str) -> Result<Arc<Vec<u8>>> {
- let vec = self.get_object_impl(key, None).await?;
+ pub async fn get_object_uncached(&self, key: String) -> Result<Arc<Vec<u8>>> {
+ let vec = self.get_object_impl(key.clone(), None).await?;
// Note: we *should* have the same data from S3 (in the `vec`) and in
// the cache, so this invalidation is normally not necessary. However,
// in case a bug (or undetected RAM error) resulted in incorrect cached
@@ -315,26 +297,25 @@ impl ObjectAccess {
Ok(Arc::new(vec))
}
- pub async fn get_object(&self, key: &str) -> Result<Arc<Vec<u8>>> {
+ pub async fn get_object(&self, key: String) -> Result<Arc<Vec<u8>>> {
let either = {
// need this block separate so that we can drop the mutex before the .await
let mut c = CACHE.lock().unwrap();
- let mykey = key.to_string();
- match c.cache.get(&mykey) {
+ match c.cache.get(&key) {
Some(v) => {
debug!("found {} in cache", key);
return Ok(v.clone());
}
- None => match c.reading.get(key) {
+ None => match c.reading.get(&key) {
None => {
let (tx, rx) = watch::channel::