From 69f110599c6a28e3fceafe0f5392b4b72ce295fc Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 29 Dec 2024 12:53:57 -0800 Subject: [PATCH 1/4] Refactor filters and time logic into dedicated modules - Introduced `filters` and `time` modules for cleaner separation - Moved `ParseFilters` and `SearchFilters` implementations to `filters` - Centralized time-related logic into the `time.rs` module - Replaced redundant `parser_with_filters` function with traits in `filters` - Simplified dependencies in `monocle.rs` by leveraging modularized functions --- src/bin/monocle.rs | 241 +----------------------------------------- src/filters/mod.rs | 33 ++++++ src/filters/parse.rs | 189 +++++++++++++++++++++++++++++++++ src/filters/search.rs | 56 ++++++++++ src/lib.rs | 179 +------------------------------ src/time.rs | 118 +++++++++++++++++++++ 6 files changed, 405 insertions(+), 411 deletions(-) create mode 100644 src/filters/mod.rs create mode 100644 src/filters/parse.rs create mode 100644 src/filters/search.rs create mode 100644 src/time.rs diff --git a/src/bin/monocle.rs b/src/bin/monocle.rs index 9bd7ed5..41aea3a 100644 --- a/src/bin/monocle.rs +++ b/src/bin/monocle.rs @@ -5,10 +5,9 @@ use std::path::PathBuf; use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread; -use anyhow::{anyhow, Result}; use bgpkit_parser::encoder::MrtUpdatesEncoder; use bgpkit_parser::BgpElem; -use clap::{Args, Parser, Subcommand}; +use clap::{Parser, Subcommand}; use ipnet::IpNet; use json_to_table::json_to_table; use monocle::*; @@ -19,10 +18,6 @@ use tabled::settings::{Merge, Style}; use tabled::{Table, Tabled}; use tracing::{info, Level}; -trait Validate { - fn validate(&self) -> Result<()>; -} - #[derive(Parser)] #[clap(author, version, about, long_about = None)] #[clap(propagate_version = true)] @@ -39,188 +34,6 @@ struct Cli { command: Commands, } -#[derive(Args, Debug)] -struct ParseFilters { - /// Filter by origin AS Number - #[clap(short = 'o', long)] - origin_asn: Option, - - /// Filter by network prefix - #[clap(short = 'p', long)] - prefix: Option, - - /// Include super-prefix when filtering - #[clap(short = 's', long)] - include_super: bool, - - /// Include sub-prefix when filtering - #[clap(short = 'S', long)] - include_sub: bool, - - /// Filter by peer IP address - #[clap(short = 'j', long)] - peer_ip: Vec, - - /// Filter by peer ASN - #[clap(short = 'J', long)] - peer_asn: Option, - - /// Filter by elem type: announce (a) or withdraw (w) - #[clap(short = 'm', long)] - elem_type: Option, - - /// Filter by start unix timestamp inclusive - #[clap(short = 't', long)] - start_ts: Option, - - /// Filter by end unix timestamp inclusive - #[clap(short = 'T', long)] - end_ts: Option, - - /// Filter by AS path regex string - #[clap(short = 'a', long)] - as_path: Option, -} - -#[derive(Args, Debug)] -struct SearchFilters { - /// Filter by start unix timestamp inclusive - #[clap(short = 't', long)] - start_ts: Option, - - /// Filter by end unix timestamp inclusive - #[clap(short = 'T', long)] - end_ts: Option, - - #[clap(short = 'd', long)] - duration: Option, - - /// Filter by collector, e.g. rrc00 or route-views2 - #[clap(short = 'c', long)] - collector: Option, - - /// Filter by route collection project, i.e. riperis or routeviews - #[clap(short = 'P', long)] - project: Option, - - /// Filter by origin AS Number - #[clap(short = 'o', long)] - origin_asn: Option, - - /// Filter by network prefix - #[clap(short = 'p', long)] - prefix: Option, - - /// Include super-prefix when filtering - #[clap(short = 's', long)] - include_super: bool, - - /// Include sub-prefix when filtering - #[clap(short = 'S', long)] - include_sub: bool, - - /// Filter by peer IP address - #[clap(short = 'j', long)] - peer_ip: Vec, - - /// Filter by peer ASN - #[clap(short = 'J', long)] - peer_asn: Option, - - /// Filter by elem type: announce (a) or withdraw (w) - #[clap(short = 'm', long)] - elem_type: Option, - - /// Filter by AS path regex string - #[clap(short = 'a', long)] - as_path: Option, -} - -impl Validate for ParseFilters { - fn validate(&self) -> Result<()> { - if let Some(ts) = &self.start_ts { - if string_to_time(ts.as_str()).is_err() { - return Err(anyhow!("start-ts is not a valid time string: {}", ts)); - } - } - if let Some(ts) = &self.end_ts { - if string_to_time(ts.as_str()).is_err() { - return Err(anyhow!("end-ts is not a valid time string: {}", ts)); - } - } - Ok(()) - } -} - -impl SearchFilters { - fn parse_start_end_strings(&self) -> Result<(i64, i64)> { - let mut start_ts = None; - let mut end_ts = None; - if let Some(ts) = &self.start_ts { - match string_to_time(ts.as_str()) { - Ok(t) => start_ts = Some(t), - Err(_) => return Err(anyhow!("start-ts is not a valid time string: {}", ts)), - } - } - if let Some(ts) = &self.end_ts { - match string_to_time(ts.as_str()) { - Ok(t) => end_ts = Some(t), - Err(_) => return Err(anyhow!("end-ts is not a valid time string: {}", ts)), - } - } - - match (&self.start_ts, &self.end_ts, &self.duration) { - (Some(_), Some(_), Some(_)) => { - return Err(anyhow!( - "cannot specify start_ts, end_ts, and duration all at the same time" - )) - } - (Some(_), None, None) | (None, Some(_), None) => { - // only one start_ts or end_ts specified - return Err(anyhow!( - "must specify two from: start_ts, end_ts and duration" - )); - } - (None, None, _) => { - return Err(anyhow!( - "must specify two from: start_ts, end_ts and duration" - )); - } - _ => {} - } - if let Some(duration) = &self.duration { - // this case is duration + start_ts OR end_ts - let duration = match humantime::parse_duration(duration) { - Ok(d) => d, - Err(_) => { - return Err(anyhow!( - "duration is not a valid time duration string: {}", - duration - )) - } - }; - - if let Some(ts) = start_ts { - return Ok((ts.timestamp(), (ts + duration).timestamp())); - } - if let Some(ts) = end_ts { - return Ok(((ts - duration).timestamp(), ts.timestamp())); - } - } else { - // this case is start_ts AND end_ts - return Ok((start_ts.unwrap().timestamp(), end_ts.unwrap().timestamp())); - } - - Err(anyhow!("unexpected time-string parsing result")) - } -} -impl Validate for SearchFilters { - fn validate(&self) -> Result<()> { - let _ = self.parse_start_end_strings()?; - Ok(()) - } -} - #[derive(Subcommand)] enum Commands { /// Parse individual MRT files given a file path, local or remote. @@ -467,20 +280,7 @@ fn main() { } let file_path = file_path.to_str().unwrap(); - let parser = parser_with_filters( - file_path, - &filters.origin_asn, - &filters.prefix, - &filters.include_super, - &filters.include_sub, - &filters.peer_ip, - &filters.peer_asn, - &filters.elem_type, - &filters.start_ts.clone(), - &filters.end_ts.clone(), - &filters.as_path, - ) - .unwrap(); + let parser = filters.to_parser(file_path).unwrap(); let mut stdout = std::io::stdout(); @@ -542,24 +342,7 @@ fn main() { let show_progress = sqlite_db.is_some() || mrt_path.is_some(); // it's fine to unwrap as the filters.validate() function has already checked for issues - let (ts_start, ts_end) = filters.parse_start_end_strings().unwrap(); - - let mut broker = bgpkit_broker::BgpkitBroker::new() - .ts_start(ts_start) - .ts_end(ts_end) - .data_type("update") - .page_size(1000); - - if let Some(project) = filters.project { - broker = broker.project(project.as_str()); - } - if let Some(collector) = filters.collector { - broker = broker.collector_id(collector.as_str()); - } - - let items = broker - .query() - .expect("broker query error: please check filters are valid"); + let items = filters.to_broker_items().unwrap(); let total_items = items.len(); @@ -679,21 +462,7 @@ fn main() { let url = item.url; let collector = item.collector_id; info!("start parsing {}", url.as_str()); - let parser = parser_with_filters( - url.as_str(), - &filters.origin_asn, - &filters.prefix, - &filters.include_super, - &filters.include_sub, - &filters.peer_ip, - &filters.peer_asn, - &filters.elem_type, - // use the parsed new start and end ts - &Some(ts_start.to_string()), - &Some(ts_end.to_string()), - &filters.as_path, - ) - .unwrap(); + let parser = filters.to_parser(url.as_str()).unwrap(); let mut elems_count = 0; for elem in parser { @@ -726,7 +495,7 @@ fn main() { let as2org = As2org::new(&Some(format!("{data_dir}/monocle-data.sqlite3"))).unwrap(); if update { - // if update flag is set, clear existing as2org data and re-download later + // if the update flag is set, clear existing as2org data and re-download later as2org.clear_db(); } diff --git a/src/filters/mod.rs b/src/filters/mod.rs new file mode 100644 index 0000000..d7267d6 --- /dev/null +++ b/src/filters/mod.rs @@ -0,0 +1,33 @@ +pub use parse::ParseFilters; +pub use search::SearchFilters; + +use bgpkit_parser::BgpkitParser; +use clap::ValueEnum; +use serde::Serialize; +use std::fmt::Display; +use std::io::Read; + +mod parse; +mod search; + +#[derive(ValueEnum, Clone, Debug, Serialize)] +pub enum ElemTypeEnum { + /// BGP announcement + A, + /// BGP withdrawal + W, +} + +impl Display for ElemTypeEnum { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + ElemTypeEnum::A => "announcement", + ElemTypeEnum::W => "withdrawal", + }) + } +} + +pub trait MrtParserFilters { + fn validate(&self) -> anyhow::Result<()>; + fn to_parser(&self, path: &str) -> anyhow::Result>>; +} diff --git a/src/filters/parse.rs b/src/filters/parse.rs new file mode 100644 index 0000000..6edf40f --- /dev/null +++ b/src/filters/parse.rs @@ -0,0 +1,189 @@ +use crate::filters::MrtParserFilters; +use crate::time::string_to_time; +use crate::ElemTypeEnum; +use anyhow::anyhow; +use anyhow::Result; +use bgpkit_parser::BgpkitParser; +use clap::Args; +use itertools::Itertools; +use std::io::Read; +use std::net::IpAddr; + +#[derive(Args, Debug, Clone)] +pub struct ParseFilters { + /// Filter by origin AS Number + #[clap(short = 'o', long)] + pub origin_asn: Option, + + /// Filter by network prefix + #[clap(short = 'p', long)] + pub prefix: Option, + + /// Include super-prefix when filtering + #[clap(short = 's', long)] + pub include_super: bool, + + /// Include sub-prefix when filtering + #[clap(short = 'S', long)] + pub include_sub: bool, + + /// Filter by peer IP address + #[clap(short = 'j', long)] + pub peer_ip: Vec, + + /// Filter by peer ASN + #[clap(short = 'J', long)] + pub peer_asn: Option, + + /// Filter by elem type: announce (a) or withdraw (w) + #[clap(short = 'm', long, value_enum)] + pub elem_type: Option, + + /// Filter by start unix timestamp inclusive + #[clap(short = 't', long)] + pub start_ts: Option, + + /// Filter by end unix timestamp inclusive + #[clap(short = 'T', long)] + pub end_ts: Option, + + /// Duration from the start-ts or end-ts, e.g. 1h + #[clap(short = 'd', long)] + pub duration: Option, + + /// Filter by AS path regex string + #[clap(short = 'a', long)] + pub as_path: Option, +} + +impl ParseFilters { + pub fn parse_start_end_strings(&self) -> Result<(String, String)> { + let mut start_ts = None; + let mut end_ts = None; + if let Some(ts) = &self.start_ts { + match string_to_time(ts.as_str()) { + Ok(t) => start_ts = Some(t), + Err(_) => return Err(anyhow!("start-ts is not a valid time string: {}", ts)), + } + } + if let Some(ts) = &self.end_ts { + match string_to_time(ts.as_str()) { + Ok(t) => end_ts = Some(t), + Err(_) => return Err(anyhow!("end-ts is not a valid time string: {}", ts)), + } + } + + match (&self.start_ts, &self.end_ts, &self.duration) { + (Some(_), Some(_), Some(_)) => { + return Err(anyhow!( + "cannot specify start_ts, end_ts, and duration all at the same time" + )) + } + (Some(_), None, None) | (None, Some(_), None) => { + // only one start_ts or end_ts specified + return Err(anyhow!( + "must specify two from: start_ts, end_ts and duration" + )); + } + (None, None, _) => { + return Err(anyhow!( + "must specify two from: start_ts, end_ts and duration" + )); + } + _ => {} + } + if let Some(duration) = &self.duration { + // this case is duration + start_ts OR end_ts + let duration = match humantime::parse_duration(duration) { + Ok(d) => d, + Err(_) => { + return Err(anyhow!( + "duration is not a valid time duration string: {}", + duration + )) + } + }; + + if let Some(ts) = start_ts { + return Ok((ts.to_rfc3339(), (ts + duration).to_rfc3339())); + } + if let Some(ts) = end_ts { + return Ok(((ts - duration).to_rfc3339(), ts.to_rfc3339())); + } + } else { + // this case is start_ts AND end_ts + return Ok((start_ts.unwrap().to_rfc3339(), end_ts.unwrap().to_rfc3339())); + } + + Err(anyhow!("unexpected time-string parsing result")) + } +} + +impl MrtParserFilters for ParseFilters { + fn validate(&self) -> Result<()> { + if let Some(ts) = &self.start_ts { + if string_to_time(ts.as_str()).is_err() { + return Err(anyhow!("start-ts is not a valid time string: {}", ts)); + } + } + if let Some(ts) = &self.end_ts { + if string_to_time(ts.as_str()).is_err() { + return Err(anyhow!("end-ts is not a valid time string: {}", ts)); + } + } + Ok(()) + } + + fn to_parser(&self, file_path: &str) -> Result>> { + let mut parser = BgpkitParser::new(file_path)?.disable_warnings(); + + if let Some(v) = &self.as_path { + parser = parser.add_filter("as_path", v.to_string().as_str())?; + } + if let Some(v) = &self.origin_asn { + parser = parser.add_filter("origin_asn", v.to_string().as_str())?; + } + if let Some(v) = &self.prefix { + let filter_type = match (self.include_super, self.include_sub) { + (false, false) => "prefix", + (true, false) => "prefix_super", + (false, true) => "prefix_sub", + (true, true) => "prefix_super_sub", + }; + parser = parser.add_filter(filter_type, v.as_str())?; + } + if !self.peer_ip.is_empty() { + let v = self.peer_ip.iter().map(|p| p.to_string()).join(","); + parser = parser.add_filter("peer_ips", v.as_str())?; + } + if let Some(v) = &self.peer_asn { + parser = parser.add_filter("peer_asn", v.to_string().as_str())?; + } + if let Some(v) = &self.elem_type { + parser = parser.add_filter("type", v.to_string().as_str())?; + } + + match self.parse_start_end_strings() { + Ok((start_ts, end_ts)) => { + // in case we have full start_ts and end_ts, like in `monocle search` command input, + // we will use the parsed start_ts and end_ts. + parser = parser.add_filter("start_ts", start_ts.to_string().as_str())?; + parser = parser.add_filter("end_ts", end_ts.to_string().as_str())?; + } + Err(_) => { + // we could also likely not have any time filters, in this case, add filters + // as we see them, and no modification is needed. + if let Some(v) = &self.start_ts { + let ts = string_to_time(v.as_str())?.timestamp(); + parser = parser.add_filter("start_ts", ts.to_string().as_str())?; + } + if let Some(v) = &self.end_ts { + let ts = string_to_time(v.as_str())?.timestamp(); + parser = parser.add_filter("end_ts", ts.to_string().as_str())?; + } + } + } + + Ok(parser) + } +} diff --git a/src/filters/search.rs b/src/filters/search.rs new file mode 100644 index 0000000..e7f5430 --- /dev/null +++ b/src/filters/search.rs @@ -0,0 +1,56 @@ +use crate::filters::parse::ParseFilters; +use crate::filters::MrtParserFilters; +use anyhow::Result; +use bgpkit_broker::BrokerItem; +use bgpkit_parser::BgpkitParser; +use clap::Args; +use std::io::Read; + +#[derive(Args, Debug, Clone)] +pub struct SearchFilters { + #[clap(flatten)] + pub parse_filters: ParseFilters, + + /// Filter by collector, e.g., rrc00 or route-views2 + #[clap(short = 'c', long)] + pub collector: Option, + + /// Filter by route collection project, i.e., riperis or routeviews + #[clap(short = 'P', long)] + pub project: Option, +} + +impl SearchFilters { + pub fn to_broker_items(&self) -> Result> { + // it's fine to unwrap as the filters.validate() function has already checked for issues + let (ts_start, ts_end) = self.parse_filters.parse_start_end_strings()?; + + let mut broker = bgpkit_broker::BgpkitBroker::new() + .ts_start(ts_start) + .ts_end(ts_end) + .data_type("update") + .page_size(1000); + + if let Some(project) = &self.project { + broker = broker.project(project.as_str()); + } + if let Some(collector) = &self.collector { + broker = broker.collector_id(collector.as_str()); + } + + broker + .query() + .map_err(|_| anyhow::anyhow!("broker query error: please check filters are valid")) + } +} + +impl MrtParserFilters for SearchFilters { + fn validate(&self) -> Result<()> { + let _ = self.parse_filters.parse_start_end_strings()?; + Ok(()) + } + + fn to_parser(&self, file_path: &str) -> Result>> { + self.parse_filters.to_parser(file_path) + } +} diff --git a/src/lib.rs b/src/lib.rs index dc63f5e..e1eff2b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,182 +1,11 @@ mod config; mod database; mod datasets; - -use anyhow::{anyhow, Result}; -use bgpkit_parser::BgpkitParser; -use chrono::{DateTime, TimeZone, Utc}; -use chrono_humanize::HumanTime; -use itertools::Itertools; -use std::io::Read; -use std::net::IpAddr; -use tabled::settings::Style; -use tabled::{Table, Tabled}; +mod filters; +mod time; pub use crate::config::MonocleConfig; pub use crate::database::*; pub use crate::datasets::*; - -#[allow(clippy::too_many_arguments)] -pub fn parser_with_filters( - file_path: &str, - origin_asn: &Option, - prefix: &Option, - include_super: &bool, - include_sub: &bool, - peer_ip: &[IpAddr], - peer_asn: &Option, - elem_type: &Option, - start_ts: &Option, - end_ts: &Option, - as_path: &Option, -) -> Result>> { - let mut parser = BgpkitParser::new(file_path).unwrap().disable_warnings(); - - if let Some(v) = as_path { - parser = parser.add_filter("as_path", v.to_string().as_str())?; - } - if let Some(v) = origin_asn { - parser = parser.add_filter("origin_asn", v.to_string().as_str())?; - } - if let Some(v) = prefix { - let filter_type = match (include_super, include_sub) { - (false, false) => "prefix", - (true, false) => "prefix_super", - (false, true) => "prefix_sub", - (true, true) => "prefix_super_sub", - }; - parser = parser.add_filter(filter_type, v.as_str())?; - } - if !peer_ip.is_empty() { - let v = peer_ip.iter().map(|p| p.to_string()).join(","); - parser = parser.add_filter("peer_ips", v.as_str())?; - } - if let Some(v) = peer_asn { - parser = parser.add_filter("peer_asn", v.to_string().as_str())?; - } - if let Some(v) = elem_type { - parser = parser.add_filter("type", v.to_string().as_str())?; - } - if let Some(v) = start_ts { - let ts = string_to_time(v.as_str())?.timestamp(); - parser = parser.add_filter("start_ts", ts.to_string().as_str())?; - } - if let Some(v) = end_ts { - let ts = string_to_time(v.as_str())?.timestamp(); - parser = parser.add_filter("end_ts", ts.to_string().as_str())?; - } - Ok(parser) -} - -#[derive(Tabled)] -struct BgpTime { - unix: i64, - rfc3339: String, - human: String, -} - -pub fn string_to_time(time_string: &str) -> Result> { - let ts = match dateparser::parse_with( - time_string, - &Utc, - chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap(), - ) { - Ok(ts) => ts, - Err(_) => { - return Err(anyhow!( - "Input time must be either Unix timestamp or time string compliant with RFC3339" - )) - } - }; - - Ok(ts) -} - -pub fn parse_time_string_to_rfc3339(time_vec: &[String]) -> Result { - let mut time_strings = vec![]; - if time_vec.is_empty() { - time_strings.push(Utc::now().to_rfc3339()) - } else { - for ts in time_vec { - match string_to_time(ts) { - Ok(ts) => time_strings.push(ts.to_rfc3339()), - Err(_) => return Err(anyhow!("unable to parse timestring: {}", ts)), - } - } - } - - Ok(time_strings.join("\n")) -} - -pub fn time_to_table(time_vec: &[String]) -> Result { - let now_ts = Utc::now().timestamp(); - let ts_vec = match time_vec.is_empty() { - true => vec![now_ts], - false => time_vec - .iter() - .map(|ts| string_to_time(ts.as_str()).map(|dt| dt.timestamp())) - .collect::>>()?, - }; - - let bgptime_vec = ts_vec - .into_iter() - .map(|ts| { - let ht = HumanTime::from(chrono::Local::now() - chrono::Duration::seconds(now_ts - ts)); - let human = ht.to_string(); - let rfc3339 = Utc - .from_utc_datetime(&DateTime::from_timestamp(ts, 0).unwrap().naive_utc()) - .to_rfc3339(); - BgpTime { - unix: ts, - rfc3339, - human, - } - }) - .collect::>(); - - Ok(Table::new(bgptime_vec).with(Style::rounded()).to_string()) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_string_to_time() { - use chrono::TimeZone; - - // Test with a valid Unix timestamp - let unix_ts = "1697043600"; // Example timestamp - let result = string_to_time(unix_ts); - assert!(result.is_ok()); - assert_eq!(result.unwrap(), Utc.timestamp_opt(1697043600, 0).unwrap()); - - // Test with a valid RFC3339 string - let rfc3339_str = "2023-10-11T00:00:00Z"; - let result = string_to_time(rfc3339_str); - assert!(result.is_ok()); - assert_eq!(result.unwrap(), Utc.timestamp_opt(1696982400, 0).unwrap()); - - // Test with an incorrect date string - let invalid_date = "not-a-date"; - let result = string_to_time(invalid_date); - assert!(result.is_err()); - - // Test with an empty string - let empty_string = ""; - let result = string_to_time(empty_string); - assert!(result.is_err()); - - // Test with incomplete RFC3339 string - let incomplete_rfc3339 = "2023-10-11T"; - let result = string_to_time(incomplete_rfc3339); - assert!(result.is_err()); - - // Test with a human-readable date string allowed by `dateparser` - let human_readable = "October 11, 2023"; - let result = string_to_time(human_readable); - assert!(result.is_ok()); - let expected_time = Utc.with_ymd_and_hms(2023, 10, 11, 0, 0, 0).unwrap(); - assert_eq!(result.unwrap(), expected_time); - } -} +pub use crate::filters::*; +pub use crate::time::*; diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000..647ae4b --- /dev/null +++ b/src/time.rs @@ -0,0 +1,118 @@ +use anyhow::anyhow; +use chrono::{DateTime, TimeZone, Utc}; +use chrono_humanize::HumanTime; +use tabled::settings::Style; +use tabled::{Table, Tabled}; + +#[derive(Tabled)] +struct BgpTime { + unix: i64, + rfc3339: String, + human: String, +} + +pub fn string_to_time(time_string: &str) -> anyhow::Result> { + let ts = match dateparser::parse_with( + time_string, + &Utc, + chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap(), + ) { + Ok(ts) => ts, + Err(_) => { + return Err(anyhow!( + "Input time must be either Unix timestamp or time string compliant with RFC3339" + )) + } + }; + + Ok(ts) +} + +pub fn parse_time_string_to_rfc3339(time_vec: &[String]) -> anyhow::Result { + let mut time_strings = vec![]; + if time_vec.is_empty() { + time_strings.push(Utc::now().to_rfc3339()) + } else { + for ts in time_vec { + match string_to_time(ts) { + Ok(ts) => time_strings.push(ts.to_rfc3339()), + Err(_) => return Err(anyhow!("unable to parse timestring: {}", ts)), + } + } + } + + Ok(time_strings.join("\n")) +} + +pub fn time_to_table(time_vec: &[String]) -> anyhow::Result { + let now_ts = Utc::now().timestamp(); + let ts_vec = match time_vec.is_empty() { + true => vec![now_ts], + false => time_vec + .iter() + .map(|ts| string_to_time(ts.as_str()).map(|dt| dt.timestamp())) + .collect::>>()?, + }; + + let bgptime_vec = ts_vec + .into_iter() + .map(|ts| { + let ht = HumanTime::from(chrono::Local::now() - chrono::Duration::seconds(now_ts - ts)); + let human = ht.to_string(); + let rfc3339 = Utc + .from_utc_datetime(&DateTime::from_timestamp(ts, 0).unwrap().naive_utc()) + .to_rfc3339(); + BgpTime { + unix: ts, + rfc3339, + human, + } + }) + .collect::>(); + + Ok(Table::new(bgptime_vec).with(Style::rounded()).to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_string_to_time() { + use chrono::TimeZone; + + // Test with a valid Unix timestamp + let unix_ts = "1697043600"; // Example timestamp + let result = string_to_time(unix_ts); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), Utc.timestamp_opt(1697043600, 0).unwrap()); + + // Test with a valid RFC3339 string + let rfc3339_str = "2023-10-11T00:00:00Z"; + let result = string_to_time(rfc3339_str); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), Utc.timestamp_opt(1696982400, 0).unwrap()); + + // Test with an incorrect date string + let invalid_date = "not-a-date"; + let result = string_to_time(invalid_date); + assert!(result.is_err()); + + // Test with an empty string + let empty_string = ""; + let result = string_to_time(empty_string); + assert!(result.is_err()); + + // Test with incomplete RFC3339 string + let incomplete_rfc3339 = "2023-10-11T"; + let result = string_to_time(incomplete_rfc3339); + assert!(result.is_err()); + + // Test with a human-readable date string allowed by `dateparser` + let human_readable = "October 11, 2023"; + let result = string_to_time(human_readable); + assert!(result.is_ok()); + let expected_time = Utc.with_ymd_and_hms(2023, 10, 11, 0, 0, 0).unwrap(); + assert_eq!(result.unwrap(), expected_time); + } +} From 5d909eec587a25766aa2e0c9de9894edf8efd3ab Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 29 Dec 2024 12:56:27 -0800 Subject: [PATCH 2/4] Fix grammatical and formatting issues in CLI command descriptions - Corrected article usage (e.g., 'a' to 'an') in multiple descriptions. - Added missing commas in example clarifications (e.g., 'e.g., US'). - Improved phrasing for clarity (e.g., 'prefix-to-origin mapping'). - Adjusted minor formatting inconsistencies in comments and annotations. --- src/bin/monocle.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/bin/monocle.rs b/src/bin/monocle.rs index 41aea3a..ae54a19 100644 --- a/src/bin/monocle.rs +++ b/src/bin/monocle.rs @@ -38,7 +38,7 @@ struct Cli { enum Commands { /// Parse individual MRT files given a file path, local or remote. Parse { - /// File path to a MRT file, local or remote. + /// File path to an MRT file, local or remote. #[clap(name = "FILE")] file_path: PathBuf, @@ -106,7 +106,7 @@ enum Commands { #[clap(short = 'C', long)] country_only: bool, - /// Refresh local as2org database + /// Refresh the local as2org database #[clap(short, long)] update: bool, @@ -114,7 +114,7 @@ enum Commands { #[clap(short, long)] pretty: bool, - /// Display full table (with ord_id, org_size) + /// Display a full table (with ord_id, org_size) #[clap(short = 'F', long)] full_table: bool, @@ -156,7 +156,7 @@ enum Commands { #[clap()] ip: Option, - /// Print IP address only (e.g. for getting the public IP address quickly) + /// Print IP address only (e.g., for getting the public IP address quickly) #[clap(long)] simple: bool, @@ -183,7 +183,7 @@ enum RpkiCommands { /// parse a RPKI ASPA file ReadAspa { - /// File path to a ASPA file (.asa), local or remote. + /// File path to an ASPA file (.asa), local or remote. #[clap(name = "FILE")] file_path: PathBuf, @@ -218,14 +218,14 @@ enum RpkiCommands { enum RadarCommands { /// get routing stats Stats { - /// a two-letter country code or asn number (e.g. US or 13335) + /// a two-letter country code or asn number (e.g., US or 13335) #[clap(name = "QUERY")] query: Option, }, - /// look up prefix to origin mapping on the most recent global routing table snapshot + /// look up prefix-to-origin mapping on the most recent global routing table snapshot Pfx2as { - /// a IP prefix or an AS number (e.g. 1.1.1.0/24 or 13335) + /// an IP prefix or an AS number (e.g., 1.1.1.0/24 or 13335) #[clap(name = "QUERY")] query: String, @@ -264,7 +264,7 @@ fn main() { .init(); } - // You can check for the existence of subcommands, and if found use their + // You can check for the existence of subcommands, and if found, use their // matches just as you would the top level cmd match cli.command { Commands::Parse { From 94b6a96557438fb45172442f147e6259713794ab Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 29 Dec 2024 17:10:04 -0800 Subject: [PATCH 3/4] change `parse_start_end_strings` to return timestamps as i64 - updated `parse_start_end_strings` to return `i64` instead of `String`. - replaced `.to_rfc3339()` calls with `.timestamp()`. - adjusted return values to reflect the new timestamp format. --- src/filters/parse.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/filters/parse.rs b/src/filters/parse.rs index 6edf40f..c49a41b 100644 --- a/src/filters/parse.rs +++ b/src/filters/parse.rs @@ -57,7 +57,7 @@ pub struct ParseFilters { } impl ParseFilters { - pub fn parse_start_end_strings(&self) -> Result<(String, String)> { + pub fn parse_start_end_strings(&self) -> Result<(i64, i64)> { let mut start_ts = None; let mut end_ts = None; if let Some(ts) = &self.start_ts { @@ -105,14 +105,14 @@ impl ParseFilters { }; if let Some(ts) = start_ts { - return Ok((ts.to_rfc3339(), (ts + duration).to_rfc3339())); + return Ok((ts.timestamp(), (ts + duration).timestamp())); } if let Some(ts) = end_ts { - return Ok(((ts - duration).to_rfc3339(), ts.to_rfc3339())); + return Ok(((ts - duration).timestamp(), ts.timestamp())); } } else { // this case is start_ts AND end_ts - return Ok((start_ts.unwrap().to_rfc3339(), end_ts.unwrap().to_rfc3339())); + return Ok((start_ts.unwrap().timestamp(), end_ts.unwrap().timestamp())); } Err(anyhow!("unexpected time-string parsing result")) From 07cace20443cde6370a5cb5d70a5f21d39c6317e Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 29 Dec 2024 17:12:25 -0800 Subject: [PATCH 4/4] support data dump type filtering in search filters - added `DumpType` enum with options for updates, rib, and both - added `dump_type` field with `updates` as default - updated broker query to filter by selected dump type --- src/filters/search.rs | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/src/filters/search.rs b/src/filters/search.rs index e7f5430..65d6965 100644 --- a/src/filters/search.rs +++ b/src/filters/search.rs @@ -3,7 +3,8 @@ use crate::filters::MrtParserFilters; use anyhow::Result; use bgpkit_broker::BrokerItem; use bgpkit_parser::BgpkitParser; -use clap::Args; +use clap::{Args, ValueEnum}; +use serde::Serialize; use std::io::Read; #[derive(Args, Debug, Clone)] @@ -18,6 +19,21 @@ pub struct SearchFilters { /// Filter by route collection project, i.e., riperis or routeviews #[clap(short = 'P', long)] pub project: Option, + + /// Specify data dump type to search (updates or RIB dump) + #[clap(short = 'D', long, default_value_t, value_enum)] + pub dump_type: DumpType, +} + +#[derive(ValueEnum, Clone, Debug, Default, Serialize)] +pub enum DumpType { + /// BGP updates only + #[default] + Updates, + /// BGP RIB dump only + Rib, + /// BGP RIB dump and BGP updates + RibUpdates, } impl SearchFilters { @@ -28,7 +44,6 @@ impl SearchFilters { let mut broker = bgpkit_broker::BgpkitBroker::new() .ts_start(ts_start) .ts_end(ts_end) - .data_type("update") .page_size(1000); if let Some(project) = &self.project { @@ -38,6 +53,18 @@ impl SearchFilters { broker = broker.collector_id(collector.as_str()); } + match self.dump_type { + DumpType::Updates => { + broker = broker.data_type("updates"); + } + DumpType::Rib => { + broker = broker.data_type("rib"); + } + DumpType::RibUpdates => { + // do nothing here -> getting all RIB and updates + } + } + broker .query() .map_err(|_| anyhow::anyhow!("broker query error: please check filters are valid"))