Skip to content

Commit

Permalink
DLPX-81196 DOSE-Azure: Update zoa_test list-pools|destroy-old-pools t…
Browse files Browse the repository at this point in the history
…o work with Azure-Blob (openzfs#439)
  • Loading branch information
manoj-joseph authored May 20, 2022
1 parent dabd402 commit fb93d7c
Showing 1 changed file with 151 additions and 79 deletions.
230 changes: 151 additions & 79 deletions cmd/zfs_object_agent/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use zettaobject::object_access::ObjectAccess;
use zettaobject::object_access::ObjectAccessProtocol;
use zettaobject::object_access::S3Credentials;

const PROTOCOL: &str = "s3";
const ENDPOINT: &str = "https://s3-us-west-2.amazonaws.com";
const REGION: &str = "us-west-2";
const BUCKET_NAME: &str = "cloudburst-data-2";
Expand Down Expand Up @@ -442,58 +443,22 @@ async fn do_dump_object(
};
}

async fn get_object_access(
endpoint: &str,
region: &str,
bucket: &str,
profile: &str,
aws_access_key_id: Option<&str>,
aws_secret_access_key: Option<&str>,
) -> Arc<ObjectAccess> {
let credentials = match aws_access_key_id {
None => S3Credentials::Profile(profile.to_string()),
Some(aws_access_key_id) =>
// If access_id is specified, aws_secret_access_key should also be specified.
{
S3Credentials::Key {
aws_access_key_id: aws_access_key_id.to_string(),
aws_secret_access_key: aws_secret_access_key.unwrap().to_string(),
}
}
};
ObjectAccess::new(
ObjectAccessProtocol::S3 {
endpoint: endpoint.to_string(),
region: region.to_string(),
credentials,
},
bucket.to_string(),
false,
)
.await
.unwrap()
async fn get_object_access(cli_params: CliParams) -> Arc<ObjectAccess> {
ObjectAccess::new(cli_params.oa_protocol, cli_params.bucket, false)
.await
.unwrap()
}

async fn do_blob(bucket: String, profile: String) -> Result<(), Box<dyn Error>> {
async fn do_blob(cli_params: CliParams) -> Result<(), Box<dyn Error>> {
let key = "blob2.txt".to_string();
let bucket_access = BucketAccess::new(ObjectAccessProtocol::Blob {
credentials: BlobCredentials::Profile(profile.clone()),
})
.await?;
let bucket_access = BucketAccess::new(cli_params.oa_protocol.clone()).await?;
let buckets = bucket_access.list_buckets().await;
println!("List containers {:?}", buckets);
if !buckets.contains(&bucket) {
return Err(format!("Bucket {} not found.", bucket).into());
if !buckets.contains(&cli_params.bucket) {
return Err(format!("Bucket {} not found.", cli_params.bucket).into());
}

let object_access = ObjectAccess::new(
ObjectAccessProtocol::Blob {
credentials: BlobCredentials::Profile(profile.clone()),
},
bucket,
false,
)
.await?;
let object_access = ObjectAccess::new(cli_params.oa_protocol, cli_params.bucket, false).await?;

let content = "I want to go to azure".as_bytes().to_vec();
object_access
Expand Down Expand Up @@ -525,16 +490,11 @@ async fn do_blob(bucket: String, profile: String) -> Result<(), Box<dyn Error>>
Ok(())
}

async fn do_blob_loop(bucket: String, seconds: u64) -> Result<(), Box<dyn Error>> {
async fn do_blob_loop(
object_access: &Arc<ObjectAccess>,
seconds: u64,
) -> Result<(), Box<dyn Error>> {
let key = "blob_loop.txt".to_string();
let object_access = ObjectAccess::new(
ObjectAccessProtocol::Blob {
credentials: BlobCredentials::Automatic,
},
bucket,
false,
)
.await?;

let start = Instant::now();
let mut count = 1;
Expand All @@ -556,31 +516,42 @@ async fn do_blob_loop(bucket: String, seconds: u64) -> Result<(), Box<dyn Error>
Ok(())
}

#[derive(Parser)]
#[derive(Clone, Parser)]
//#[clap(long_about = None)]
#[clap(version=GIT_VERSION)]
#[clap(about = "ZFS Object Agent test")]
#[clap(propagate_version = true)]
struct Cli {
#[clap(short, long, help = "S3 endpoint", default_value = ENDPOINT)]
#[clap(short, long, help = "protocol", default_value = PROTOCOL, arg_enum)]
protocol: Protocol,
#[clap(short, long, help = "endpoint", default_value = ENDPOINT)]
endpoint: String,
#[clap(short, long, help = "S3 region", default_value = REGION)]
#[clap(short, long, help = "region", default_value = REGION)]
region: String,
#[clap(short, long, help = "S3 bucket", default_value = BUCKET_NAME)]
#[clap(short, long, help = "bucket", default_value = BUCKET_NAME)]
bucket: String,
#[clap(short, long, help = "credentials profile", default_value = "default")]
profile: String,
#[clap(short = 'c', long, help = "credentials profile")]
credentials_profile: Option<String>,

#[clap(short = 'i', long, requires = "aws-secret-access-key")]
aws_access_key_id: Option<String>,
#[clap(short = 's', long, requires = "aws-access-key-id")]
aws_secret_access_key: Option<String>,

#[clap(short = 'a', long)]
azure_account: Option<String>,
#[clap(short = 'k', long, conflicts_with = "managed-identity")]
azure_key: Option<String>,
#[clap(short = 'm', long)]
managed_identity: bool,

#[clap(short, long, parse(from_occurrences))]
verbose: usize,
#[clap(subcommand)]
command: Commands,
}

#[derive(Subcommand)]
#[derive(Clone, Subcommand)]
enum Commands {
S3Rusoto,
Blob,
Expand Down Expand Up @@ -613,40 +584,141 @@ enum Commands {
},
}

#[derive(Clone, clap::ArgEnum)]
enum Protocol {
S3,
Blob,
}

#[derive(Clone)]
struct CliParams {
oa_protocol: ObjectAccessProtocol,
bucket: String,
}

impl CliParams {
/// Perform extra validation that are not easliy expressed in clap.
fn validate_params(cli: &Cli) {
let valid = match cli.protocol {
Protocol::S3 => {
let mut valid: bool = true;
if cli.azure_account.is_some() {
writeln_stderr!("Error: azure_account is valid only for blob protocol.");
valid = false;
}
if cli.azure_key.is_some() {
writeln_stderr!("Error: azure_key is valid only for blob protocol.");
valid = false;
}
if cli.managed_identity {
writeln_stderr!("Error: managed_identity is valid only for blob protocol.");
valid = false;
}
valid
}
Protocol::Blob => {
let mut valid: bool = true;
if cli.aws_access_key_id.is_some() {
writeln_stderr!("Error: aws_access_key_id is valid only for s3 protocol.");
valid = false;
}
if cli.aws_secret_access_key.is_some() {
writeln_stderr!("Error: aws_secret_access_key is valid only for s3 protocol.");
valid = false;
}

valid
}
};
if !valid {
std::process::exit(1);
}
}
}

impl From<Cli> for CliParams {
fn from(cli: Cli) -> Self {
CliParams::validate_params(&cli);

CliParams {
oa_protocol: match cli.protocol {
Protocol::S3 => {
let credentials = match cli.aws_access_key_id {
None => match cli.credentials_profile {
Some(profile) => S3Credentials::Profile(profile),
None => S3Credentials::Automatic,
},
Some(aws_access_key_id) =>
// If access_id is specified, aws_secret_access_key should also be
// specified.
{
S3Credentials::Key {
aws_access_key_id,
aws_secret_access_key: cli.aws_secret_access_key.unwrap(),
}
}
};

ObjectAccessProtocol::S3 {
endpoint: cli.endpoint,
region: cli.region,
credentials,
}
}
Protocol::Blob => ObjectAccessProtocol::Blob {
credentials: if cli.managed_identity {
BlobCredentials::ManagedCredentials {
azure_account: cli.azure_account.unwrap(),
}
} else {
match cli.azure_key {
Some(azure_key) => BlobCredentials::Key {
azure_account: cli.azure_account.unwrap(),
azure_key,
},
None => match cli.credentials_profile {
Some(profile) => BlobCredentials::Profile(profile),
None => BlobCredentials::Automatic,
},
}
},
},
},
bucket: cli.bucket,
}
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let cli = Cli::parse();

if cli.verbose > 0 {
println!(
"endpoint: {}, region: {}, bucket: {} profile: {} access_id: {:?}, secret_key: {:?}",
"endpoint: {}, region: {}, bucket: {} profile: {:?} access_id: {:?}, secret_key: {:?} \
azure_key {:?} azure_account {:?} managed_identity {}",
cli.endpoint,
cli.region,
cli.bucket,
cli.profile,
cli.credentials_profile,
cli.aws_access_key_id,
cli.aws_secret_access_key
cli.aws_secret_access_key,
cli.azure_key,
cli.azure_account,
cli.managed_identity
);
}

setup_logging(cli.verbose.try_into().unwrap(), None, None, false);

let object_access = get_object_access(
&cli.endpoint,
&cli.region,
&cli.bucket,
&cli.profile,
cli.aws_access_key_id.as_deref(),
cli.aws_secret_access_key.as_deref(),
)
.await;
if cli.verbose > 0 {
setup_logging(cli.verbose as u64, None, None, false);
}

setup_logging(cli.verbose as u64, None, None, false);
let cli_params = CliParams::from(cli.clone());
let object_access = get_object_access(cli_params.clone()).await;

match cli.command {
Commands::S3Rusoto => do_s3_rusoto().await?,
Commands::Blob => do_blob(cli.bucket, cli.profile).await?,
Commands::BlobLoop { seconds } => do_blob_loop(cli.bucket, seconds).await?,
Commands::Blob => do_blob(cli_params).await?,
Commands::BlobLoop { seconds } => do_blob_loop(&object_access, seconds).await?,
Commands::Create => do_create().await?,
Commands::Write => do_write().await?,
Commands::Read => do_read().await?,
Expand Down

0 comments on commit fb93d7c

Please sign in to comment.