Skip to content

Commit

Permalink
Merge pull request #185 from bgpkit/ris_live_refactor
Browse files Browse the repository at this point in the history
improved RIS Live module
  • Loading branch information
digizeph authored Oct 16, 2024
2 parents 74eb145 + 053efd7 commit 98fa324
Show file tree
Hide file tree
Showing 15 changed files with 372 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/target
target

# These are backup files generated by rustfmt
**/*.rs.bk
Expand Down
7 changes: 3 additions & 4 deletions examples/real-time-ris-live-websocket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bgpkit_parser::parse_ris_live_message;
use serde_json::json;
use bgpkit_parser::rislive::messages::{RisLiveClientMessage, RisSubscribe};
use tungstenite::{connect, Message};

const RIS_LIVE_URL: &str = "ws://ris-live.ripe.net/v1/ws/?client=rust-bgpkit-parser";
Expand All @@ -13,9 +13,8 @@ fn main() {
connect(RIS_LIVE_URL).expect("Can't connect to RIS Live websocket server");

// subscribe to messages from one collector
// let msg = json!({"type": "ris_subscribe", "data": {"host": "rrc21"}}).to_string();
let msg = json!({"type": "ris_subscribe", "data": null}).to_string();
socket.send(Message::Text(msg)).unwrap();
let msg = RisSubscribe::new().host("rrc21");
socket.send(Message::Text(msg.to_json_string())).unwrap();

loop {
let msg = socket.read().expect("Error reading message").to_string();
Expand Down
28 changes: 28 additions & 0 deletions src/parser/rislive/messages/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! This module contains the RIS-live client message definitions.
//!
//! Official manual available at <https://ris-live.ripe.net/manual/#client-messages>
pub mod ping;
pub mod request_rrc_list;
pub mod ris_subscribe;
pub mod ris_unsubscribe;

pub trait RisLiveClientMessage: Serialize {
fn msg_type(&self) -> &'static str;

fn to_json_string(&self) -> String {
serde_json::to_string(&json!({
"type": self.msg_type(),
"data": self
}))
.unwrap()
}
}

use serde::Serialize;
use serde_json::json;

pub use ping::Ping;
pub use request_rrc_list::RequestRrcList;
pub use ris_subscribe::RisSubscribe;
pub use ris_unsubscribe::RisUnsubscribe;
34 changes: 34 additions & 0 deletions src/parser/rislive/messages/client/ping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::rislive::messages::RisLiveClientMessage;
use serde::Serialize;

#[derive(Debug)]
pub struct Ping {}

impl Serialize for Ping {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_none()
}
}

impl RisLiveClientMessage for Ping {
fn msg_type(&self) -> &'static str {
"ping"
}
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::{json, Value};

#[test]
fn test_to_json_string() {
let ping_msg = Ping {};
let json_str = ping_msg.to_json_string();
let value: Value = serde_json::from_str(&json_str).unwrap();
assert_eq!(value, json!({"type": "ping", "data": null}));
}
}
40 changes: 40 additions & 0 deletions src/parser/rislive/messages/client/request_rrc_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::rislive::messages::RisLiveClientMessage;
use serde::Serialize;

#[derive(Default, Debug)]
pub struct RequestRrcList {
data: Vec<String>,
}

impl Serialize for RequestRrcList {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.data.serialize(serializer)
}
}

impl RequestRrcList {
pub fn new(rrc_list: Vec<String>) -> Self {
Self { data: rrc_list }
}
}

impl RisLiveClientMessage for RequestRrcList {
fn msg_type(&self) -> &'static str {
"request_rrc_list"
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_request_rrc_list() {
let rrc_list = RequestRrcList::new(vec!["rrc00".to_string(), "rrc01".to_string()]);

println!("{}", rrc_list.to_json_string());
}
}
206 changes: 206 additions & 0 deletions src/parser/rislive/messages/client/ris_subscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
use crate::rislive::messages::RisLiveClientMessage;
use ipnet::IpNet;
use serde::Serialize;
use std::net::IpAddr;

#[derive(Debug, Serialize)]
#[allow(clippy::upper_case_acronyms)]
pub enum RisSubscribeType {
UPDATE,
OPEN,
NOTIFICATION,
KEEPALIVE,
RIS_PEER_STATE,
}

#[derive(Debug, Serialize)]
pub struct RisSubscribeSocketOptions {
/// Include a Base64-encoded version of the original binary BGP message as `raw` for all subscriptions
///
/// *Default: false*
#[serde(rename = "includeRaw")]
pub include_raw: Option<bool>,

/// Send a `ris_subscribe_ok` message for all succesful subscriptions
///
/// *Default: false*
pub acknowledge: Option<bool>,
}

#[derive(Default, Debug, Serialize)]
pub struct RisSubscribe {
/// Only include messages collected by a particular RRC
#[serde(skip_serializing_if = "Option::is_none")]
pub host: Option<String>,

/// Only include messages of a given BGP or RIS type
#[serde(rename = "type")]
#[serde(skip_serializing_if = "Option::is_none")]
pub data_type: Option<RisSubscribeType>,

/// Only include messages containing a given key
///
/// Examples:
/// * "announcements"
/// * "withdrawals"
#[serde(skip_serializing_if = "Option::is_none")]
pub require: Option<String>,

/// Only include messages sent by the given BGP peer
#[serde(skip_serializing_if = "Option::is_none")]
pub peer: Option<IpAddr>,

/// ASN or pattern to match against the AS PATH attribute
///
/// Any of:
/// * ASN (integer)
/// * AS path pattern (string)
///
/// Comma-separated pattern describing all or part of the AS path.
/// Can optionally begin with ^ to match the first item of the path (the last traversed ASN),
/// and/or end with $ to match the last item of the path
/// (the originating ASN).
///
/// The entire pattern can be prefixed with ! to invert the match.
/// AS_SETs should be written as JSON arrays of integers in ascending order and with no spaces.
/// Note: this is not a regular expression.
///
/// Examples:
/// * "789$"
/// * "^123,456,789,[789,10111]$"
/// * "!6666$"
/// * "!^3333"
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,

/// Filter UPDATE messages by prefixes in announcements or withdrawals
///
/// Any of:
/// * IPv4 or IPv6 CIDR prefix (string)
/// * Array of CIDR prefixes (array)
///
/// For the purposes of subsequent `ris_unsubscribe` messages,
/// each prefix results in a separate subscription that can be stopped independently
///
/// Array items:
/// * IPv4 or IPv6 CIDR prefix (string)
#[serde(skip_serializing_if = "Option::is_none")]
pub prefix: Option<IpNet>,

/// Match prefixes that are more specific (part of) `prefix`
///
/// *Default: true*
#[serde(rename = "moreSpecific")]
#[serde(skip_serializing_if = "Option::is_none")]
pub more_specific: Option<bool>,

/// Match prefixes that are less specific (contain) `prefix`
///
/// *Default: false*
#[serde(rename = "lessSpecific")]
#[serde(skip_serializing_if = "Option::is_none")]
pub less_specific: Option<bool>,

/// Options that apply to all subscriptions over the current WebSocket.
/// If a new subscription contains `socketOptions` it will override those from previous subscriptions
#[serde(rename = "socketOptions")]
#[serde(skip_serializing_if = "Option::is_none")]
pub socket_options: Option<RisSubscribeSocketOptions>,
}

impl RisSubscribe {
pub fn new() -> Self {
Default::default()
}

pub fn host(mut self, host: &str) -> Self {
self.host = Some(host.to_string());
self
}

pub fn data_type(mut self, data_type: RisSubscribeType) -> Self {
self.data_type = Some(data_type);
self
}

pub fn require(mut self, require: &str) -> Self {
self.require = Some(require.to_string());
self
}

pub fn peer(mut self, peer: IpAddr) -> Self {
self.peer = Some(peer);
self
}

pub fn path(mut self, path: &str) -> Self {
self.path = Some(path.to_string());
self
}

pub fn prefix(mut self, prefix: IpNet) -> Self {
self.prefix = Some(prefix);
self
}

pub fn more_specific(mut self, more_specific: bool) -> Self {
self.more_specific = Some(more_specific);
self
}

pub fn less_specific(mut self, less_specific: bool) -> Self {
self.less_specific = Some(less_specific);
self
}

pub fn include_raw(mut self, include_raw: bool) -> Self {
match self.socket_options.as_mut() {
None => {
self.socket_options = Some(RisSubscribeSocketOptions {
include_raw: Some(include_raw),
acknowledge: None,
});
}
Some(o) => {
o.include_raw = Some(include_raw);
}
}
self
}

pub fn acknowledge(mut self, acknowledge: bool) -> Self {
match self.socket_options.as_mut() {
None => {
self.socket_options = Some(RisSubscribeSocketOptions {
include_raw: None,
acknowledge: Some(acknowledge),
});
}
Some(o) => {
o.acknowledge = Some(acknowledge);
}
}
self
}
}

impl RisLiveClientMessage for RisSubscribe {
fn msg_type(&self) -> &'static str {
"ris_subscribe"
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_new() {
let ris_subscribe = RisSubscribe::new()
.host("rrc00")
.data_type(RisSubscribeType::UPDATE);
assert_eq!(ris_subscribe.host, Some("rrc00".to_string()));

println!("{}", ris_subscribe.to_json_string());
}
}
39 changes: 39 additions & 0 deletions src/parser/rislive/messages/client/ris_unsubscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::rislive::messages::{RisLiveClientMessage, RisSubscribe};
use serde::Serialize;

#[derive(Default, Debug, Serialize)]
pub struct RisUnsubscribe {
#[serde(flatten)]
data: RisSubscribe,
}

impl RisUnsubscribe {
pub fn new(subscribe: RisSubscribe) -> Self {
Self { data: subscribe }
}
}

impl RisLiveClientMessage for RisUnsubscribe {
fn msg_type(&self) -> &'static str {
"ris_unsubscribe"
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::rislive::messages::client::ris_subscribe::RisSubscribeType;

#[test]
fn test_subscribe() {
let subscribe = RisSubscribe::new()
.host("rrc00")
.data_type(RisSubscribeType::UPDATE)
.acknowledge(true);
let unsubscribe = RisUnsubscribe::new(subscribe);

assert_eq!(unsubscribe.msg_type(), "ris_unsubscribe");

println!("{}", unsubscribe.to_json_string());
}
}
Loading

0 comments on commit 98fa324

Please sign in to comment.