Skip to content

Commit

Permalink
test_connectivity should not crash (openzfs#529)
Browse files Browse the repository at this point in the history
  • Loading branch information
manoj-joseph authored Nov 2, 2021
1 parent cea597e commit 97f5149
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 75 deletions.
1 change: 1 addition & 0 deletions cmd/zfs_object_agent/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/zfs_object_agent/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
async-recursion = "0.3.2"
chrono = "0.4.19"
clap = "2"
Expand Down
39 changes: 1 addition & 38 deletions cmd/zfs_object_agent/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::time::{Duration, Instant};
use tokio::io::AsyncReadExt;
use zettacache::base_types::*;
use zettaobject::base_types::*;
use zettaobject::ObjectAccess;
use zettaobject::Pool;
use zettaobject::{ObjectAccess, ObjectAccessStatType};
mod client;

const ENDPOINT: &str = "https://s3-us-west-2.amazonaws.com";
Expand Down Expand Up @@ -411,39 +411,6 @@ fn get_object_access(
}
}

// Test by writing and deleting an object.
async fn do_test_connectivity(object_access: &ObjectAccess) {
let num = thread_rng().gen::<u64>();
let file = format!("test/test_connectivity_{}", num);
let content = "test connectivity to S3".as_bytes().to_vec();

object_access
.put_object(
file.clone(),
content.into(),
ObjectAccessStatType::MetadataPut,
)
.await;
object_access.delete_object(file).await;
}

async fn test_connectivity(object_access: &ObjectAccess) -> Result<(), Box<dyn Error>> {
std::process::exit(
match tokio::time::timeout(Duration::from_secs(30), do_test_connectivity(object_access))
.await
{
Err(_) => {
eprintln!("Connectivity test failed.");
-1
}
Ok(_) => {
println!("Connectivity test succeeded.");
0
}
},
);
}

#[tokio::main]
async fn main() {
let matches = clap::App::new("zoa_test")
Expand Down Expand Up @@ -518,7 +485,6 @@ async fn main() {
.help("number of days"),
),
)
.subcommand(SubCommand::with_name("test_connectivity").about("test connectivity"))
.get_matches();

// Command line parameters
Expand Down Expand Up @@ -585,9 +551,6 @@ async fn main() {

do_destroy_old_pools(&object_access, min_age).await.unwrap();
}
("test_connectivity", Some(_matches)) => {
test_connectivity(&object_access).await.unwrap();
}
_ => {
matches.usage();
}
Expand Down
150 changes: 115 additions & 35 deletions cmd/zfs_object_agent/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use clap::Arg;
use clap::{Arg, SubCommand};
use git_version::git_version;
use log::*;
use zettaobject::test_connectivity;

static GIT_VERSION: &str = git_version!(
fallback = match option_env!("CARGO_ZOA_GITREV") {
Expand Down Expand Up @@ -62,49 +63,128 @@ fn main() {
.conflicts_with("verbosity")
.takes_value(true),
)
.subcommand(
SubCommand::with_name("test_connectivity")
.about("test connectivity")
.arg(
Arg::with_name("endpoint")
.short("e")
.long("endpoint")
.help("S3 endpoint")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("region")
.short("r")
.long("region")
.help("S3 region")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("bucket")
.short("b")
.long("bucket")
.help("S3 bucket")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("aws_access_key_id")
.short("i")
.long("aws_access_key_id")
.takes_value(true)
.requires("aws_secret_access_key")
.required_unless("aws_instance_profile")
.conflicts_with("aws_instance_profile")
.help("AWS access key id"),
)
.arg(
Arg::with_name("aws_secret_access_key")
.short("s")
.long("aws_secret_access_key")
.takes_value(true)
.requires("aws_access_key_id")
.required_unless("aws_instance_profile")
.conflicts_with("aws_instance_profile")
.help("AWS secret access key"),
)
.arg(
Arg::with_name("aws_instance_profile")
.long("aws_instance_profile")
.takes_value(false)
.help("Use AWS instance profile"),
),
)
.get_matches();

let socket_dir = matches.value_of("socket-dir").unwrap();
let cache_path = matches.value_of("cache-file");
match matches.subcommand() {
("test_connectivity", Some(cmd_options)) => {
let endpoint = cmd_options.value_of("endpoint").unwrap().to_string();
let region = cmd_options.value_of("region").unwrap().to_string();
let bucket = cmd_options.value_of("bucket").unwrap().to_string();
let aws_access_key_id = cmd_options
.value_of("aws_access_key_id")
.map(str::to_string);
let aws_secret_access_key = cmd_options
.value_of("aws_secret_access_key")
.map(str::to_string);
let aws_instance_profile = cmd_options.is_present("aws_instance_profile");

zettaobject::init::setup_logging(
matches.occurrences_of("verbosity"),
matches.value_of("output-file"),
matches.value_of("log-config"),
);
test_connectivity::test_connectivity(
endpoint,
region,
bucket,
aws_access_key_id,
aws_secret_access_key,
aws_instance_profile,
);
}
_ => {
let socket_dir = matches.value_of("socket-dir").unwrap();
let cache_path = matches.value_of("cache-file");

if let Some(file_name) = matches.value_of("config-file") {
util::read_tunable_config(file_name);
}
zettaobject::init::setup_logging(
matches.occurrences_of("verbosity"),
matches.value_of("output-file"),
matches.value_of("log-config"),
);

error!(
"Starting ZFS Object Agent ({}). Local timezone is {}",
GIT_VERSION,
chrono::Local::now().format("%Z (%:z)")
);
if let Some(file_name) = matches.value_of("config-file") {
util::read_tunable_config(file_name);
}

// error!() should be used when an invalid state is encountered; the related
// operation will fail and the program may exit. E.g. an invalid request
// was received from the client (kernel).
error!("logging level ERROR enabled");
error!(
"Starting ZFS Object Agent ({}). Local timezone is {}",
GIT_VERSION,
chrono::Local::now().format("%Z (%:z)")
);

// warn!() should be used when something unexpected has happened, but it can
// be recovered from.
warn!("logging level WARN enabled");
// error!() should be used when an invalid state is encountered; the related
// operation will fail and the program may exit. E.g. an invalid request
// was received from the client (kernel).
error!("logging level ERROR enabled");

// info!() should be used for very high level operations which are expected
// to happen infrequently (no more than once per minute in typical
// operation). e.g. opening/closing a pool, long-lived background tasks,
// things that might be in `zpool history -i`.
info!("logging level INFO enabled");
// warn!() should be used when something unexpected has happened, but it can
// be recovered from.
warn!("logging level WARN enabled");

// debug!() can be used for all but the most frequent operations.
// e.g. not every single read/write/free operation, but perhaps for every
// call to S3.
debug!("logging level DEBUG enabled");
// info!() should be used for very high level operations which are expected
// to happen infrequently (no more than once per minute in typical
// operation). e.g. opening/closing a pool, long-lived background tasks,
// things that might be in `zpool history -i`.
info!("logging level INFO enabled");

// trace!() can be used indiscriminately.
trace!("logging level TRACE enabled");
// debug!() can be used for all but the most frequent operations.
// e.g. not every single read/write/free operation, but perhaps for every
// call to S3.
debug!("logging level DEBUG enabled");

zettaobject::init::start(socket_dir, cache_path);
// trace!() can be used indiscriminately.
trace!("logging level TRACE enabled");

zettaobject::init::start(socket_dir, cache_path);
}
}
}
3 changes: 2 additions & 1 deletion cmd/zfs_object_agent/zettaobject/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ mod public_connection;
mod root_connection;
mod server;

pub use object_access::{ObjectAccess, ObjectAccessStatType, StatMapValue};
pub use object_access::{OAError, ObjectAccess, ObjectAccessStatType, StatMapValue};
pub mod test_connectivity;
pub use pool::Pool;
16 changes: 15 additions & 1 deletion cmd/zfs_object_agent/zettaobject/src/object_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use log::*;
use lru::LruCache;
use rand::prelude::*;
use rusoto_core::{ByteStream, RusotoError};
use rusoto_credential::{AutoRefreshingProvider, ChainProvider, ProfileProvider};
use rusoto_credential::{
AutoRefreshingProvider, ChainProvider, InstanceMetadataProvider, ProfileProvider,
};
use rusoto_s3::{
Delete, DeleteObjectsRequest, GetObjectRequest, HeadObjectOutput, HeadObjectRequest,
ListObjectsV2Request, ObjectIdentifier, PutObjectError, PutObjectOutput, PutObjectRequest,
Expand Down Expand Up @@ -348,6 +350,18 @@ impl ObjectAccess {
rusoto_s3::S3Client::new_with(http_client, creds, region)
}

/// Get client using the instance metadata provider and ignoring all other sources of credentials.
pub fn get_client_with_instance_profile(endpoint: &str, region_str: &str) -> S3Client {
let http_client = rusoto_core::HttpClient::new().unwrap();
let creds = InstanceMetadataProvider::new();
let region = ObjectAccess::get_custom_region(endpoint, region_str);
rusoto_s3::S3Client::new_with(http_client, creds, region)
}

/// Get client by checking in order, the following sources for credentials.
/// 1. Environment variables
/// 2. AWS credentials file
/// 3. IAM instance profile.
pub fn get_client(
endpoint: &str,
region_str: &str,
Expand Down
64 changes: 64 additions & 0 deletions cmd/zfs_object_agent/zettaobject/src/test_connectivity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::{ObjectAccess, ObjectAccessStatType};
use rand::Rng;
use std::time::Duration;

// Test by writing and deleting an object.
async fn do_test_connectivity(object_access: &ObjectAccess) -> anyhow::Result<()> {
let num: u64 = rand::thread_rng().gen();
let file = format!("test/test_connectivity_{}", num);
let content = "test connectivity to S3".as_bytes().to_vec();

object_access
.put_object_timed(
file.clone(),
content.into(),
ObjectAccessStatType::MetadataPut,
Some(Duration::from_secs(30)),
)
.await?;

object_access.delete_object(file).await;

Ok(())
}

pub fn test_connectivity(
endpoint: String,
region: String,
bucket: String,
aws_access_key_id: Option<String>,
aws_secret_access_key: Option<String>,
aws_instance_profile: bool,
) {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.thread_name("zoa_test_connectivity")
.build()
.unwrap()
.block_on(async move {
let client = if aws_instance_profile {
ObjectAccess::get_client_with_instance_profile(&endpoint, &region)
} else {
// Both aws_access_key_id and aws_secret_access_key should also be specified.
ObjectAccess::get_client_with_creds(
&endpoint,
&region,
aws_access_key_id.unwrap().as_str(),
aws_secret_access_key.unwrap().as_str(),
)
};
let object_access =
ObjectAccess::from_client(client, &bucket, false, &endpoint, &region);

std::process::exit(match do_test_connectivity(&object_access).await {
Err(_) => {
eprintln!("Connectivity test failed.");
1
}
Ok(_) => {
println!("Connectivity test succeeded.");
0
}
});
});
}

0 comments on commit 97f5149

Please sign in to comment.