Skip to content

Commit

Permalink
Merge pull request #70 from bgpkit/feature/refactor_filters
Browse files Browse the repository at this point in the history
Improved search/parse filters; support search RIB dump files
  • Loading branch information
digizeph authored Jan 8, 2025
2 parents 13b108a + 07cace2 commit 262a1bd
Show file tree
Hide file tree
Showing 6 changed files with 441 additions and 420 deletions.
259 changes: 14 additions & 245 deletions src/bin/monocle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use std::path::PathBuf;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

use anyhow::{anyhow, Result};
use bgpkit_parser::encoder::MrtUpdatesEncoder;
use bgpkit_parser::BgpElem;
use clap::{Args, Parser, Subcommand};
use clap::{Parser, Subcommand};
use ipnet::IpNet;
use json_to_table::json_to_table;
use monocle::*;
Expand All @@ -19,10 +18,6 @@ use tabled::settings::{Merge, Style};
use tabled::{Table, Tabled};
use tracing::{info, Level};

/// Shared interface for groups of CLI filter arguments that can check
/// themselves for invalid values before any work is started.
trait Validate {
/// Returns `Ok(())` when the arguments are usable, or an error describing
/// the first problem found.
fn validate(&self) -> Result<()>;
}

#[derive(Parser)]
#[clap(author, version, about, long_about = None)]
#[clap(propagate_version = true)]
Expand All @@ -39,193 +34,11 @@ struct Cli {
command: Commands,
}

// Filter options accepted when parsing a single MRT file.
//
// NOTE: the `///` comments on the fields below are consumed by clap's derive
// macro and shown as `--help` text, so they are user-facing strings rather than
// plain documentation. Maintainer notes are therefore written with `//`.
//
// `start_ts` and `end_ts` are kept as raw strings here; the `Validate` impl for
// this struct checks them with `string_to_time`.
#[derive(Args, Debug)]
struct ParseFilters {
/// Filter by origin AS Number
#[clap(short = 'o', long)]
origin_asn: Option<u32>,

/// Filter by network prefix
#[clap(short = 'p', long)]
prefix: Option<String>,

/// Include super-prefix when filtering
#[clap(short = 's', long)]
include_super: bool,

/// Include sub-prefix when filtering
#[clap(short = 'S', long)]
include_sub: bool,

/// Filter by peer IP address
// A `Vec`, so the flag may be repeated to supply several addresses.
#[clap(short = 'j', long)]
peer_ip: Vec<IpAddr>,

/// Filter by peer ASN
#[clap(short = 'J', long)]
peer_asn: Option<u32>,

/// Filter by elem type: announce (a) or withdraw (w)
#[clap(short = 'm', long)]
elem_type: Option<String>,

/// Filter by start unix timestamp inclusive
#[clap(short = 't', long)]
start_ts: Option<String>,

/// Filter by end unix timestamp inclusive
#[clap(short = 'T', long)]
end_ts: Option<String>,

/// Filter by AS path regex string
#[clap(short = 'a', long)]
as_path: Option<String>,
}

// Filter options accepted by the search command.
//
// NOTE: the `///` comments on the fields below are consumed by clap's derive
// macro and shown as `--help` text. Maintainer notes are written with `//`.
//
// The time window is resolved by `parse_start_end_strings`, which requires
// exactly two of `start_ts`, `end_ts` and `duration`.
#[derive(Args, Debug)]
struct SearchFilters {
    /// Filter by start unix timestamp inclusive
    #[clap(short = 't', long)]
    start_ts: Option<String>,

    /// Filter by end unix timestamp inclusive
    #[clap(short = 'T', long)]
    end_ts: Option<String>,

    /// Duration string (e.g., 1h), combined with either start-ts or end-ts to derive the other bound
    // Previously this flag had no help text at all; it is parsed with
    // `humantime::parse_duration`.
    #[clap(short = 'd', long)]
    duration: Option<String>,

    /// Filter by collector, e.g. rrc00 or route-views2
    #[clap(short = 'c', long)]
    collector: Option<String>,

    /// Filter by route collection project, i.e. riperis or routeviews
    #[clap(short = 'P', long)]
    project: Option<String>,

    /// Filter by origin AS Number
    #[clap(short = 'o', long)]
    origin_asn: Option<u32>,

    /// Filter by network prefix
    #[clap(short = 'p', long)]
    prefix: Option<String>,

    /// Include super-prefix when filtering
    #[clap(short = 's', long)]
    include_super: bool,

    /// Include sub-prefix when filtering
    #[clap(short = 'S', long)]
    include_sub: bool,

    /// Filter by peer IP address
    // A `Vec`, so the flag may be repeated to supply several addresses.
    #[clap(short = 'j', long)]
    peer_ip: Vec<IpAddr>,

    /// Filter by peer ASN
    #[clap(short = 'J', long)]
    peer_asn: Option<u32>,

    /// Filter by elem type: announce (a) or withdraw (w)
    #[clap(short = 'm', long)]
    elem_type: Option<String>,

    /// Filter by AS path regex string
    #[clap(short = 'a', long)]
    as_path: Option<String>,
}

impl Validate for ParseFilters {
    /// Confirms that `start_ts` and `end_ts`, when supplied, are accepted by
    /// `string_to_time`. The start bound is checked first, so its error wins
    /// when both are invalid.
    fn validate(&self) -> Result<()> {
        if let Some(raw) = self.start_ts.as_deref() {
            string_to_time(raw)
                .map_err(|_| anyhow!("start-ts is not a valid time string: {}", raw))?;
        }

        if let Some(raw) = self.end_ts.as_deref() {
            string_to_time(raw)
                .map_err(|_| anyhow!("end-ts is not a valid time string: {}", raw))?;
        }

        Ok(())
    }
}

impl SearchFilters {
    /// Resolves the search window into a `(start, end)` pair of timestamps
    /// (the values produced by calling `.timestamp()` on the parsed times).
    ///
    /// Exactly two of `start_ts`, `end_ts` and `duration` must be supplied:
    /// - `start_ts` + `end_ts`: both are used as given;
    /// - `start_ts` + `duration`: the end is `start + duration`;
    /// - `end_ts` + `duration`: the start is `end - duration`.
    ///
    /// # Errors
    ///
    /// Checked in this order:
    /// 1. `start_ts` is rejected by `string_to_time`;
    /// 2. `end_ts` is rejected by `string_to_time`;
    /// 3. all three, only one, or neither of the time bounds were supplied;
    /// 4. `duration` is rejected by `humantime::parse_duration`.
    fn parse_start_end_strings(&self) -> Result<(i64, i64)> {
        let start = self
            .start_ts
            .as_deref()
            .map(|raw| {
                string_to_time(raw)
                    .map_err(|_| anyhow!("start-ts is not a valid time string: {}", raw))
            })
            .transpose()?;
        let end = self
            .end_ts
            .as_deref()
            .map(|raw| {
                string_to_time(raw)
                    .map_err(|_| anyhow!("end-ts is not a valid time string: {}", raw))
            })
            .transpose()?;

        // The duration is parsed lazily, only after the argument combination
        // is known to be valid, so combination errors take precedence over a
        // malformed duration string.
        let parse_span = |raw: &str| {
            humantime::parse_duration(raw)
                .map_err(|_| anyhow!("duration is not a valid time duration string: {}", raw))
        };

        // One exhaustive match over all eight present/absent combinations:
        // every valid case yields its result directly, so no `unwrap()` and no
        // "cannot happen" fallback error are needed.
        match (start, end, self.duration.as_deref()) {
            (Some(_), Some(_), Some(_)) => Err(anyhow!(
                "cannot specify start_ts, end_ts, and duration all at the same time"
            )),
            (Some(_), None, None) | (None, Some(_), None) | (None, None, _) => Err(anyhow!(
                "must specify two from: start_ts, end_ts and duration"
            )),
            (Some(start), Some(end), None) => Ok((start.timestamp(), end.timestamp())),
            (Some(start), None, Some(raw)) => {
                let span = parse_span(raw)?;
                Ok((start.timestamp(), (start + span).timestamp()))
            }
            (None, Some(end), Some(raw)) => {
                let span = parse_span(raw)?;
                Ok(((end - span).timestamp(), end.timestamp()))
            }
        }
    }
}
impl Validate for SearchFilters {
    /// Valid exactly when the time window can be resolved; the resolved
    /// window itself is discarded here.
    fn validate(&self) -> Result<()> {
        self.parse_start_end_strings().map(|_| ())
    }
}

#[derive(Subcommand)]
enum Commands {
/// Parse individual MRT files given a file path, local or remote.
Parse {
/// File path to a MRT file, local or remote.
/// File path to an MRT file, local or remote.
#[clap(name = "FILE")]
file_path: PathBuf,

Expand Down Expand Up @@ -293,15 +106,15 @@ enum Commands {
#[clap(short = 'C', long)]
country_only: bool,

/// Refresh local as2org database
/// Refresh the local as2org database
#[clap(short, long)]
update: bool,

/// Output to pretty table, default markdown table
#[clap(short, long)]
pretty: bool,

/// Display full table (with org_id, org_size)
/// Display a full table (with org_id, org_size)
#[clap(short = 'F', long)]
full_table: bool,

Expand Down Expand Up @@ -343,7 +156,7 @@ enum Commands {
#[clap()]
ip: Option<IpAddr>,

/// Print IP address only (e.g. for getting the public IP address quickly)
/// Print IP address only (e.g., for getting the public IP address quickly)
#[clap(long)]
simple: bool,

Expand All @@ -370,7 +183,7 @@ enum RpkiCommands {

/// parse an RPKI ASPA file
ReadAspa {
/// File path to a ASPA file (.asa), local or remote.
/// File path to an ASPA file (.asa), local or remote.
#[clap(name = "FILE")]
file_path: PathBuf,

Expand Down Expand Up @@ -405,14 +218,14 @@ enum RpkiCommands {
enum RadarCommands {
/// get routing stats
Stats {
/// a two-letter country code or asn number (e.g. US or 13335)
/// a two-letter country code or asn number (e.g., US or 13335)
#[clap(name = "QUERY")]
query: Option<String>,
},

/// look up prefix to origin mapping on the most recent global routing table snapshot
/// look up prefix-to-origin mapping on the most recent global routing table snapshot
Pfx2as {
/// a IP prefix or an AS number (e.g. 1.1.1.0/24 or 13335)
/// an IP prefix or an AS number (e.g., 1.1.1.0/24 or 13335)
#[clap(name = "QUERY")]
query: String,

Expand Down Expand Up @@ -451,7 +264,7 @@ fn main() {
.init();
}

// You can check for the existence of subcommands, and if found use their
// You can check for the existence of subcommands, and if found, use their
// matches just as you would the top level cmd
match cli.command {
Commands::Parse {
Expand All @@ -467,20 +280,7 @@ fn main() {
}

let file_path = file_path.to_str().unwrap();
let parser = parser_with_filters(
file_path,
&filters.origin_asn,
&filters.prefix,
&filters.include_super,
&filters.include_sub,
&filters.peer_ip,
&filters.peer_asn,
&filters.elem_type,
&filters.start_ts.clone(),
&filters.end_ts.clone(),
&filters.as_path,
)
.unwrap();
let parser = filters.to_parser(file_path).unwrap();

let mut stdout = std::io::stdout();

Expand Down Expand Up @@ -542,24 +342,7 @@ fn main() {
let show_progress = sqlite_db.is_some() || mrt_path.is_some();

// it's fine to unwrap as the filters.validate() function has already checked for issues
let (ts_start, ts_end) = filters.parse_start_end_strings().unwrap();

let mut broker = bgpkit_broker::BgpkitBroker::new()
.ts_start(ts_start)
.ts_end(ts_end)
.data_type("update")
.page_size(1000);

if let Some(project) = filters.project {
broker = broker.project(project.as_str());
}
if let Some(collector) = filters.collector {
broker = broker.collector_id(collector.as_str());
}

let items = broker
.query()
.expect("broker query error: please check filters are valid");
let items = filters.to_broker_items().unwrap();

let total_items = items.len();

Expand Down Expand Up @@ -679,21 +462,7 @@ fn main() {
let url = item.url;
let collector = item.collector_id;
info!("start parsing {}", url.as_str());
let parser = parser_with_filters(
url.as_str(),
&filters.origin_asn,
&filters.prefix,
&filters.include_super,
&filters.include_sub,
&filters.peer_ip,
&filters.peer_asn,
&filters.elem_type,
// use the parsed new start and end ts
&Some(ts_start.to_string()),
&Some(ts_end.to_string()),
&filters.as_path,
)
.unwrap();
let parser = filters.to_parser(url.as_str()).unwrap();

let mut elems_count = 0;
for elem in parser {
Expand Down Expand Up @@ -726,7 +495,7 @@ fn main() {
let as2org = As2org::new(&Some(format!("{data_dir}/monocle-data.sqlite3"))).unwrap();

if update {
// if update flag is set, clear existing as2org data and re-download later
// if the update flag is set, clear existing as2org data and re-download later
as2org.clear_db();
}

Expand Down
33 changes: 33 additions & 0 deletions src/filters/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
pub use parse::ParseFilters;
pub use search::SearchFilters;

use bgpkit_parser::BgpkitParser;
use clap::ValueEnum;
use serde::Serialize;
use std::fmt::Display;
use std::io::Read;

mod parse;
mod search;

/// Kind of BGP element a filter can select.
///
/// Derives `ValueEnum` so it can be used directly as a clap argument value;
/// its `Display` impl renders the variants as `announcement` / `withdrawal`.
#[derive(ValueEnum, Clone, Debug, Serialize)]
pub enum ElemTypeEnum {
/// BGP announcement
A,
/// BGP withdrawal
W,
}

impl Display for ElemTypeEnum {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
ElemTypeEnum::A => "announcement",
ElemTypeEnum::W => "withdrawal",
})
}
}

/// Interface for filter sets that can be validated and turned into an MRT
/// parser.
pub trait MrtParserFilters {
/// Checks the filter values, returning an error when they are not usable.
/// What counts as invalid is decided by each implementor.
fn validate(&self) -> anyhow::Result<()>;
/// Builds a `BgpkitParser` reading from `path`.
///
/// NOTE(review): presumably the returned parser has these filters applied;
/// confirm against the implementors in `parse` / `search`.
fn to_parser(&self, path: &str) -> anyhow::Result<BgpkitParser<Box<dyn Read + Send>>>;
}
Loading

0 comments on commit 262a1bd

Please sign in to comment.