Skip to content

Commit

Permalink
Emit scheduled event in connect transaction (#761)
Browse files Browse the repository at this point in the history
This refactors the code that:
- updates the `backend_state` table
- emits the `BackendState` event

Into one helper function that takes a transaction to execute in. This
code path is then used both when updating state on the drone, and when
creating the initial `Scheduled` state on the server.

The `last_status_time`, `backend_state.created_at`, and the event's
timestamp will all be the time of the transaction start.
  • Loading branch information
paulgb authored Jun 25, 2024
1 parent 6fe95a8 commit 010e1f4
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 77 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

42 changes: 28 additions & 14 deletions plane/src/database/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
use chrono::{DateTime, Utc};
use futures_util::Stream;
use serde::{Deserialize, Serialize};
use sqlx::PgConnection;
use std::{fmt::Debug, net::SocketAddr, str::FromStr};
use valuable::Valuable;

Expand Down Expand Up @@ -228,18 +229,6 @@ impl<'a> BackendDatabase<'a> {
return Ok(false);
}

sqlx::query!(
r#"
insert into backend_state (backend_id, state)
values ($1, $2)
"#,
backend.to_string(),
serde_json::to_value(&new_state)
.expect("BackendState should always be JSON-serializable."),
)
.execute(&mut *txn)
.await?;

// If the backend is terminated, we can delete its associated key.
if matches!(new_state, BackendState::Terminated { .. }) {
sqlx::query!(
Expand All @@ -253,7 +242,7 @@ impl<'a> BackendDatabase<'a> {
.await?;
}

emit_with_key(&mut *txn, &backend.to_string(), &new_state).await?;
emit_state_change(&mut txn, backend, &new_state).await?;

txn.commit().await?;

Expand Down Expand Up @@ -435,7 +424,10 @@ impl<'a> BackendDatabase<'a> {
}

pub async fn publish_metrics(&self, metrics: BackendMetricsMessage) -> sqlx::Result<()> {
emit_ephemeral_with_key(&self.db.pool, &metrics.backend_id.to_string(), &metrics).await
let mut txn = self.db.pool.begin().await?;
emit_ephemeral_with_key(&mut txn, &metrics.backend_id.to_string(), &metrics).await?;
txn.commit().await?;
Ok(())
}

pub async fn termination_candidates(
Expand Down Expand Up @@ -585,3 +577,25 @@ impl BackendRow {
self.as_of - self.last_status_time
}
}

/// Update the backend_state table, without updating the backend table.
pub async fn emit_state_change(
txn: &mut PgConnection,
backend: &BackendName,
new_state: &BackendState,
) -> sqlx::Result<()> {
sqlx::query!(
r#"
insert into backend_state (backend_id, state)
values ($1, $2)
"#,
backend.to_string(),
serde_json::to_value(&new_state).expect("BackendState should always be JSON-serializable."),
)
.execute(&mut *txn)
.await?;

emit_with_key(txn, &backend.to_string(), new_state).await?;

Ok(())
}
16 changes: 4 additions & 12 deletions plane/src/database/connect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{
backend::emit_state_change,
backend_actions::create_pending_action,
backend_key::{KEY_LEASE_RENEW_AFTER, KEY_LEASE_SOFT_TERMINATE_AFTER},
drone::DroneForSpawn,
Expand Down Expand Up @@ -102,7 +103,7 @@ async fn create_backend_with_key(
let mut txn = pool.begin().await?;

let initial_status = BackendStatus::Scheduled;
let initial_state_json = serde_json::to_value(&BackendState::Scheduled).expect("valid json");
let initial_state = BackendState::Scheduled;

let result = sqlx::query!(
r#"
Expand Down Expand Up @@ -143,7 +144,7 @@ async fn create_backend_with_key(
key.namespace,
key.tag,
PgInterval::try_from(KEY_LEASE_EXPIRATION).expect("valid constant interval"),
initial_state_json,
serde_json::to_value(&initial_state).expect("state is always serializable"),
static_token.map(|t| t.to_string()),
spawn_config.subdomain.as_ref().map(|s| s.to_string()),
initial_status.as_int(),
Expand All @@ -161,16 +162,7 @@ async fn create_backend_with_key(
}
};

sqlx::query!(
r#"
insert into backend_state (backend_id, state, created_at)
values ($1, $2, now())
"#,
backend_id.to_string(),
initial_state_json
)
.execute(&mut *txn)
.await?;
emit_state_change(&mut txn, &backend_id, &initial_state).await?;

let acquired_key = AcquiredKey {
key: key.clone(),
Expand Down
62 changes: 26 additions & 36 deletions plane/src/database/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::util::ExponentialBackoff;
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use sqlx::{postgres::PgListener, PgExecutor, PgPool};
use sqlx::{postgres::PgListener, PgConnection, PgPool};
use std::{
any::Any,
collections::HashMap,
Expand Down Expand Up @@ -352,11 +352,11 @@ impl EventSubscriptionManager {
}
}

pub async fn emit_impl<'c, T, E>(db: E, key: Option<&str>, payload: &T) -> Result<(), sqlx::Error>
where
T: NotificationPayload,
E: PgExecutor<'c>,
{
pub async fn emit_impl<T: NotificationPayload>(
db: &mut PgConnection,
key: Option<&str>,
payload: &T,
) -> Result<(), sqlx::Error> {
let kind = T::kind().to_string();
sqlx::query!(
r#"
Expand All @@ -381,21 +381,17 @@ where
serde_json::to_value(&payload).map_sqlx_error()?,
EVENT_CHANNEL,
)
.execute(db)
.execute(&mut *db)
.await?;

Ok(())
}

pub async fn emit_ephemeral_impl<'c, T, E>(
db: E,
pub async fn emit_ephemeral_impl<T: NotificationPayload>(
db: &mut PgConnection,
key: Option<&str>,
payload: &T,
) -> Result<(), sqlx::Error>
where
T: NotificationPayload,
E: PgExecutor<'c>,
{
) -> Result<(), sqlx::Error> {
let kind = T::kind().to_string();
sqlx::query!(
r#"
Expand All @@ -419,38 +415,32 @@ where
Ok(())
}

pub async fn emit<'c, T, E>(db: E, payload: &T) -> Result<(), sqlx::Error>
where
T: NotificationPayload,
E: PgExecutor<'c>,
{
pub async fn emit<T: NotificationPayload>(
db: &mut PgConnection,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_impl(db, None, payload).await
}

pub async fn emit_with_key<'c, T, E>(db: E, key: &str, payload: &T) -> Result<(), sqlx::Error>
where
T: NotificationPayload,
E: PgExecutor<'c>,
{
pub async fn emit_with_key<T: NotificationPayload>(
db: &mut PgConnection,
key: &str,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_impl(db, Some(key), payload).await
}

pub async fn emit_ephemeral<'c, T, E>(db: E, payload: &T) -> Result<(), sqlx::Error>
where
T: NotificationPayload,
E: PgExecutor<'c>,
{
pub async fn emit_ephemeral<T: NotificationPayload>(
db: &mut PgConnection,
payload: &T,
) -> Result<(), sqlx::Error> {
emit_ephemeral_impl(db, None, payload).await
}

pub async fn emit_ephemeral_with_key<'c, T, E>(
db: E,
pub async fn emit_ephemeral_with_key<T: NotificationPayload>(
db: &mut PgConnection,
key: &str,
payload: &T,
) -> Result<(), sqlx::Error>
where
T: NotificationPayload,
E: PgExecutor<'c>,
{
) -> Result<(), sqlx::Error> {
emit_ephemeral_impl(db, Some(key), payload).await
}

0 comments on commit 010e1f4

Please sign in to comment.