Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
Deal with missing created_at
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Feb 5, 2024
1 parent b9c9fcd commit 8d20acf
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 15 deletions.
8 changes: 7 additions & 1 deletion hook-api/src/handlers/webhook.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Instant;

use axum::{extract::State, http::StatusCode, Json};
use chrono::Utc;
use hook_common::pgqueue::{NewJob, PgQueue};
use hook_common::webhook::{WebhookJobMetadata, WebhookJobParameters};
use serde::Serialize;
Expand Down Expand Up @@ -32,7 +33,7 @@ fn default_max_attempts() -> u32 {

pub async fn post(
State(pg_queue): State<PgQueue>,
Json(payload): Json<WebhookPostRequestBody>,
Json(mut payload): Json<WebhookPostRequestBody>,
) -> Result<Json<WebhookPostResponse>, (StatusCode, Json<WebhookPostResponse>)> {
debug!("received payload: {:?}", payload);

Expand All @@ -55,6 +56,11 @@ pub async fn post(
}),
)
})?;

if payload.metadata.created_at.is_none() {
payload.metadata.created_at = Some(Utc::now());
}

let job = NewJob::new(
max_attempts,
payload.metadata,
Expand Down
49 changes: 42 additions & 7 deletions hook-common/src/kafka_messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,53 @@ where
serializer.serialize_str(&datetime.format("%Y-%m-%d %H:%M:%S").to_string())
}

pub fn serialize_datetime_option<S>(
datetime: &Option<DateTime<Utc>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match datetime {
Some(datetime) => serialize_datetime(datetime, serializer),
None => serializer.serialize_none(),
}
}

pub fn deserialize_datetime<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
where
D: Deserializer<'de>,
{
let formatted: String = Deserialize::deserialize(deserializer)?;
let datetime = match DateTime::parse_from_rfc3339(&formatted) {
Ok(d) => d.with_timezone(&Utc),
Err(_) => match NaiveDateTime::parse_from_str(&formatted, "%Y-%m-%d %H:%M:%S") {
Ok(d) => d.and_utc(),
Err(_) => return Err(serde::de::Error::custom("Invalid datetime format")),
},
};
let datetime = DateTime::parse_from_rfc3339(&formatted)
.map(|d| d.with_timezone(&Utc))
.or_else(|_| {
NaiveDateTime::parse_from_str(&formatted, "%Y-%m-%d %H:%M:%S").map(|d| d.and_utc())
})
.map_err(|e| serde::de::Error::custom(format!("Invalid datetime format: {}", e)))?;

Ok(datetime)
}

pub fn deserialize_datetime_option<'de, D>(
deserializer: D,
) -> Result<Option<DateTime<Utc>>, D::Error>
where
D: Deserializer<'de>,
{
let opt = Option::<String>::deserialize(deserializer)?;
match opt {
Some(formatted) => {
let datetime = DateTime::parse_from_rfc3339(&formatted)
.map(|d| d.with_timezone(&Utc))
.or_else(|_| {
NaiveDateTime::parse_from_str(&formatted, "%Y-%m-%d %H:%M:%S")
.map(|d| d.and_utc())
})
.map_err(|e| serde::de::Error::custom(format!("Invalid datetime format: {}", e)))?;

Ok(Some(datetime))
}
None => Ok(None),
}
}
9 changes: 5 additions & 4 deletions hook-common/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use chrono::{DateTime, Utc};
use serde::{de::Visitor, Deserialize, Serialize};

use crate::kafka_messages::app_metrics;
use crate::kafka_messages::{deserialize_datetime, serialize_datetime};
use crate::kafka_messages::{deserialize_datetime_option, serialize_datetime_option};
use crate::pgqueue::PgQueueError;

/// Supported HTTP methods for webhooks.
Expand Down Expand Up @@ -138,10 +138,11 @@ pub struct WebhookJobMetadata {
pub plugin_id: i32,
pub plugin_config_id: i32,
#[serde(
serialize_with = "serialize_datetime",
deserialize_with = "deserialize_datetime"
serialize_with = "serialize_datetime_option",
deserialize_with = "deserialize_datetime_option",
skip_serializing_if = "Option::is_none"
)]
pub created_at: DateTime<Utc>,
pub created_at: Option<DateTime<Utc>>,
}

/// An error originating during a Webhook Job invocation.
Expand Down
8 changes: 5 additions & 3 deletions hook-worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,11 @@ async fn process_webhook_job<W: WebhookJob>(

match send_result {
Ok(_) => {
let end_to_end_duration = Utc::now() - webhook_job.metadata().created_at;
metrics::histogram!("webhook_jobs_end_to_end_duration_seconds", &labels)
.record((end_to_end_duration.num_milliseconds() as f64) / 1_000_f64);
if let Some(created_at) = webhook_job.metadata().created_at {
let end_to_end_duration = Utc::now() - created_at;
metrics::histogram!("webhook_jobs_end_to_end_duration_seconds", &labels)
.record((end_to_end_duration.num_milliseconds() as f64) / 1_000_f64);
}

webhook_job
.complete()
Expand Down

0 comments on commit 8d20acf

Please sign in to comment.