Skip to content

Commit

Permalink
deployment_id in spans, LogRecorder for Batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaro00 committed Sep 1, 2023
1 parent 88499f0 commit b06eaf8
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 39 deletions.
6 changes: 3 additions & 3 deletions common/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub struct DeploymentLogLayer<R>
where
R: LogRecorder + Send + Sync,
{
pub recorder: R,
pub log_recorder: R,
pub internal_service: Backend,
}

Expand All @@ -170,7 +170,7 @@ where
let extensions = span.extensions();

if let Some(details) = extensions.get::<ScopeDetails>() {
self.recorder.record(LogItem::new(
self.log_recorder.record(LogItem::new(
details.deployment_id,
self.internal_service.clone(),
format_event(event),
Expand Down Expand Up @@ -210,7 +210,7 @@ where
metadata.name().blue(),
);

self.recorder.record(LogItem::new(
self.log_recorder.record(LogItem::new(
details.deployment_id,
self.internal_service.clone(),
message,
Expand Down
2 changes: 1 addition & 1 deletion deployer/src/deployment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl DeploymentManager {
self.queue_send.send(queued).await.unwrap();
}

#[instrument(skip(self), fields(id = %built.id, state = %State::Built))]
#[instrument(skip(self), fields(deployment_id = %built.id, state = %State::Built))]
pub async fn run_push(&self, built: Built) {
self.run_send.send(built).await.unwrap();
}
Expand Down
6 changes: 3 additions & 3 deletions deployer/src/deployment/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub async fn task(
}
}

#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))]
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))]
fn build_failed(_id: &Uuid, error: impl std::error::Error + 'static) {
error!(
error = &error as &dyn std::error::Error,
Expand Down Expand Up @@ -148,7 +148,7 @@ async fn remove_from_queue(queue_client: impl BuildQueueClient, id: Uuid) {
}
}

#[instrument(skip(run_send), fields(id = %built.id, state = %State::Built))]
#[instrument(skip(run_send), fields(deployment_id = %built.id, state = %State::Built))]
async fn promote_to_run(mut built: Built, run_send: RunSender) {
let cx = Span::current().context();

Expand All @@ -173,7 +173,7 @@ pub struct Queued {
}

impl Queued {
#[instrument(skip(self, storage_manager, deployment_updater, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))]
#[instrument(skip(self, storage_manager, deployment_updater, log_recorder, secret_recorder), fields(deployment_id = %self.id, state = %State::Building))]
async fn handle(
self,
storage_manager: ArtifactsStorageManager,
Expand Down
10 changes: 5 additions & 5 deletions deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,25 @@ async fn kill_old_deployments(
Ok(())
}

#[instrument(skip(_id), fields(id = %_id, state = %State::Completed))]
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Completed))]
fn completed_cleanup(_id: &Uuid) {
info!("service finished all on its own");
}

#[instrument(skip(_id), fields(id = %_id, state = %State::Stopped))]
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Stopped))]
fn stopped_cleanup(_id: &Uuid) {
info!("service was stopped by the user");
}

#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))]
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))]
fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) {
error!(
error = &error as &dyn std::error::Error,
"service encountered an error"
);
}

#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))]
#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))]
fn start_crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) {
error!(
error = &error as &dyn std::error::Error,
Expand Down Expand Up @@ -215,7 +215,7 @@ pub struct Built {
}

impl Built {
#[instrument(skip(self, storage_manager, secret_getter, resource_manager, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))]
#[instrument(skip(self, storage_manager, secret_getter, resource_manager, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(deployment_id = %self.id, state = %State::Loading))]
#[allow(clippy::too_many_arguments)]
async fn handle(
self,
Expand Down
17 changes: 9 additions & 8 deletions deployer/src/deployment/state_change_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This is very similar to Aspect Oriented Programming where we use the annotations from the function to trigger the recording of a new state.
//! This annotation is a [#[instrument]](https://docs.rs/tracing-attributes/latest/tracing_attributes/attr.instrument.html) with an `id` and `state` field as follow:
//! ```no-test
//! #[instrument(fields(id = %built.id, state = %State::Built))]
//! #[instrument(fields(deployment_id = %built.id, state = %State::Built))]
//! pub async fn new_state_fn(built: Built) {
//! // Get built ready for starting
//! }
Expand All @@ -20,6 +20,7 @@
use std::str::FromStr;

use chrono::Utc;
use shuttle_proto::logger::{Batcher, VecReceiver};
use tracing::{field::Visit, span, warn, Metadata, Subscriber};
use tracing_subscriber::Layer;
use uuid::Uuid;
Expand Down Expand Up @@ -63,19 +64,19 @@ where
let mut visitor = NewStateVisitor::default();
attrs.record(&mut visitor);

if visitor.id.is_nil() {
if visitor.deployment_id.is_nil() {
warn!("scope details does not have a valid id");
return;
}

// To deployer persistence
self.state_recorder.record_state(DeploymentState {
id: visitor.id,
id: visitor.deployment_id,
state: visitor.state,
});
// To logger
self.log_recorder.record(LogItem::new(
visitor.id,
visitor.deployment_id,
Backend::Deployer,
format!(
"{} {}",
Expand All @@ -86,16 +87,16 @@ where
}
}

/// To extract `id` and `state` fields for scopes that have them
/// To extract `deployment_id` and `state` fields for scopes that have them
#[derive(Default)]
struct NewStateVisitor {
id: Uuid,
deployment_id: Uuid,
state: State,
}

impl NewStateVisitor {
/// Field containing the deployment identifier
const ID_IDENT: &'static str = "id";
const ID_IDENT: &'static str = "deployment_id";

/// Field containing the deployment state identifier
const STATE_IDENT: &'static str = "state";
Expand All @@ -112,7 +113,7 @@ impl Visit for NewStateVisitor {
if field.name() == Self::STATE_IDENT {
self.state = State::from_str(&format!("{value:?}")).unwrap_or_default();
} else if field.name() == Self::ID_IDENT {
self.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default();
self.deployment_id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default();
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
};
use shuttle_common::log::LogRecorder;
use shuttle_proto::logger::logger_client::LoggerClient;
use tokio::sync::Mutex;
use tracing::{error, info};
Expand All @@ -28,6 +29,7 @@ pub use crate::runtime_manager::RuntimeManager;
pub async fn start(
persistence: Persistence,
runtime_manager: Arc<Mutex<RuntimeManager>>,
log_recorder: impl LogRecorder,
log_fetcher: LoggerClient<
shuttle_common::claims::ClaimService<
shuttle_common::claims::InjectPropagation<tonic::transport::Channel>,
Expand All @@ -37,7 +39,7 @@ pub async fn start(
) {
// when _set is dropped once axum exits, the deployment tasks will be aborted.
let deployment_manager = DeploymentManager::builder()
.build_log_recorder(log_fetcher.clone())
.build_log_recorder(log_recorder)
.secret_recorder(persistence.clone())
.active_deployment_getter(persistence.clone())
.artifacts_path(args.artifacts_path)
Expand Down
11 changes: 6 additions & 5 deletions deployer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use shuttle_common::{
log::{Backend, DeploymentLogLayer},
};
use shuttle_deployer::{start, start_proxy, Args, Persistence, RuntimeManager, StateChangeLayer};
use shuttle_proto::logger::logger_client::LoggerClient;
use shuttle_proto::logger::{logger_client::LoggerClient, Batcher};
use tokio::select;
use tower::ServiceBuilder;
use tracing::{error, trace};
Expand Down Expand Up @@ -40,16 +40,17 @@ async fn main() {
.expect("failed to connect to logger"),
);
let logger_client = LoggerClient::new(channel);
let logger_batcher = Batcher::wrap(logger_client.clone());

setup_tracing(
tracing_subscriber::registry()
.with(StateChangeLayer {
log_recorder: logger_client.clone(),
log_recorder: logger_batcher.clone(),
state_recorder: persistence.clone(),
})
// TODO: Make all relevant backends set this up in this way
.with(DeploymentLogLayer {
recorder: logger_client.clone(),
log_recorder: logger_batcher.clone(),
internal_service: Backend::Deployer,
}),
Backend::Deployer,
Expand All @@ -59,15 +60,15 @@ async fn main() {
args.artifacts_path.clone(),
args.provisioner_address.uri().to_string(),
args.logger_uri.uri().to_string(),
logger_client.clone(),
logger_batcher.clone(),
Some(args.auth_uri.to_string()),
);

select! {
_ = start_proxy(args.proxy_address, args.proxy_fqdn.clone(), persistence.clone()) => {
error!("Proxy stopped.")
},
_ = start(persistence, runtime_manager, logger_client, args) => {
_ = start(persistence, runtime_manager, logger_batcher, logger_client, args) => {
error!("Deployment service stopped.")
},
}
Expand Down
2 changes: 1 addition & 1 deletion examples
Submodule examples updated 39 files
+2 −0 README.md
+2 −2 actix-web/hello-world/Cargo.toml
+3 −3 actix-web/postgres/Cargo.toml
+3 −3 actix-web/websocket-actorless/Cargo.toml
+2 −2 axum/hello-world/Cargo.toml
+3 −3 axum/static-files/Cargo.toml
+3 −3 axum/static-next-server/Cargo.toml
+3 −3 axum/turso/Cargo.toml
+3 −3 axum/websocket/Cargo.toml
+2 −2 axum/with-state/Cargo.toml
+3 −3 custom-resource/pdo/Cargo.toml
+1 −1 custom-service/none/Cargo.toml
+2 −2 custom-service/request-scheduler/Cargo.toml
+5 −5 fullstack-templates/saas/backend/Cargo.toml
+1 −1 next/hello-world/Cargo.toml
+3 −3 other/standalone-binary/Cargo.toml
+2 −2 poem/hello-world/Cargo.toml
+3 −3 poem/mongodb/Cargo.toml
+3 −3 poem/postgres/Cargo.toml
+3 −3 poise/hello-world/Cargo.toml
+2 −2 rocket/authentication/Cargo.toml
+3 −3 rocket/dyn-templates/Cargo.toml
+2 −2 rocket/hello-world/Cargo.toml
+3 −3 rocket/persist/Cargo.toml
+3 −3 rocket/postgres/Cargo.toml
+3 −3 rocket/secrets/Cargo.toml
+3 −3 rocket/url-shortener/Cargo.toml
+2 −2 rocket/workspace/hello-world/Cargo.toml
+2 −2 salvo/hello-world/Cargo.toml
+3 −3 serenity/hello-world/Cargo.toml
+4 −4 serenity/postgres/Cargo.toml
+2 −2 thruster/hello-world/Cargo.toml
+3 −3 thruster/postgres/Cargo.toml
+2 −2 tide/hello-world/Cargo.toml
+3 −3 tide/postgres/Cargo.toml
+2 −2 tower/hello-world/Cargo.toml
+2 −2 tracing/axum-logs-endpoint/Cargo.toml
+1 −1 tracing/custom-layer/Cargo.toml
+2 −2 warp/hello-world/Cargo.toml
18 changes: 6 additions & 12 deletions proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ pub mod logger {
use std::time::Duration;

use chrono::{DateTime, NaiveDateTime, Utc};
use shuttle_common::log::{LogItem as LogItemCommon, LogRecorder};

use prost::bytes::Bytes;
use tokio::{select, sync::mpsc, time::interval};
use tonic::{
Expand All @@ -247,6 +245,8 @@ pub mod logger {
};
use tracing::error;

use shuttle_common::log::{LogItem as LogItemCommon, LogRecorder};

use self::logger_client::LoggerClient;

include!("generated/logger.rs");
Expand Down Expand Up @@ -276,18 +276,12 @@ pub mod logger {
}
}

impl LogRecorder
for logger_client::LoggerClient<
shuttle_common::claims::ClaimService<
shuttle_common::claims::InjectPropagation<tonic::transport::Channel>,
>,
>
impl<I> LogRecorder for Batcher<I>
where
I: VecReceiver<Item = LogItemCommon> + Clone + 'static,
{
fn record(&self, log: LogItemCommon) {
// TODO: Make async + error handling?
// self.send_logs(request)
// .await
// .expect("Failed to sens log line");
self.send(log);
}
}

Expand Down

0 comments on commit b06eaf8

Please sign in to comment.