From ff3b1ece638f4b9907e8a847ba6e164564316dfb Mon Sep 17 00:00:00 2001 From: JR Conlin Date: Fri, 2 Nov 2018 10:40:37 -0700 Subject: [PATCH] feat: Switch channelIDs to base64 (#48) * feat: Switch channelIDs to base64 * change channelID to random char string * fix group add (again) * reduce number of times metadata fetched * remove ChannelType, ChannelId * change channel id to "session_id" to be clear where it comes from * convert `From` trait to `from_str() -> Result` Closes #47 --- Cargo.lock | 12 ++- channelserver/Cargo.toml | 3 +- channelserver/src/channelid.rs | 67 ++++++++++++++++ channelserver/src/main.rs | 44 ++++++----- channelserver/src/server.rs | 135 ++++++++++++++++----------------- channelserver/src/session.rs | 13 ++-- 6 files changed, 177 insertions(+), 97 deletions(-) create mode 100644 channelserver/src/channelid.rs diff --git a/Cargo.lock b/Cargo.lock index 6ae5e24..4458d94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,6 +202,14 @@ dependencies = [ "safemem 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "base64" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "bitflags" version = "1.0.4" @@ -264,10 +272,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "channelserver" -version = "0.5.0" +version = "0.6.0" dependencies = [ "actix 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 0.7.13 (registry+https://github.com/rust-lang/crates.io-index)", + "base64 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.2.7 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "cadence 0.15.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2208,6 +2217,7 @@ dependencies = [ "checksum backtrace 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "346d7644f0b5f9bc73082d3b2236b69a05fd35cce0cfa3724e184e6a5c9e2a2f" "checksum backtrace 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "89a47830402e9981c5c41223151efcced65a0510c13097c769cede7efb34782a" "checksum backtrace-sys 0.1.24 (registry+https://github.com/rust-lang/crates.io-index)" = "c66d56ac8dabd07f6aacdaf633f4b8262f5b3601a810a0dcddffd5c22c69daa0" +"checksum base64 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "621fc7ecb8008f86d7fb9b95356cd692ce9514b80a86d85b397f32a22da7b9e2" "checksum base64 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "489d6c0ed21b11d038c31b6ceccca973e65d73ba3bd8ecb9a2babf5546164643" "checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12" "checksum brotli-sys 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4445dea95f4c2b41cde57cc9fee236ae4dbae88d8fcbdb4750fc1bb5d86aaecd" diff --git a/channelserver/Cargo.toml b/channelserver/Cargo.toml index df362e2..6044149 100644 --- a/channelserver/Cargo.toml +++ b/channelserver/Cargo.toml @@ -1,9 +1,10 @@ [package] name = "channelserver" -version = "0.5.0" +version = "0.6.0" authors = ["jr conlin String { + base64::encode_config(&self.value, base64::URL_SAFE_NO_PAD) + } + + pub fn from_str<'a>(string: &'a str) -> Result { + let bytes = base64::decode_config(string, base64::URL_SAFE_NO_PAD)?; + let mut array = [0; 16]; + array.copy_from_slice(&bytes[..16]); + Ok(ChannelID { value: array }) + } +} + +impl Default for ChannelID { + fn default() -> Self { + let mut rng = rand::thread_rng(); + let mut bytes = [0; CHANNELID_LEN]; + rng.fill_bytes(&mut bytes); + Self { value: bytes } + } +} + +impl fmt::Display for ChannelID { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // calling to_string() causes a stack overflow. + let as_b64 = base64::encode_config(&self.value, base64::URL_SAFE_NO_PAD); + write!(f, "{}", as_b64) + } +} + +impl Serialize for ChannelID { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_parse() { + let raw_id = "j6jLPVPeQR6diyrkQinRAQ"; + // From URLSafe b64 + let chan = ChannelID::from_str(raw_id).unwrap(); + assert!(chan.to_string() == raw_id.to_owned()); + ChannelID::from_str("invalid").expect_err("rejected"); + let output = format!("{}", chan); + assert_eq!("j6jLPVPeQR6diyrkQinRAQ".to_owned(), output); + } +} diff --git a/channelserver/src/main.rs b/channelserver/src/main.rs index f25cac1..4d3b6bb 100644 --- a/channelserver/src/main.rs +++ b/channelserver/src/main.rs @@ -1,5 +1,6 @@ //#![feature(custom_derive, try_from)] #![allow(unused_variables)] +extern crate base64; extern crate byteorder; extern crate bytes; extern crate config; @@ -35,8 +36,8 @@ use std::time::{Duration, Instant}; use actix::Arbiter; use actix_web::server::HttpServer; use actix_web::{fs, http, ws, App, Error, HttpRequest, HttpResponse}; -use uuid::Uuid; +mod channelid; mod logging; mod meta; mod metrics; @@ -48,12 +49,19 @@ mod settings; /* * based on the Actix websocket example ChatServer */ +fn channel_route_base( + req: &HttpRequest, +) -> Result { + info!(&req.state().log.log, "### Channel Route Base!"); + channel_route(req) +} /// Entry point for our route fn channel_route(req: &HttpRequest) -> Result { // not sure if it's possible to have actix_web parse the path and have a properly // scoped request, since the calling structure is different for the two, so // manually extracting the id from the path. + info!(&req.state().log.log, "@@@ Channel Route!"); let mut path: Vec<_> = req.path().split("/").collect(); let meta_info = meta::SenderData::from(req.clone()); let mut initial_connect = true; @@ -61,27 +69,28 @@ fn channel_route(req: &HttpRequest) -> Result { // if the id is valid, but not present, treat it like a "None" if id.len() == 0 { - Uuid::new_v4() + channelid::ChannelID::default() } else { initial_connect = false; - Uuid::parse_str(id).unwrap_or_else(|e| { - warn!(&req.state().log.log, - "Invalid ChannelID specified: {:?}", e; - "remote_ip" => &meta_info.remote); - Uuid::nil() - }) + let channel_id = match channelid::ChannelID::from_str(id) { + Ok(channelid) => channelid, + Err(err) => { + warn!(&req.state().log.log, + "Invalid ChannelID specified: {:?}", id; + "remote_ip" => &meta_info.remote); + return Ok(HttpResponse::new(http::StatusCode::NOT_FOUND)); + } + }; + channel_id } } - None => Uuid::new_v4(), + None => channelid::ChannelID::default(), }; - if channel == Uuid::nil() { - return Ok(HttpResponse::new(http::StatusCode::NOT_FOUND)); - } info!( &req.state().log.log, "Creating session for {} channel: \"{}\"", if initial_connect {"new"} else {"candiate"}, - channel.to_simple().to_string(); + channel.to_string(); "remote_ip" => &meta_info.remote ); @@ -120,10 +129,10 @@ fn show_version(req: &HttpRequest) -> Result) -> App { let mut mapp = app + // connecting to an empty channel creates a new one. + .resource("/v1/ws/", |r| r.route().f(channel_route_base)) // websocket to an existing channel .resource("/v1/ws/{channel}", |r| r.route().f(channel_route)) - // connecting to an empty channel creates a new one. - .resource("/v1/ws/", |r| r.route().f(channel_route)) .resource("/__version__", |r| { r.method(http::Method::GET).f(show_version) }) @@ -186,8 +195,7 @@ fn main() { } else { logging::MozLogger::new_json() }; - let metrics = metrics::metrics_from_opts( - &msettings, logging).unwrap(); + let metrics = metrics::metrics_from_opts(&msettings, logging).unwrap(); let iploc = maxminddb::Reader::open(&db_loc).unwrap_or_else(|x| { use std::process::exit; println!("Could not read geoip database {:?}", x); @@ -221,7 +229,7 @@ mod test { use actix_web::test; use actix_web::HttpMessage; - use cadence::{StatsdClient, NopMetricSink}; + use cadence::{NopMetricSink, StatsdClient}; use super::*; fn get_server() -> test::TestServer { diff --git a/channelserver/src/server.rs b/channelserver/src/server.rs index c4ac371..75a577d 100644 --- a/channelserver/src/server.rs +++ b/channelserver/src/server.rs @@ -11,11 +11,11 @@ use actix::prelude::{Actor, Context, Handler, Recipient}; // use cadence::StatsdClient; use rand::{self, Rng, ThreadRng}; +use channelid::ChannelID; use logging::MozLogger; use meta; // use metrics; use perror; -use session; use settings::Settings; pub const EOL: &'static str = "\x04"; @@ -54,14 +54,13 @@ pub struct TextMessage(pub MessageType, pub String); /// Message for chat server communications /// Individual session identifier pub type SessionId = usize; -pub type ChannelId = usize; /// New chat session is created #[derive(Message)] #[rtype(SessionId)] pub struct Connect { pub addr: Recipient, - pub channel: session::ChannelName, + pub channel: ChannelID, pub remote: Option, pub initial_connect: bool, } @@ -69,7 +68,7 @@ pub struct Connect { /// Session is disconnected #[derive(Message)] pub struct Disconnect { - pub channel: session::ChannelName, + pub channel: ChannelID, pub id: SessionId, pub reason: DisconnectReason, } @@ -84,27 +83,27 @@ pub struct ClientMessage { /// Peer message pub message: String, /// channel name - pub channel: session::ChannelName, + pub channel: ChannelID, /// Sender info pub sender: meta::SenderData, } #[derive(Eq, PartialEq, Clone, Debug)] pub struct Channel { - pub id: ChannelId, + pub session_id: SessionId, pub started: Instant, pub msg_count: u8, pub data_exchanged: usize, pub remote: Option, } -type Channels = HashMap; +type Channels = HashMap; /// `ChannelServer` manages chat channels and responsible for coordinating chat /// session. implementation is super primitive pub struct ChannelServer { // collections of sessions grouped by channel - channels: HashMap, + channels: HashMap, // individual connections sessions: HashMap>, // random number generator @@ -136,7 +135,7 @@ impl ChannelServer { /// Send message to all users in the channel except skip_id fn send_message( &mut self, - channel: &session::ChannelName, + channel: &ChannelID, message: &str, skip_id: SessionId, ) -> Result<(), perror::HandlerError> { @@ -174,8 +173,8 @@ impl ChannelServer { // self.metrics.borrow().incr("conn.max.msg").ok(); return Err(perror::HandlerErrorKind::XSMessageErr(remote.to_owned()).into()); } - if party.id != skip_id { - if let Some(addr) = self.sessions.get(&party.id) { + if party.session_id != skip_id { + if let Some(addr) = self.sessions.get(&party.session_id) { addr.do_send(TextMessage(MessageType::Text, message.to_owned())) .ok(); } @@ -185,7 +184,7 @@ impl ChannelServer { Ok(()) } - fn disconnect(&mut self, channel: &session::ChannelName, id: &usize) { + fn disconnect(&mut self, channel: &ChannelID, id: &usize) { if let Some(participants) = self.channels.get_mut(channel) { for (pid, info) in participants { if id == pid { @@ -212,7 +211,7 @@ impl ChannelServer { /// Kill a channel and terminate all participants. /// /// This sends a Terminate to each participant, which forces the connection closed. - fn shutdown(&mut self, channel: &session::ChannelName) { + fn shutdown(&mut self, channel: &ChannelID) { if let Some(participants) = self.channels.get(channel) { for (id, info) in participants { if let Some(addr) = self.sessions.get(&id) { @@ -223,11 +222,7 @@ impl ChannelServer { self.sessions.remove(&id); } } - debug!( - self.log.log, - "Removing channel {}", - channel.to_simple().to_string() - ); + debug!(self.log.log, "Removing channel {}", channel); self.channels.remove(channel); } } @@ -264,76 +259,78 @@ impl Handler for ChannelServer { let session_id = self.rng.borrow_mut().gen::(); let new_session = Channel { // register session with random id - id: session_id.clone(), + session_id: session_id.clone(), started: Instant::now(), msg_count: 0, data_exchanged: 0, remote: msg.remote.clone(), }; - self.sessions.insert(new_session.id, msg.addr.clone()); + self.sessions + .insert(new_session.session_id, msg.addr.clone()); debug!( self.log.log, "New connection"; - "channel" => &msg.channel.to_simple().to_string(), - "session" => &new_session.id, + "channel" => &msg.channel.to_string(), + "session" => &new_session.session_id, "remote_ip" => &msg.remote, ); - let chan_id = &msg.channel.to_simple(); + let chan_id = &msg.channel; // Is this a new channel request? if let Entry::Vacant(entry) = self.channels.entry(msg.channel) { // Is this the first time we're requesting this channel? if !&msg.initial_connect { warn!( - self.log.log, - "Attempt to connect to unknown channel"; - "channel" => &chan_id.to_string(), - "remote_ip" => &msg.remote.clone().unwrap_or("Unknown".to_owned()), - ); + self.log.log, + "Attempt to connect to unknown channel"; + "channel" => &chan_id.to_string(), + "remote_ip" => &msg.remote.clone().unwrap_or("Unknown".to_owned()), + ); return 0; } - let group = entry.insert(HashMap::new()); - // self.metrics.borrow().incr("conn.new").ok(); - if group.len() >= self.settings.borrow().max_channel_connections.into() { - warn!( + entry.insert(HashMap::new()); + }; + let group = self.channels.get_mut(&msg.channel).unwrap(); + if group.len() >= self.settings.borrow().max_channel_connections.into() { + warn!( + self.log.log, + "Too many connections requested for channel"; + "channel" => &chan_id.to_string(), + "remote_ip" => &new_session.remote.unwrap_or("Uknown".to_owned()), + ); + self.sessions.remove(&new_session.session_id); + // self.metrics.borrow().incr("conn.max.conn").ok(); + // It doesn't make sense to impose a high penalty for this + // behavior, but we may want to flag and log the origin + // IP for later analytics. + // We could also impose a tiny penalty on the IP (if possible) + // which would minimally impact accidental occurances, but + // add up for major infractors. + return 0; + } + + // The group should have two principle parties, the auth and supplicant + // Any connection beyond that group should be checked to ensure it's + // from a known IP. If a principle that only has one connection and it + // drops, it is possible that it can't reconnect, but that's not a bad + // thing. We should just let the connection expire as invalid so that + // it's not stolen. + if group.len() > 2 { + if !reconnect_check(&group, &new_session.remote) { + error!( self.log.log, - "Too many connections requested for channel"; - "channel" => &chan_id.to_string(), + "Unexpected remote connection"; "remote_ip" => &new_session.remote.unwrap_or("Uknown".to_owned()), ); - self.sessions.remove(&new_session.id); - // self.metrics.borrow().incr("conn.max.conn").ok(); - // It doesn't make sense to impose a high penalty for this - // behavior, but we may want to flag and log the origin - // IP for later analytics. - // We could also impose a tiny penalty on the IP (if possible) - // which would minimally impact accidental occurances, but - // add up for major infractors. return 0; } - // The group should have two principle parties, the auth and supplicant - // Any connection beyond that group should be checked to ensure it's - // from a known IP. If a principle that only has one connection and it - // drops, it is possible that it can't reconnect, but that's not a bad - // thing. We should just let the connection expire as invalid so that - // it's not stolen. - if group.len() > 2 { - if !reconnect_check(&group, &new_session.remote) { - error!( - self.log.log, - "Unexpected remote connection"; - "remote_ip" => &new_session.remote.unwrap_or("Uknown".to_owned()), - ); - return 0; - } - } - debug!(self.log.log, - "Adding session to channel"; - "channel" => &chan_id.to_string(), - "session" => &new_session.id, - "remote_ip" => &new_session.remote.clone().unwrap_or("Uknown".to_owned()), - ); - group.insert(session_id.clone(), new_session); - } + }; + debug!(self.log.log, + "Adding session to channel"; + "channel" => &chan_id.to_string(), + "session" => &new_session.session_id, + "remote_ip" => &new_session.remote.clone().unwrap_or("Uknown".to_owned()), + ); + group.insert(session_id.clone(), new_session); // tell the client what their channel is. let jpath = json!({ "link": format!("/v1/ws/{}", chan_id), "channelid": chan_id.to_string() }); @@ -353,7 +350,7 @@ impl Handler for ChannelServer { debug!( self.log.log, "Connection dropped"; - "channel" => &msg.channel.to_simple().to_string(), + "channel" => &msg.channel.to_string(), "session" => &msg.id, "reason" => format!("{}", &msg.reason), ); @@ -397,7 +394,7 @@ mod test { test_group.insert( 1, Channel { - id: 1, + session_id: 1, started: Instant::now(), msg_count: 0, data_exchanged: 0, @@ -407,7 +404,7 @@ mod test { test_group.insert( 2, Channel { - id: 1, + session_id: 1, started: Instant::now(), msg_count: 0, data_exchanged: 0, diff --git a/channelserver/src/session.rs b/channelserver/src/session.rs index 5c93920..56d49bb 100644 --- a/channelserver/src/session.rs +++ b/channelserver/src/session.rs @@ -5,17 +5,15 @@ use actix::{ Running, StreamHandler, WrapFuture, }; use actix_web::ws; -use cadence::{StatsdClient}; +use cadence::StatsdClient; use ipnet::IpNet; use maxminddb; -use uuid::Uuid; +use channelid::ChannelID; use logging; use meta::SenderData; use server; -pub type ChannelName = Uuid; - /// This is our websocket route state, this state is shared with all route /// instances via `HttpContext::state()` pub struct WsChannelSessionState { @@ -38,7 +36,7 @@ pub struct WsChannelSession { /// Max channel lifespan pub expiry: Instant, /// joined channel - pub channel: ChannelName, + pub channel: ChannelID, /// peer name pub meta: SenderData, /// is this the first request for the given channel? @@ -59,7 +57,6 @@ impl Actor for WsChannelSession { self.hb(ctx); - self.meta = SenderData::from(ctx.request().clone()); let meta = self.meta.clone(); let addr: Addr = ctx.address(); ctx.state() @@ -202,7 +199,7 @@ impl WsChannelSession { ctx.state().log.log, "Client connected too long"; "session" => &act.id, - "channel" => &act.channel.to_simple().to_string(), + "channel" => &act.channel.to_string(), "remote_ip" => &act.meta.remote, ); ctx.state().addr.do_send(server::Disconnect { @@ -222,7 +219,7 @@ impl WsChannelSession { ctx.state().log.log, "Client time-out. Disconnecting"; "session" => &act.id, - "channel" => &act.channel.to_simple().to_string(), + "channel" => &act.channel.to_string(), "remote_ip" => &act.meta.remote, ); ctx.state().addr.do_send(server::Disconnect {