diff --git a/src/client.rs b/src/client.rs index 42435b789..8fb93485d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -343,11 +343,7 @@ where let response = Box::new(data.srv.ddb.hello( &connected_at, uaid.as_ref(), - &data.srv.opts.router_table_name, &data.srv.opts.router_url, - &data.srv.opts.message_table_names, - &data.srv.opts.current_message_month, - &data.srv.metrics, )); transition!(AwaitProcessHello { response, @@ -567,12 +563,25 @@ where }, #[state_machine_future( - transitions(IncrementStorage, CheckStorage, AwaitDropUser, AwaitMigrateUser, AwaitInput) + transitions( + IncrementStorage, + CheckStorage, + AwaitDropUser, + AwaitMigrateUser, + AwaitInput + ) )] DetermineAck { data: AuthClientData }, #[state_machine_future( - transitions(DetermineAck, Send, AwaitInput, AwaitRegister, AwaitUnregister, AwaitDelete) + transitions( + DetermineAck, + Send, + AwaitInput, + AwaitRegister, + AwaitUnregister, + AwaitDelete + ) )] AwaitInput { data: AuthClientData }, @@ -683,11 +692,7 @@ where // Exceeded the max limit of stored messages: drop the user to trigger a // re-register debug!("Dropping user: exceeded msg_limit"); - let response = Box::new( - data.srv - .ddb - .drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid), - ); + let response = Box::new(data.srv.ddb.drop_uaid(&webpush.uaid)); transition!(AwaitDropUser { response, data }); } else if !smessages.is_empty() { transition!(Send { smessages, data }); @@ -708,20 +713,15 @@ where transition!(CheckStorage { data }); } else if all_acked && webpush.flags.rotate_message_table { debug!("Triggering migration"); - let response = Box::new(data.srv.ddb.migrate_user( - &webpush.uaid, - &webpush.message_month, - &data.srv.opts.current_message_month, - &data.srv.opts.router_table_name, - )); - transition!(AwaitMigrateUser { response, data }); - } else if all_acked && webpush.flags.reset_uaid { - debug!("Dropping user: flagged reset_uaid"); let response = Box::new( data.srv .ddb - .drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid), + .migrate_user(&webpush.uaid, &webpush.message_month), ); + transition!(AwaitMigrateUser { response, data }); + } else if all_acked && webpush.flags.reset_uaid { + debug!("Dropping user: flagged reset_uaid"); + let response = Box::new(data.srv.ddb.drop_uaid(&webpush.uaid)); transition!(AwaitDropUser { response, data }); } transition!(AwaitInput { data }) @@ -1006,7 +1006,7 @@ where let AwaitMigrateUser { data, .. } = await_migrate_user.take(); { let mut webpush = data.webpush.borrow_mut(); - webpush.message_month = data.srv.opts.current_message_month.clone(); + webpush.message_month = data.srv.ddb.current_message_month.clone(); webpush.flags.rotate_message_table = false; } transition!(DetermineAck { data }) diff --git a/src/db/commands.rs b/src/db/commands.rs index 609cfe8c7..ca8807ee5 100644 --- a/src/db/commands.rs +++ b/src/db/commands.rs @@ -1,4 +1,5 @@ use std::collections::HashSet; +use std::fmt::Display; use std::rc::Rc; use std::result::Result as StdResult; use uuid::Uuid; @@ -49,6 +50,7 @@ pub fn list_tables( pub fn fetch_messages( ddb: Rc>, + metrics: &Rc, table_name: &str, uaid: &Uuid, limit: u32, @@ -66,19 +68,22 @@ pub fn fetch_messages( ..Default::default() }; + let metrics = Rc::clone(metrics); let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)); retry_if(move || ddb.query(&input), cond) .chain_err(|| "Error fetching messages") - .and_then(|output| { + .and_then(move |output| { let mut notifs: Vec = output.items.map_or_else(Vec::new, |items| { debug!("Got response of: {:?}", items); - // TODO: Capture translation errors and report them as we shouldn't - // have corrupt data items .into_iter() .inspect(|i| debug!("Item: {:?}", i)) - .filter_map(|item| serde_dynamodb::from_hashmap(item).ok()) + .filter_map(|item| { + ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| { + conversion_err(&metrics, e, "serde_dynamodb_from_hashmap") + }) + }) .collect() }); if notifs.is_empty() { @@ -89,10 +94,13 @@ pub fn fetch_messages( // the first DynamoDbNotification and remove it from the vec. let timestamp = notifs.remove(0).current_timestamp; // Convert any remaining DynamoDbNotifications to Notification's - // TODO: Capture translation errors and report them as we shouldn't have corrupt data let messages = notifs .into_iter() - .filter_map(|ddb_notif| ddb_notif.into_notif().ok()) + .filter_map(|ddb_notif| { + ok_or_inspect(ddb_notif.into_notif(), |e| { + conversion_err(&metrics, e, "into_notif") + }) + }) .collect(); Ok(FetchMessageResponse { timestamp, @@ -103,6 +111,7 @@ pub fn fetch_messages( pub fn fetch_timestamp_messages( ddb: Rc>, + metrics: &Rc, table_name: &str, uaid: &Uuid, timestamp: Option, @@ -126,17 +135,25 @@ pub fn fetch_timestamp_messages( ..Default::default() }; + let metrics = Rc::clone(metrics); let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_)); retry_if(move || ddb.query(&input), cond) .chain_err(|| "Error fetching messages") - .and_then(|output| { + .and_then(move |output| { let messages = output.items.map_or_else(Vec::new, |items| { debug!("Got response of: {:?}", items); - // TODO: Capture translation errors and report them as we shouldn't have corrupt data items .into_iter() - .filter_map(|item| serde_dynamodb::from_hashmap(item).ok()) - .filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.into_notif().ok()) + .filter_map(|item| { + ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| { + conversion_err(&metrics, e, "serde_dynamodb_from_hashmap") + }) + }) + .filter_map(|ddb_notif: DynamoDbNotification| { + ok_or_inspect(ddb_notif.into_notif(), |e| { + conversion_err(&metrics, e, "into_notif") + }) + }) .collect() }); let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max(); @@ -334,13 +351,13 @@ pub fn unregister_channel_id( pub fn lookup_user( ddb: Rc>, + metrics: &Rc, uaid: &Uuid, connected_at: &u64, router_url: &str, router_table_name: &str, message_table_names: &[String], current_message_month: &str, - metrics: &StatsdClient, ) -> MyFuture<(HelloResponse, Option)> { let response = get_uaid(ddb.clone(), uaid, router_table_name); // Prep all these for the move into the static closure capture @@ -350,7 +367,7 @@ pub fn lookup_user( let messages_tables = message_table_names.to_vec(); let connected_at = *connected_at; let router_url = router_url.to_string(); - let metrics = metrics.clone(); + let metrics = Rc::clone(metrics); let response = response.and_then(move |data| -> MyFuture<_> { let mut hello_response = HelloResponse { message_month: cur_month.clone(), @@ -417,3 +434,30 @@ fn handle_user_result( user.connected_at = connected_at; Ok(user) } + +/// Like Result::ok, convert from Result to Option but applying a +/// function to the Err value +fn ok_or_inspect(result: StdResult, op: F) -> Option +where + F: FnOnce(E), +{ + match result { + Ok(t) => Some(t), + Err(e) => { + op(e); + None + } + } +} + +/// Log/metric errors during conversions to Notification +fn conversion_err(metrics: &StatsdClient, err: E, name: &'static str) +where + E: Display, +{ + error!("Failed {} conversion: {}", name, err); + metrics + .incr_with_tags("ua.notification_read.error") + .with_tag("conversion", name) + .send(); +} diff --git a/src/db/mod.rs b/src/db/mod.rs index 09726e9d1..59910b289 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -22,7 +22,7 @@ mod commands; mod models; use errors::*; use protocol::Notification; -use server::Server; +use server::{Server, ServerOptions}; mod util; use util::timing::sec_since_epoch; @@ -62,10 +62,14 @@ pub enum RegisterResponse { pub struct DynamoStorage { ddb: Rc>, + metrics: Rc, + router_table_name: String, + message_table_names: Vec, + pub current_message_month: String, } impl DynamoStorage { - pub fn new() -> Self { + pub fn from_opts(opts: &ServerOptions, metrics: StatsdClient) -> Result { let ddb: Box = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") { Box::new(DynamoDbClient::new( RequestDispatcher::default(), @@ -78,27 +82,23 @@ impl DynamoStorage { } else { Box::new(DynamoDbClient::simple(Region::default())) }; - Self { ddb: Rc::new(ddb) } - } + let ddb = Rc::new(ddb); - pub fn list_message_tables(&self, prefix: &str) -> Result> { - let mut names: Vec = Vec::new(); - let mut start_key = None; - loop { - let result = commands::list_tables(self.ddb.clone(), start_key).wait()?; - start_key = result.last_evaluated_table_name; - if let Some(table_names) = result.table_names { - names.extend(table_names); - } - if start_key.is_none() { - break; - } - } - let names = names - .into_iter() - .filter(|name| name.starts_with(prefix)) - .collect(); - Ok(names) + let mut message_table_names = list_message_tables(&ddb, &opts._message_table_name) + .map_err(|_| "Failed to locate message tables")?; + message_table_names.sort_unstable(); + let current_message_month = message_table_names + .last() + .ok_or("No last message month found")? + .to_string(); + + Ok(Self { + ddb, + metrics: Rc::new(metrics), + router_table_name: opts._router_table_name.clone(), + message_table_names, + current_message_month, + }) } pub fn increment_storage( @@ -136,28 +136,23 @@ impl DynamoStorage { &self, connected_at: &u64, uaid: Option<&Uuid>, - router_table_name: &str, router_url: &str, - message_table_names: &[String], - current_message_month: &str, - metrics: &StatsdClient, ) -> impl Future { - let router_table_name = router_table_name.to_string(); let response: MyFuture<(HelloResponse, Option)> = if let Some(uaid) = uaid { commands::lookup_user( self.ddb.clone(), + &self.metrics, &uaid, connected_at, router_url, - &router_table_name, - message_table_names, - current_message_month, - metrics, + &self.router_table_name, + &self.message_table_names, + &self.current_message_month, ) } else { Box::new(future::ok(( HelloResponse { - message_month: current_message_month.to_string(), + message_month: self.current_message_month.clone(), connected_at: *connected_at, ..Default::default() }, @@ -166,6 +161,7 @@ impl DynamoStorage { }; let ddb = self.ddb.clone(); let router_url = router_url.to_string(); + let router_table_name = self.router_table_name.clone(); let connected_at = *connected_at; response.and_then(move |(mut hello_response, user_opt)| { @@ -179,7 +175,7 @@ impl DynamoStorage { let uaid = user.uaid; let mut err_response = hello_response.clone(); err_response.connected_at = connected_at; - commands::register_user(ddb, &user, router_table_name.as_ref()) + commands::register_user(ddb, &user, &router_table_name) .and_then(move |result| { debug!("Success adding user, item output: {:?}", result); hello_response.uaid = Some(uaid); @@ -223,12 +219,8 @@ impl DynamoStorage { Box::new(response) } - pub fn drop_uaid( - &self, - table_name: &str, - uaid: &Uuid, - ) -> impl Future { - commands::drop_user(self.ddb.clone(), uaid, table_name) + pub fn drop_uaid(&self, uaid: &Uuid) -> impl Future { + commands::drop_user(self.ddb.clone(), uaid, &self.router_table_name) .and_then(|_| future::ok(())) .chain_err(|| "Unable to drop user record") } @@ -249,15 +241,13 @@ impl DynamoStorage { &self, uaid: &Uuid, message_month: &str, - current_message_month: &str, - router_table_name: &str, ) -> impl Future { let uaid = *uaid; let ddb = self.ddb.clone(); let ddb2 = self.ddb.clone(); - let cur_month = current_message_month.to_string(); + let cur_month = self.current_message_month.to_string(); let cur_month2 = cur_month.clone(); - let router_table_name = router_table_name.to_string(); + let router_table_name = self.router_table_name.clone(); commands::all_channels(self.ddb.clone(), &uaid, message_month) .and_then(move |channels| -> MyFuture<_> { @@ -350,6 +340,7 @@ impl DynamoStorage { let response: MyFuture = if include_topic { Box::new(commands::fetch_messages( self.ddb.clone(), + &self.metrics, table_name, uaid, 11 as u32, @@ -360,6 +351,7 @@ impl DynamoStorage { let uaid = *uaid; let table_name = table_name.to_string(); let ddb = self.ddb.clone(); + let metrics = Rc::clone(&self.metrics); response.and_then(move |resp| -> MyFuture<_> { // Return now from this future if we have messages @@ -381,6 +373,7 @@ impl DynamoStorage { if resp.messages.is_empty() || resp.timestamp.is_some() { Box::new(commands::fetch_timestamp_messages( ddb, + &metrics, table_name.as_ref(), &uaid, timestamp, @@ -404,3 +397,23 @@ impl DynamoStorage { }) } } + +pub fn list_message_tables(ddb: &Rc>, prefix: &str) -> Result> { + let mut names: Vec = Vec::new(); + let mut start_key = None; + loop { + let result = commands::list_tables(Rc::clone(ddb), start_key).wait()?; + start_key = result.last_evaluated_table_name; + if let Some(table_names) = result.table_names { + names.extend(table_names); + } + if start_key.is_none() { + break; + } + } + let names = names + .into_iter() + .filter(|name| name.starts_with(prefix)) + .collect(); + Ok(names) +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 7f6d5a12c..db82a4989 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -129,9 +129,8 @@ pub struct ServerOptions { pub auto_ping_timeout: Duration, pub max_connections: Option, pub close_handshake_timeout: Option, - pub message_table_names: Vec, - pub current_message_month: String, - pub router_table_name: String, + pub _message_table_name: String, + pub _router_table_name: String, pub router_url: String, pub endpoint_url: String, pub statsd_host: Option, @@ -157,13 +156,9 @@ impl ServerOptions { .collect(); let fernet = MultiFernet::new(fernets); - let ddb = DynamoStorage::new(); - let message_table_names = ddb - .list_message_tables(&settings.message_tablename) - .expect("Failed to locate message tables"); let router_url = settings.router_url(); let endpoint_url = settings.endpoint_url(); - let mut opts = Self { + Ok(Self { debug: settings.debug, port: settings.port, fernet, @@ -174,9 +169,8 @@ impl ServerOptions { Some(settings.statsd_host) }, statsd_port: settings.statsd_port, - message_table_names, - current_message_month: "".to_string(), - router_table_name: settings.router_tablename, + _message_table_name: settings.message_tablename, + _router_table_name: settings.router_tablename, router_url, endpoint_url, ssl_key: settings.router_ssl_key.map(PathBuf::from), @@ -199,14 +193,7 @@ impl ServerOptions { .expect("megaphone poll interval cannot be 0"), human_logs: settings.human_logs, msg_limit: settings.msg_limit, - }; - opts.message_table_names.sort_unstable(); - opts.current_message_month = opts - .message_table_names - .last() - .expect("No last message month found") - .to_string(); - Ok(opts) + }) } } @@ -314,15 +301,17 @@ impl Server { } else { BroadcastChangeTracker::new(Vec::new()) }; + let metrics = metrics_from_opts(opts)?; + let srv = Rc::new(Server { opts: opts.clone(), broadcaster: RefCell::new(broadcaster), - ddb: DynamoStorage::new(), + ddb: DynamoStorage::from_opts(opts, metrics.clone())?, uaids: RefCell::new(HashMap::new()), open_connections: Cell::new(0), handle: core.handle(), tls_acceptor: tls::configure(opts), - metrics: metrics_from_opts(opts)?, + metrics, }); let addr = SocketAddr::from(([0, 0, 0, 0], srv.opts.port)); let ws_listener = TcpListener::bind(&addr, &srv.handle)?;