Improved search/parse filters; support search RIB dump files #70

Merged
4 commits merged on Jan 8, 2025
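At a high level, this PR moves the Parse/Search filter structs and their validation out of src/bin/monocle.rs into a new src/filters module, where ParseFilters and SearchFilters implement a shared MrtParserFilters trait. The before/after sketch below is reconstructed from the diff itself; the surrounding error handling is simplified for illustration.

// Before this PR: each filter field was passed individually to a free helper.
let parser = parser_with_filters(
    file_path,
    &filters.origin_asn,
    &filters.prefix,
    &filters.include_super,
    &filters.include_sub,
    &filters.peer_ip,
    &filters.peer_asn,
    &filters.elem_type,
    &filters.start_ts.clone(),
    &filters.end_ts.clone(),
    &filters.as_path,
)
.unwrap();

// After this PR: the filter struct validates itself and builds the parser.
let parser = filters.to_parser(file_path).unwrap();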
259 changes: 14 additions & 245 deletions src/bin/monocle.rs
@@ -5,10 +5,9 @@ use std::path::PathBuf;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

use anyhow::{anyhow, Result};
use bgpkit_parser::encoder::MrtUpdatesEncoder;
use bgpkit_parser::BgpElem;
use clap::{Args, Parser, Subcommand};
use clap::{Parser, Subcommand};
use ipnet::IpNet;
use json_to_table::json_to_table;
use monocle::*;
@@ -19,10 +18,6 @@ use tabled::settings::{Merge, Style};
use tabled::{Table, Tabled};
use tracing::{info, Level};

trait Validate {
fn validate(&self) -> Result<()>;
}

#[derive(Parser)]
#[clap(author, version, about, long_about = None)]
#[clap(propagate_version = true)]
@@ -39,193 +34,11 @@ struct Cli {
command: Commands,
}

#[derive(Args, Debug)]
struct ParseFilters {
/// Filter by origin AS Number
#[clap(short = 'o', long)]
origin_asn: Option<u32>,

/// Filter by network prefix
#[clap(short = 'p', long)]
prefix: Option<String>,

/// Include super-prefix when filtering
#[clap(short = 's', long)]
include_super: bool,

/// Include sub-prefix when filtering
#[clap(short = 'S', long)]
include_sub: bool,

/// Filter by peer IP address
#[clap(short = 'j', long)]
peer_ip: Vec<IpAddr>,

/// Filter by peer ASN
#[clap(short = 'J', long)]
peer_asn: Option<u32>,

/// Filter by elem type: announce (a) or withdraw (w)
#[clap(short = 'm', long)]
elem_type: Option<String>,

/// Filter by start unix timestamp inclusive
#[clap(short = 't', long)]
start_ts: Option<String>,

/// Filter by end unix timestamp inclusive
#[clap(short = 'T', long)]
end_ts: Option<String>,

/// Filter by AS path regex string
#[clap(short = 'a', long)]
as_path: Option<String>,
}

#[derive(Args, Debug)]
struct SearchFilters {
/// Filter by start unix timestamp inclusive
#[clap(short = 't', long)]
start_ts: Option<String>,

/// Filter by end unix timestamp inclusive
#[clap(short = 'T', long)]
end_ts: Option<String>,

#[clap(short = 'd', long)]
duration: Option<String>,

/// Filter by collector, e.g. rrc00 or route-views2
#[clap(short = 'c', long)]
collector: Option<String>,

/// Filter by route collection project, i.e. riperis or routeviews
#[clap(short = 'P', long)]
project: Option<String>,

/// Filter by origin AS Number
#[clap(short = 'o', long)]
origin_asn: Option<u32>,

/// Filter by network prefix
#[clap(short = 'p', long)]
prefix: Option<String>,

/// Include super-prefix when filtering
#[clap(short = 's', long)]
include_super: bool,

/// Include sub-prefix when filtering
#[clap(short = 'S', long)]
include_sub: bool,

/// Filter by peer IP address
#[clap(short = 'j', long)]
peer_ip: Vec<IpAddr>,

/// Filter by peer ASN
#[clap(short = 'J', long)]
peer_asn: Option<u32>,

/// Filter by elem type: announce (a) or withdraw (w)
#[clap(short = 'm', long)]
elem_type: Option<String>,

/// Filter by AS path regex string
#[clap(short = 'a', long)]
as_path: Option<String>,
}

impl Validate for ParseFilters {
fn validate(&self) -> Result<()> {
if let Some(ts) = &self.start_ts {
if string_to_time(ts.as_str()).is_err() {
return Err(anyhow!("start-ts is not a valid time string: {}", ts));
}
}
if let Some(ts) = &self.end_ts {
if string_to_time(ts.as_str()).is_err() {
return Err(anyhow!("end-ts is not a valid time string: {}", ts));
}
}
Ok(())
}
}

impl SearchFilters {
fn parse_start_end_strings(&self) -> Result<(i64, i64)> {
let mut start_ts = None;
let mut end_ts = None;
if let Some(ts) = &self.start_ts {
match string_to_time(ts.as_str()) {
Ok(t) => start_ts = Some(t),
Err(_) => return Err(anyhow!("start-ts is not a valid time string: {}", ts)),
}
}
if let Some(ts) = &self.end_ts {
match string_to_time(ts.as_str()) {
Ok(t) => end_ts = Some(t),
Err(_) => return Err(anyhow!("end-ts is not a valid time string: {}", ts)),
}
}

match (&self.start_ts, &self.end_ts, &self.duration) {
(Some(_), Some(_), Some(_)) => {
return Err(anyhow!(
"cannot specify start_ts, end_ts, and duration all at the same time"
))
}
(Some(_), None, None) | (None, Some(_), None) => {
// only one start_ts or end_ts specified
return Err(anyhow!(
"must specify two from: start_ts, end_ts and duration"
));
}
(None, None, _) => {
return Err(anyhow!(
"must specify two from: start_ts, end_ts and duration"
));
}
_ => {}
}
if let Some(duration) = &self.duration {
// this case is duration + start_ts OR end_ts
let duration = match humantime::parse_duration(duration) {
Ok(d) => d,
Err(_) => {
return Err(anyhow!(
"duration is not a valid time duration string: {}",
duration
))
}
};

if let Some(ts) = start_ts {
return Ok((ts.timestamp(), (ts + duration).timestamp()));
}
if let Some(ts) = end_ts {
return Ok(((ts - duration).timestamp(), ts.timestamp()));
}
} else {
// this case is start_ts AND end_ts
return Ok((start_ts.unwrap().timestamp(), end_ts.unwrap().timestamp()));
}

Err(anyhow!("unexpected time-string parsing result"))
}
}
impl Validate for SearchFilters {
fn validate(&self) -> Result<()> {
let _ = self.parse_start_end_strings()?;
Ok(())
}
}

#[derive(Subcommand)]
enum Commands {
/// Parse individual MRT files given a file path, local or remote.
Parse {
/// File path to a MRT file, local or remote.
/// File path to an MRT file, local or remote.
#[clap(name = "FILE")]
file_path: PathBuf,

@@ -293,15 +106,15 @@ enum Commands {
#[clap(short = 'C', long)]
country_only: bool,

/// Refresh local as2org database
/// Refresh the local as2org database
#[clap(short, long)]
update: bool,

/// Output to pretty table, default markdown table
#[clap(short, long)]
pretty: bool,

/// Display full table (with ord_id, org_size)
/// Display a full table (with ord_id, org_size)
#[clap(short = 'F', long)]
full_table: bool,

@@ -343,7 +156,7 @@ enum Commands {
#[clap()]
ip: Option<IpAddr>,

/// Print IP address only (e.g. for getting the public IP address quickly)
/// Print IP address only (e.g., for getting the public IP address quickly)
#[clap(long)]
simple: bool,

@@ -370,7 +183,7 @@ enum RpkiCommands {

/// parse a RPKI ASPA file
ReadAspa {
/// File path to a ASPA file (.asa), local or remote.
/// File path to an ASPA file (.asa), local or remote.
#[clap(name = "FILE")]
file_path: PathBuf,

@@ -405,14 +218,14 @@ enum RpkiCommands {
enum RadarCommands {
/// get routing stats
Stats {
/// a two-letter country code or asn number (e.g. US or 13335)
/// a two-letter country code or asn number (e.g., US or 13335)
#[clap(name = "QUERY")]
query: Option<String>,
},

/// look up prefix to origin mapping on the most recent global routing table snapshot
/// look up prefix-to-origin mapping on the most recent global routing table snapshot
Pfx2as {
/// a IP prefix or an AS number (e.g. 1.1.1.0/24 or 13335)
/// an IP prefix or an AS number (e.g., 1.1.1.0/24 or 13335)
#[clap(name = "QUERY")]
query: String,

@@ -451,7 +264,7 @@ fn main() {
.init();
}

// You can check for the existence of subcommands, and if found use their
// You can check for the existence of subcommands, and if found, use their
// matches just as you would the top level cmd
match cli.command {
Commands::Parse {
@@ -467,20 +280,7 @@ fn main() {
}

let file_path = file_path.to_str().unwrap();
let parser = parser_with_filters(
file_path,
&filters.origin_asn,
&filters.prefix,
&filters.include_super,
&filters.include_sub,
&filters.peer_ip,
&filters.peer_asn,
&filters.elem_type,
&filters.start_ts.clone(),
&filters.end_ts.clone(),
&filters.as_path,
)
.unwrap();
let parser = filters.to_parser(file_path).unwrap();

let mut stdout = std::io::stdout();

@@ -542,24 +342,7 @@ fn main() {
let show_progress = sqlite_db.is_some() || mrt_path.is_some();

// it's fine to unwrap as the filters.validate() function has already checked for issues
let (ts_start, ts_end) = filters.parse_start_end_strings().unwrap();

let mut broker = bgpkit_broker::BgpkitBroker::new()
.ts_start(ts_start)
.ts_end(ts_end)
.data_type("update")
.page_size(1000);

if let Some(project) = filters.project {
broker = broker.project(project.as_str());
}
if let Some(collector) = filters.collector {
broker = broker.collector_id(collector.as_str());
}

let items = broker
.query()
.expect("broker query error: please check filters are valid");
let items = filters.to_broker_items().unwrap();

let total_items = items.len();

@@ -679,21 +462,7 @@ fn main() {
let url = item.url;
let collector = item.collector_id;
info!("start parsing {}", url.as_str());
let parser = parser_with_filters(
url.as_str(),
&filters.origin_asn,
&filters.prefix,
&filters.include_super,
&filters.include_sub,
&filters.peer_ip,
&filters.peer_asn,
&filters.elem_type,
// use the parsed new start and end ts
&Some(ts_start.to_string()),
&Some(ts_end.to_string()),
&filters.as_path,
)
.unwrap();
let parser = filters.to_parser(url.as_str()).unwrap();

let mut elems_count = 0;
for elem in parser {
@@ -726,7 +495,7 @@ fn main() {
let as2org = As2org::new(&Some(format!("{data_dir}/monocle-data.sqlite3"))).unwrap();

if update {
// if update flag is set, clear existing as2org data and re-download later
// if the update flag is set, clear existing as2org data and re-download later
as2org.clear_db();
}

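The broker-building code removed from main() above presumably now lives behind SearchFilters::to_broker_items() in the new filters module. The sketch below reconstructs what such a method could look like from the removed lines; the actual implementation in src/filters/search.rs is not part of this diff, and the data_type("update") restriction is likely relaxed there, since the PR adds support for searching RIB dump files.

// Hypothetical shape of SearchFilters::to_broker_items(), reconstructed from
// the broker setup that was removed from main() above.
impl SearchFilters {
    pub fn to_broker_items(&self) -> anyhow::Result<Vec<bgpkit_broker::BrokerItem>> {
        let (ts_start, ts_end) = self.parse_start_end_strings()?;

        let mut broker = bgpkit_broker::BgpkitBroker::new()
            .ts_start(ts_start)
            .ts_end(ts_end)
            .data_type("update") // presumably configurable now that RIB dumps are searchable
            .page_size(1000);

        if let Some(project) = &self.project {
            broker = broker.project(project.as_str());
        }
        if let Some(collector) = &self.collector {
            broker = broker.collector_id(collector.as_str());
        }

        broker
            .query()
            .map_err(|e| anyhow::anyhow!("broker query error: {:?}", e))
    }
}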
33 changes: 33 additions & 0 deletions src/filters/mod.rs
@@ -0,0 +1,33 @@
pub use parse::ParseFilters;
pub use search::SearchFilters;

use bgpkit_parser::BgpkitParser;
use clap::ValueEnum;
use serde::Serialize;
use std::fmt::Display;
use std::io::Read;

mod parse;
mod search;

#[derive(ValueEnum, Clone, Debug, Serialize)]
pub enum ElemTypeEnum {
    /// BGP announcement
    A,
    /// BGP withdrawal
    W,
}

impl Display for ElemTypeEnum {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            ElemTypeEnum::A => "announcement",
            ElemTypeEnum::W => "withdrawal",
        })
    }
}

pub trait MrtParserFilters {
    fn validate(&self) -> anyhow::Result<()>;
    fn to_parser(&self, path: &str) -> anyhow::Result<BgpkitParser<Box<dyn Read + Send>>>;
}
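Both ParseFilters and SearchFilters (defined in the sibling parse and search modules, not shown in this diff) implement this trait, so callers can be written generically over it. A minimal sketch; the helper count_elems below is made up for illustration and is not part of the PR.

// Hypothetical caller that accepts any filter struct implementing
// MrtParserFilters (e.g. ParseFilters or SearchFilters).
fn count_elems<F: MrtParserFilters>(filters: &F, path: &str) -> anyhow::Result<u64> {
    filters.validate()?; // e.g. reject malformed timestamp strings up front
    let parser = filters.to_parser(path)?; // BgpkitParser with the filters applied
    // The parser iterates over BgpElem items, just as main() does above.
    Ok(parser.into_iter().count() as u64)
}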