Skip to content

Commit

Permalink
feat(deployer): StateChangeLayer, DeploymentLogLayer, new log item st…
Browse files Browse the repository at this point in the history
…ructure (#1171)

* refactor(deployer): new log structure

* WIP revert state tracking

* mostly finish DeploymentIdLayer and StateChangeLayer

* fix: comments

Co-authored-by: Pieter <pieter@chesedo.me>

* format log lines. LogItem::new.

* deployment_id in spans, LogRecorder for Batcher

* Convertions, log retrieval.

* derive default

* add matching for deployment log end states

* fmt errors correctly

---------

Co-authored-by: Pieter <pieter@chesedo.me>
  • Loading branch information
jonaro00 and chesedo authored Sep 6, 2023
1 parent bd5c9ff commit 7ab8d11
Show file tree
Hide file tree
Showing 24 changed files with 670 additions and 865 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,8 @@ up: $(DOCKER_COMPOSE_FILES)
$(SHUTTLE_DETACH)

down: $(DOCKER_COMPOSE_FILES)
$(DOCKER_COMPOSE_ENV) $(DOCKER_COMPOSE) $(addprefix -f ,$(DOCKER_COMPOSE_FILES)) -p $(STACK) down
$(DOCKER_COMPOSE_ENV) \
$(DOCKER_COMPOSE) \
$(addprefix -f ,$(DOCKER_COMPOSE_FILES)) \
-p $(STACK) \
down
4 changes: 2 additions & 2 deletions auth/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::io;

use clap::Parser;
use shuttle_common::backends::tracing::setup_tracing;
use shuttle_common::{backends::tracing::setup_tracing, log::Backend};
use sqlx::migrate::Migrator;
use tracing::{info, trace};

Expand All @@ -15,7 +15,7 @@ async fn main() -> io::Result<()> {

trace!(args = ?args, "parsed args");

setup_tracing(tracing_subscriber::registry(), "auth");
setup_tracing(tracing_subscriber::registry(), Backend::Auth);

let db_path = args.state.join("authentication.sqlite");

Expand Down
83 changes: 42 additions & 41 deletions cargo-shuttle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::process::exit;
use std::str::FromStr;

use logger_server::LocalLogger;
use shuttle_common::deployment::{DEPLOYER_END_MESSAGES_BAD, DEPLOYER_END_MESSAGES_GOOD};
use shuttle_common::models::deployment::CREATE_SERVICE_BODY_LIMIT;
use shuttle_common::{
claims::{ClaimService, InjectPropagation},
Expand Down Expand Up @@ -1140,8 +1141,11 @@ impl Shuttle {
deployment_req.data = self.make_archive()?;
if deployment_req.data.len() > CREATE_SERVICE_BODY_LIMIT {
bail!(
"The project is too large - we have a {}MB project limit.",
CREATE_SERVICE_BODY_LIMIT / 1_000_000
r#"The project is too large - we have a {} MB project limit. \
Your project archive is {} MB. \
Run with `RUST_LOG="cargo_shuttle=debug"` to see which files are being packed."#,
CREATE_SERVICE_BODY_LIMIT / 1_000_000,
deployment_req.data.len() / 1_000_000,
);
}

Expand All @@ -1160,33 +1164,31 @@ impl Shuttle {
let log_item: shuttle_common::LogItem =
serde_json::from_str(&line).expect("to parse log line");

match log_item.state.clone() {
shuttle_common::deployment::State::Queued
| shuttle_common::deployment::State::Building
| shuttle_common::deployment::State::Built
| shuttle_common::deployment::State::Loading => {
println!("{log_item}");
}
shuttle_common::deployment::State::Crashed => {
println!();
println!("{}", "Deployment crashed".red());
println!();
println!("Run the following for more details");
println!();
print!("cargo shuttle logs {}", &deployment.id);
println!();

return Ok(CommandOutcome::DeploymentFailure);
}
// Break on remaining end states: Running, Stopped, Completed or Unknown.
end_state => {
debug!(state = %end_state, "received end state, breaking deployment stream");
break;
}
};
println!("{log_item}");

if DEPLOYER_END_MESSAGES_BAD
.iter()
.any(|m| log_item.line.contains(m))
{
println!();
println!("{}", "Deployment crashed".red());
println!();
println!("Run the following for more details");
println!();
println!("cargo shuttle logs {}", &deployment.id);

return Ok(CommandOutcome::DeploymentFailure);
}
if DEPLOYER_END_MESSAGES_GOOD
.iter()
.any(|m| log_item.line.contains(m))
{
debug!("received end message, breaking deployment stream");
break;
}
}
} else {
println!("Reconnecting websockets logging");
eprintln!("--- Reconnecting websockets logging ---");
// A wait time short enough for not much state to have changed, long enough that
// the terminal isn't completely spammed
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Expand All @@ -1198,26 +1200,15 @@ impl Shuttle {

// Temporary fix.
// TODO: Make get_service_summary endpoint wait for a bit and see if it entered Running/Crashed state.
// Note: Will otherwise be possible when health checks are supported
tokio::time::sleep(std::time::Duration::from_millis(500)).await;

let deployment = client
.get_deployment_details(self.ctx.project_name(), &deployment.id)
.await?;

// A deployment will only exist if there is currently one in the running state
if deployment.state == shuttle_common::deployment::State::Running {
let service = client.get_service(self.ctx.project_name()).await?;

let resources = client
.get_service_resources(self.ctx.project_name())
.await?;

let resources = get_resources_table(&resources, self.ctx.project_name().as_str());

println!("{resources}{service}");

Ok(CommandOutcome::Ok)
} else {
if deployment.state != shuttle_common::deployment::State::Running {
println!("{}", "Deployment has not entered the running state".red());
println!();

Expand Down Expand Up @@ -1251,8 +1242,18 @@ impl Shuttle {
println!("cargo shuttle logs {}", &deployment.id);
println!();

Ok(CommandOutcome::DeploymentFailure)
return Ok(CommandOutcome::DeploymentFailure);
}

let service = client.get_service(self.ctx.project_name()).await?;
let resources = client
.get_service_resources(self.ctx.project_name())
.await?;
let resources = get_resources_table(&resources, self.ctx.project_name().as_str());

println!("{resources}{service}");

Ok(CommandOutcome::Ok)
}

async fn project_create(&self, client: &Client, idle_minutes: u64) -> Result<()> {
Expand Down
12 changes: 8 additions & 4 deletions common/src/backends/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use tracing::{debug_span, instrument::Instrumented, Instrument, Span, Subscriber
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::{fmt, prelude::*, registry::LookupSpan, EnvFilter};

use crate::log::Backend;

const OTLP_ADDRESS: &str = "http://otel-collector:4317";

pub fn setup_tracing<S>(subscriber: S, service_name: &str)
pub fn setup_tracing<S>(subscriber: S, backend: Backend)
where
S: Subscriber + for<'a> LookupSpan<'a> + Send + Sync,
{
Expand All @@ -46,7 +48,7 @@ where
.with_trace_config(
trace::config().with_resource(Resource::new(vec![KeyValue::new(
"service.name",
service_name.to_string(),
backend.to_string(),
)])),
)
.install_batch(Tokio)
Expand Down Expand Up @@ -196,15 +198,17 @@ pub fn serde_json_map_to_key_value_list(
/// Convert an [AnyValue] to a [serde_json::Value]
pub fn from_any_value_to_serde_json_value(any_value: AnyValue) -> serde_json::Value {
let Some(value) = any_value.value else {
return serde_json::Value::Null
return serde_json::Value::Null;
};

match value {
any_value::Value::StringValue(s) => serde_json::Value::String(s),
any_value::Value::BoolValue(b) => serde_json::Value::Bool(b),
any_value::Value::IntValue(i) => serde_json::Value::Number(i.into()),
any_value::Value::DoubleValue(f) => {
let Some(number) = serde_json::Number::from_f64(f) else {return serde_json::Value::Null};
let Some(number) = serde_json::Number::from_f64(f) else {
return serde_json::Value::Null;
};
serde_json::Value::Number(number)
}
any_value::Value::ArrayValue(a) => {
Expand Down
10 changes: 10 additions & 0 deletions common/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ pub enum Environment {
Production,
}

pub const DEPLOYER_END_MSG_STARTUP_ERR: &str = "Service startup encountered an error";
pub const DEPLOYER_END_MSG_CRASHED: &str = "Service encountered an error and crashed";
pub const DEPLOYER_END_MSG_STOPPED: &str = "Service was stopped by the user";
pub const DEPLOYER_END_MSG_COMPLETED: &str = "Service finished running all on its own";

pub const DEPLOYER_END_MESSAGES_BAD: &[&str] =
&[DEPLOYER_END_MSG_STARTUP_ERR, DEPLOYER_END_MSG_CRASHED];
pub const DEPLOYER_END_MESSAGES_GOOD: &[&str] =
&[DEPLOYER_END_MSG_STOPPED, DEPLOYER_END_MSG_COMPLETED];

#[cfg(test)]
mod tests {
use std::str::FromStr;
Expand Down
18 changes: 8 additions & 10 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ pub mod database;
#[cfg(feature = "service")]
pub mod deployment;
#[cfg(feature = "service")]
use uuid::Uuid;
#[cfg(feature = "service")]
pub type DeploymentId = Uuid;
#[cfg(feature = "service")]
pub mod log;
#[cfg(feature = "service")]
pub use log::LogItem;
#[cfg(feature = "models")]
pub mod models;
#[cfg(feature = "service")]
Expand All @@ -22,17 +28,11 @@ pub mod wasm;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fmt::Display;
#[cfg(feature = "openapi")]
use utoipa::openapi::{Object, ObjectBuilder};

use anyhow::bail;
#[cfg(feature = "service")]
pub use log::Item as LogItem;
#[cfg(feature = "service")]
pub use log::STATE_MESSAGE;
use serde::{Deserialize, Serialize};
#[cfg(feature = "service")]
use uuid::Uuid;
#[cfg(feature = "openapi")]
use utoipa::openapi::{Object, ObjectBuilder};

#[cfg(debug_assertions)]
pub const API_URL_DEFAULT: &str = "http://localhost:8001";
Expand All @@ -42,8 +42,6 @@ pub const API_URL_DEFAULT: &str = "https://api.shuttle.rs";

pub type ApiUrl = String;
pub type Host = String;
#[cfg(feature = "service")]
pub type DeploymentId = Uuid;

#[derive(Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "persist", derive(sqlx::Type, PartialEq, Hash, Eq))]
Expand Down
Loading

0 comments on commit 7ab8d11

Please sign in to comment.