Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: emit metrics for any failed notif conversions #39

Merged
merged 2 commits into from
Jul 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,7 @@ where
let response = Box::new(data.srv.ddb.hello(
&connected_at,
uaid.as_ref(),
&data.srv.opts.router_table_name,
&data.srv.opts.router_url,
&data.srv.opts.message_table_names,
&data.srv.opts.current_message_month,
&data.srv.metrics,
));
transition!(AwaitProcessHello {
response,
Expand Down Expand Up @@ -567,12 +563,25 @@ where
},

#[state_machine_future(
transitions(IncrementStorage, CheckStorage, AwaitDropUser, AwaitMigrateUser, AwaitInput)
transitions(
IncrementStorage,
CheckStorage,
AwaitDropUser,
AwaitMigrateUser,
AwaitInput
)
)]
DetermineAck { data: AuthClientData<T> },

#[state_machine_future(
transitions(DetermineAck, Send, AwaitInput, AwaitRegister, AwaitUnregister, AwaitDelete)
transitions(
DetermineAck,
Send,
AwaitInput,
AwaitRegister,
AwaitUnregister,
AwaitDelete
)
)]
AwaitInput { data: AuthClientData<T> },

Expand Down Expand Up @@ -683,11 +692,7 @@ where
// Exceeded the max limit of stored messages: drop the user to trigger a
// re-register
debug!("Dropping user: exceeded msg_limit");
let response = Box::new(
data.srv
.ddb
.drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid),
);
let response = Box::new(data.srv.ddb.drop_uaid(&webpush.uaid));
transition!(AwaitDropUser { response, data });
} else if !smessages.is_empty() {
transition!(Send { smessages, data });
Expand All @@ -708,20 +713,15 @@ where
transition!(CheckStorage { data });
} else if all_acked && webpush.flags.rotate_message_table {
debug!("Triggering migration");
let response = Box::new(data.srv.ddb.migrate_user(
&webpush.uaid,
&webpush.message_month,
&data.srv.opts.current_message_month,
&data.srv.opts.router_table_name,
));
transition!(AwaitMigrateUser { response, data });
} else if all_acked && webpush.flags.reset_uaid {
debug!("Dropping user: flagged reset_uaid");
let response = Box::new(
data.srv
.ddb
.drop_uaid(&data.srv.opts.router_table_name, &webpush.uaid),
.migrate_user(&webpush.uaid, &webpush.message_month),
);
transition!(AwaitMigrateUser { response, data });
} else if all_acked && webpush.flags.reset_uaid {
debug!("Dropping user: flagged reset_uaid");
let response = Box::new(data.srv.ddb.drop_uaid(&webpush.uaid));
transition!(AwaitDropUser { response, data });
}
transition!(AwaitInput { data })
Expand Down Expand Up @@ -1006,7 +1006,7 @@ where
let AwaitMigrateUser { data, .. } = await_migrate_user.take();
{
let mut webpush = data.webpush.borrow_mut();
webpush.message_month = data.srv.opts.current_message_month.clone();
webpush.message_month = data.srv.ddb.current_message_month.clone();
webpush.flags.rotate_message_table = false;
}
transition!(DetermineAck { data })
Expand Down
68 changes: 56 additions & 12 deletions src/db/commands.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::fmt::Display;
use std::rc::Rc;
use std::result::Result as StdResult;
use uuid::Uuid;
Expand Down Expand Up @@ -49,6 +50,7 @@ pub fn list_tables(

pub fn fetch_messages(
ddb: Rc<Box<DynamoDb>>,
metrics: &Rc<StatsdClient>,
table_name: &str,
uaid: &Uuid,
limit: u32,
Expand All @@ -66,19 +68,22 @@ pub fn fetch_messages(
..Default::default()
};

let metrics = Rc::clone(metrics);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed the rust team began recommending Rc::clone vs .clone about a year ago, makes it clearer that this is a cheap Rc clone vs worrying about it being a different clone

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense for containers like RC (and I suppose vec may be similar), but I keep feeling that a given object should be responsible for making it's own clone, since it knows best how deep it needs to copy itself. I wonder if we'll see this pattern play out for other, similar structures?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not Vec but it makes sense for smart pointers. the original RFC (which suggested a new method new_ref instead that was rejected) mentioned Rc/Arc and the associated Weak pointer types. Here's a pretty good example it outlines:

https://github.com/nical/rfcs/blob/rc-newref-clone/text/0000-rc-newref-clone.md#example

let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(&input), cond)
.chain_err(|| "Error fetching messages")
.and_then(|output| {
.and_then(move |output| {
let mut notifs: Vec<DynamoDbNotification> =
output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't
// have corrupt data
items
.into_iter()
.inspect(|i| debug!("Item: {:?}", i))
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.filter_map(|item| {
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, "serde_dynamodb_from_hashmap")
})
})
.collect()
});
if notifs.is_empty() {
Expand All @@ -89,10 +94,13 @@ pub fn fetch_messages(
// the first DynamoDbNotification and remove it from the vec.
let timestamp = notifs.remove(0).current_timestamp;
// Convert any remaining DynamoDbNotifications to Notification's
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
let messages = notifs
.into_iter()
.filter_map(|ddb_notif| ddb_notif.into_notif().ok())
.filter_map(|ddb_notif| {
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, "into_notif")
})
})
.collect();
Ok(FetchMessageResponse {
timestamp,
Expand All @@ -103,6 +111,7 @@ pub fn fetch_messages(

pub fn fetch_timestamp_messages(
ddb: Rc<Box<DynamoDb>>,
metrics: &Rc<StatsdClient>,
table_name: &str,
uaid: &Uuid,
timestamp: Option<u64>,
Expand All @@ -126,17 +135,25 @@ pub fn fetch_timestamp_messages(
..Default::default()
};

let metrics = Rc::clone(metrics);
let cond = |err: &QueryError| matches!(err, &QueryError::ProvisionedThroughputExceeded(_));
retry_if(move || ddb.query(&input), cond)
.chain_err(|| "Error fetching messages")
.and_then(|output| {
.and_then(move |output| {
let messages = output.items.map_or_else(Vec::new, |items| {
debug!("Got response of: {:?}", items);
// TODO: Capture translation errors and report them as we shouldn't have corrupt data
items
.into_iter()
.filter_map(|item| serde_dynamodb::from_hashmap(item).ok())
.filter_map(|ddb_notif: DynamoDbNotification| ddb_notif.into_notif().ok())
.filter_map(|item| {
ok_or_inspect(serde_dynamodb::from_hashmap(item), |e| {
conversion_err(&metrics, e, "serde_dynamodb_from_hashmap")
})
})
.filter_map(|ddb_notif: DynamoDbNotification| {
ok_or_inspect(ddb_notif.into_notif(), |e| {
conversion_err(&metrics, e, "into_notif")
})
})
.collect()
});
let timestamp = messages.iter().filter_map(|m| m.sortkey_timestamp).max();
Expand Down Expand Up @@ -334,13 +351,13 @@ pub fn unregister_channel_id(

pub fn lookup_user(
ddb: Rc<Box<DynamoDb>>,
metrics: &Rc<StatsdClient>,
uaid: &Uuid,
connected_at: &u64,
router_url: &str,
router_table_name: &str,
message_table_names: &[String],
current_message_month: &str,
metrics: &StatsdClient,
) -> MyFuture<(HelloResponse, Option<DynamoDbUser>)> {
let response = get_uaid(ddb.clone(), uaid, router_table_name);
// Prep all these for the move into the static closure capture
Expand All @@ -350,7 +367,7 @@ pub fn lookup_user(
let messages_tables = message_table_names.to_vec();
let connected_at = *connected_at;
let router_url = router_url.to_string();
let metrics = metrics.clone();
let metrics = Rc::clone(metrics);
let response = response.and_then(move |data| -> MyFuture<_> {
let mut hello_response = HelloResponse {
message_month: cur_month.clone(),
Expand Down Expand Up @@ -417,3 +434,30 @@ fn handle_user_result(
user.connected_at = connected_at;
Ok(user)
}

/// Like Result::ok, convert from Result<T, E> to Option<T> but applying a
/// function to the Err value
fn ok_or_inspect<T, E, F>(result: StdResult<T, E>, op: F) -> Option<T>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

almost tempted to make this into a whole trait extension, time will tell how useful it is

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, right now you're using it just to contain a call to conversion_err. (I almost wonder if it might just be better to combine them into one call to reduce some of the templating, but that's a super minor nit.)

It'll be interesting to see how these play out in the future.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had it that way to start with then ended up here. Already had in mind I wanted an Option::ok_or_else like call for Result (but it doesn't have one because it would be dropping Err in that case).

It had "ua.notification_read.error" either embedded inside of it or that also needed to be an argument, so went for the generic combinator I really wanted

where
F: FnOnce(E),
{
match result {
Ok(t) => Some(t),
Err(e) => {
op(e);
None
}
}
}

/// Log/metric errors during conversions to Notification
fn conversion_err<E>(metrics: &StatsdClient, err: E, name: &'static str)
where
E: Display,
{
error!("Failed {} conversion: {}", name, err);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this as debug! but figured let's error! for now so it's seen on prod

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

metrics
.incr_with_tags("ua.notification_read.error")
.with_tag("conversion", name)
.send();
}
Loading