Skip to content

Commit

Permalink
Merge pull request #38 from bgpkit/s3-subcommands
Browse files Browse the repository at this point in the history
add subcommand for s3 list
  • Loading branch information
digizeph authored Jan 26, 2024
2 parents a0eb35e + 4dd8136 commit 052d0fa
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 46 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ jobs:
- name: Build lib with all features
run: cargo build --all-features

- name: Run test for lib feature
run: cargo test --features lib --test oneio_test
- name: Run test for lib + s3 features (s3 for doc compilation tests)
run: cargo test --features s3

- name: Run format check
run: cargo fmt --check
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ lz4 = {version = "1.24", optional = true }
xz2 = {version = "0.1", optional = true }

# cli
clap = {version= "4.1", features=["derive"], optional=true}
clap = {version= "4.4", features=["derive"], optional=true}
tracing = {version="0.1.37", optional=true}

# json
Expand Down
2 changes: 1 addition & 1 deletion examples/s3_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn main() {
);

info!("list S3 files");
let res = s3_list("oneio-test", "test/", Some("/")).unwrap();
let res = s3_list("oneio-test", "test/", Some("/".to_string()), false).unwrap();
dbg!(res);

info!("read compressed s3 file by url");
Expand Down
105 changes: 83 additions & 22 deletions src/bin/oneio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::process::exit;
struct Cli {
/// file to open, remote or local
#[clap(name = "FILE")]
file: PathBuf,
file: Option<PathBuf>,

/// download the file to the current directory, similar to running `wget`
#[clap(short, long)]
Expand Down Expand Up @@ -42,45 +42,106 @@ struct Cli {

#[derive(Subcommand)]
enum Commands {
/// Upload file to S3-compatible object storage
UploadToS3 {
/// S3-related subcommands
S3 {
#[clap(subcommand)]
s3_command: S3Commands,
},
}

#[derive(Subcommand)]
enum S3Commands {
/// Upload file to S3
Upload {
/// S3 bucket name
s3_bucket: String,
#[clap()]
bucket: String,

/// S3 file path
#[clap()]
path: String,
},
/// List S3 bucket
List {
/// S3 bucket name
#[clap()]
bucket: String,

/// S3 path prefix to filter the listing by
#[clap(default_value = "")]
prefix: String,

/// delimiter for directory listing
#[clap(short, long, default_value = "/")]
delimiter: String,

/// S3 file path (starting with `/`)
s3_path: String,
/// show directories only
#[clap(short, long)]
dirs: bool,
},
}

fn main() {
let cli = Cli::parse();
let path: &str = cli.file.to_str().unwrap();
let outfile: Option<PathBuf> = cli.outfile;

if let Some(command) = cli.command {
match command {
Commands::UploadToS3 { s3_bucket, s3_path } => {
if let Err(e) = oneio::s3_env_check() {
eprintln!("missing s3 credentials");
eprintln!("{}", e);
exit(1);
Commands::S3 { s3_command } => match s3_command {
S3Commands::Upload {
bucket: s3_bucket,
path: s3_path,
} => {
if let Err(e) = oneio::s3_env_check() {
eprintln!("missing s3 credentials");
eprintln!("{}", e);
exit(1);
}
let path_string = cli.file.clone().unwrap().to_str().unwrap().to_string();
let path = path_string.as_str();
match oneio::s3_upload(s3_bucket.as_str(), s3_path.as_str(), path) {
Ok(_) => {
println!(
"file successfully uploaded to s3://{}/{}",
s3_bucket, s3_path
);
}
Err(e) => {
eprintln!("file upload error: {}", e);
}
}
return;
}
match oneio::s3_upload(s3_bucket.as_str(), s3_path.as_str(), path) {
Ok(_) => {
println!(
"file successfully uploaded to s3://{}/{}",
s3_bucket, s3_path
);
S3Commands::List {
bucket,
prefix,
delimiter,
dirs,
} => {
if let Err(e) = oneio::s3_env_check() {
eprintln!("missing s3 credentials");
eprintln!("{}", e);
exit(1);
}
Err(e) => {
eprintln!("file upload error: {}", e);
match oneio::s3_list(bucket.as_str(), prefix.as_str(), Some(delimiter), dirs) {
Ok(paths) => {
paths.iter().for_each(|p| println!("{p}"));
}
Err(e) => {
eprintln!("unable to list bucket content");
eprintln!("{}", e);
exit(1);
}
}
return;
}
}
},
}
return;
}

let path_string = cli.file.clone().unwrap().to_str().unwrap().to_string();
let path = path_string.as_str();

if cli.download {
let out_path = match outfile {
None => path.split('/').last().unwrap().to_string(),
Expand Down
34 changes: 23 additions & 11 deletions src/oneio/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,17 @@ pub fn s3_bucket(bucket: &str) -> Result<Bucket, OneIoError> {
///
/// # Example
///
/// ```
/// ```rust,no_run
/// use std::io::Read;
/// use oneio::s3_reader;
///
/// let bucket = "my_bucket";
/// let path = "path/to/file.txt";
///
/// let mut reader = s3_reader(bucket, path)?;
/// let mut reader = s3_reader(bucket, path).unwrap();
///
/// let mut buffer = Vec::new();
/// reader.read_to_end(&mut buffer)?;
/// reader.read_to_end(&mut buffer).unwrap();
///
/// assert_eq!(buffer, b"File content in S3 bucket");
/// ```
Expand All @@ -191,7 +191,7 @@ pub fn s3_reader(bucket: &str, path: &str) -> Result<Box<dyn Read + Send>, OneIo
///
/// # Examples
///
/// ```
/// ```rust,no_run
/// use oneio::s3_upload;
///
/// let result = s3_upload("my-bucket", "path/to/file.txt", "/path/to/local_file.txt");
Expand Down Expand Up @@ -327,7 +327,7 @@ pub fn s3_download(bucket: &str, s3_path: &str, file_path: &str) -> Result<(), O
///
/// ## Example
///
/// ```no_run
/// ```rust,no_run
/// use oneio::s3_stats;
///
/// let bucket = "my-bucket";
Expand Down Expand Up @@ -397,6 +397,7 @@ pub fn s3_exists(bucket: &str, path: &str) -> Result<bool, OneIoError> {
/// * `bucket` - Name of the S3 bucket.
/// * `prefix` - A prefix to filter the objects by.
/// * `delimiter` - An optional delimiter used to separate object key hierarchies.
/// * `dirs` - A flag to show only directories under the given prefix if set to true;
///   when set and no delimiter is given, `/` is used as the delimiter.
///
/// # Returns
///
Expand All @@ -410,9 +411,9 @@ pub fn s3_exists(bucket: &str, path: &str) -> Result<bool, OneIoError> {
///
/// let bucket = "my-bucket";
/// let prefix = "folder/";
/// let delimiter = Some("/");
/// let delimiter = Some("/".to_string());
///
/// let result = s3_list(bucket, prefix, delimiter);
/// let result = s3_list(bucket, prefix, delimiter, false);
/// match result {
/// Ok(objects) => {
/// println!("Found {} objects:", objects.len());
Expand All @@ -428,14 +429,25 @@ pub fn s3_exists(bucket: &str, path: &str) -> Result<bool, OneIoError> {
pub fn s3_list(
bucket: &str,
prefix: &str,
delimiter: Option<&str>,
delimiter: Option<String>,
dirs: bool,
) -> Result<Vec<String>, OneIoError> {
let fixed_delimiter = match dirs && delimiter.is_none() {
true => Some("/".to_string()),
false => delimiter,
};
let bucket = s3_bucket(bucket)?;
let mut list: Vec<ListBucketResult> =
bucket.list(prefix.to_string(), delimiter.map(|x| x.to_string()))?;
let mut list: Vec<ListBucketResult> = bucket.list(prefix.to_string(), fixed_delimiter)?;
let mut result = vec![];
for item in list.iter_mut() {
result.extend(item.contents.iter().map(|x| x.key.clone()));
match dirs {
true => result.extend(
item.common_prefixes
.iter()
.flat_map(|x| x.iter().map(|p| p.prefix.clone())),
),
false => result.extend(item.contents.iter().map(|x| x.key.clone())),
}
}
Ok(result)
}
Expand Down
19 changes: 10 additions & 9 deletions src/oneio/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,17 @@ pub fn read_json_struct<T: serde::de::DeserializeOwned>(path: &str) -> Result<T,
/// # Example
///
/// ```rust,no_run
/// use std::io::{BufRead, BufReader, Lines, Read};
/// use std::fs::File;
/// use std::io::Error as OneIoError;
/// use oneio::get_reader;
/// use std::io::BufRead;
/// use std::io::BufReader;
/// const TEST_TEXT: &str = "OneIO test file.
/// This is a test.";
///
/// pub fn read_lines(path: &str) -> Result<Lines<BufReader<Box<dyn Read + Send>>>, OneIoError> {
/// let reader = get_reader(path)?;
/// let buf_reader = BufReader::new(reader);
/// Ok(buf_reader.lines())
/// }
/// let lines = oneio::read_lines("https://spaces.bgpkit.org/oneio/test_data.txt.gz").unwrap()
/// .map(|line| line.unwrap()).collect::<Vec<String>>();
///
/// assert_eq!(lines.len(), 2);
/// assert_eq!(lines[0].as_str(), "OneIO test file.");
/// assert_eq!(lines[1].as_str(), "This is a test.");
/// ```
pub fn read_lines(path: &str) -> Result<Lines<BufReader<Box<dyn Read + Send>>>, OneIoError> {
let reader = get_reader(path)?;
Expand Down

0 comments on commit 052d0fa

Please sign in to comment.