From 5d576a7f37cb481a86f76bd694cf2f11dd3f930d Mon Sep 17 00:00:00 2001 From: ecpullen Date: Fri, 12 Jan 2024 16:56:41 +0000 Subject: [PATCH] controller: Add cloudwatch logging for pods --- Cargo.lock | 27 ++++ cli/src/install.rs | 12 +- controller/Cargo.toml | 3 + controller/src/job/error.rs | 28 ++++ controller/src/job/mod.rs | 126 +++++++++++++++++- controller/src/resource_controller/context.rs | 11 +- controller/src/test_controller/context.rs | 6 +- model/src/system/controller.rs | 17 ++- model/src/system/mod.rs | 2 +- model/src/test_manager/install.rs | 3 +- model/src/test_manager/manager.rs | 4 +- 11 files changed, 223 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9799975..3b421380 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -315,6 +315,30 @@ dependencies = [ "tower", ] +[[package]] +name = "aws-sdk-cloudwatchlogs" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4f8dbe7e86cb0c5999cb5911e052ec1e6e3f4abbbcb1333a63538b4aecdaa9b" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "regex", + "tokio-stream", + "tower", +] + [[package]] name = "aws-sdk-ec2" version = "0.24.0" @@ -948,6 +972,9 @@ name = "controller" version = "0.0.10" dependencies = [ "anyhow", + "aws-config", + "aws-sdk-cloudwatchlogs", + "aws-types", "env_logger", "futures", "http", diff --git a/cli/src/install.rs b/cli/src/install.rs index e51d8660..fad4778c 100644 --- a/cli/src/install.rs +++ b/cli/src/install.rs @@ -14,6 +14,9 @@ pub(crate) struct Install { // TODO - add default controller_uri after images are published. 
#[clap(long = "controller-uri")] controller_uri: String, + + #[clap(long = "archive-logs")] + archive_logs: bool, } impl Install { @@ -22,9 +25,12 @@ impl Install { (Some(secret), image) => ImageConfig::WithCreds { secret, image }, (None, image) => ImageConfig::Image(image), }; - client.install(controller_image).await.context( - "Unable to install testsys to the cluster. (Some artifacts may be left behind)", - )?; + client + .install(controller_image, self.archive_logs) + .await + .context( + "Unable to install testsys to the cluster. (Some artifacts may be left behind)", + )?; println!("testsys components were successfully installed."); diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 5c5f6a04..73bb0bfd 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -7,6 +7,9 @@ license = "MIT OR Apache-2.0" [dependencies] anyhow = "1" +aws-config = "0.54" +aws-types = "0.54" +aws-sdk-cloudwatchlogs = "0.24" env_logger = "0.10" futures = "0.3" http = "0" diff --git a/controller/src/job/error.rs b/controller/src/job/error.rs index 028346c7..2e70c389 100644 --- a/controller/src/job/error.rs +++ b/controller/src/job/error.rs @@ -13,15 +13,43 @@ pub(crate) enum JobError { #[snafu(display("Unable to create job: {}", source))] Create { source: kube::Error }, + #[snafu(display("Unable to create log event '{}': {:?}", log_event, source))] + CreateLogEvent { + log_event: String, + source: aws_sdk_cloudwatchlogs::types::SdkError< + aws_sdk_cloudwatchlogs::error::PutLogEventsError, + >, + }, + + #[snafu(display("Unable to create log group '{}': {}", log_group, message))] + CreateLogGroup { log_group: String, message: String }, + + #[snafu(display("Unable to create log stream '{}': {:?}", log_stream, source))] + CreateLogStream { + log_stream: String, + source: aws_sdk_cloudwatchlogs::types::SdkError< + aws_sdk_cloudwatchlogs::error::CreateLogStreamError, + >, + }, + #[snafu(display("Unable to delete job: {}", source))] Delete { source: kube::Error }, 
#[snafu(display("Unable to get job: {}", source))] Get { source: kube::Error }, + #[snafu(display("Unable to read logs for pod '{}': {}", pod, source))] + NoLogs { pod: String, source: kube::Error }, + + #[snafu(display("No pods found for job '{}'", job))] + NoPods { job: String }, + #[snafu(display("Job does not exist: {}", source))] NotFound { source: kube::Error }, + #[snafu(display("{}", source), context(false))] + SystemTime { source: std::time::SystemTimeError }, + #[snafu(display( "There should be only one container for job '{}' but found {} running, {} succeeded, and {} failed", job_name, diff --git a/controller/src/job/mod.rs b/controller/src/job/mod.rs index 6f233f5d..86e40463 100644 --- a/controller/src/job/mod.rs +++ b/controller/src/job/mod.rs @@ -2,14 +2,19 @@ mod error; mod job_builder; pub(crate) use crate::job::error::{JobError, JobResult}; +use aws_sdk_cloudwatchlogs::model::InputLogEvent; pub(crate) use job_builder::{JobBuilder, JobType}; use k8s_openapi::api::batch::v1::Job; +use k8s_openapi::api::core::v1::Pod; use k8s_openapi::chrono::{Duration, Utc}; -use kube::api::{DeleteParams, PropagationPolicy}; -use kube::Api; -use log::debug; -use snafu::ensure; +use kube::api::{DeleteParams, ListParams, LogParams, PropagationPolicy}; +use kube::{Api, ResourceExt}; +use log::{debug, info, warn}; +use snafu::{ensure, OptionExt, ResultExt}; +use std::env; +use std::time::{SystemTime, UNIX_EPOCH}; use testsys_model::constants::NAMESPACE; +use testsys_model::system::TESTSYS_CONTROLLER_ARCHIVE_LOGS; lazy_static::lazy_static! { /// The maximum amount of time for a test to begin running (in seconds). 
@@ -122,3 +127,146 @@ pub(crate) async fn delete_job(k8s_client: kube::Client, name: &str) -> JobResul
     let _ = result?;
     Ok(())
 }
+
+/// Returns the name of the first pod listed for the job named `job_name`.
+///
+/// Pods are looked up in `NAMESPACE` with the label selector `job-name=<job_name>`.
+///
+/// # Errors
+///
+/// Returns `JobError::NotFound` if the pods cannot be listed and `JobError::NoPods` if the
+/// selector matches no pod.
+async fn get_pod(k8s_client: kube::Client, job_name: &str) -> JobResult<String> {
+    let pod_api: Api<Pod> = Api::namespaced(k8s_client, NAMESPACE);
+    let name = pod_api
+        .list(&ListParams {
+            label_selector: Some(format!("job-name={}", job_name)),
+            ..Default::default()
+        })
+        .await
+        .context(error::NotFoundSnafu {})?
+        .items
+        .first()
+        .context(error::NoPodsSnafu {
+            job: job_name.to_string(),
+        })?
+        .name_any();
+
+    Ok(name)
+}
+
+/// Fetches the logs of the pod named `pod_name` in `NAMESPACE` without following the stream.
+///
+/// # Errors
+///
+/// Returns `JobError::NoLogs` if the logs cannot be read.
+async fn pod_logs(k8s_client: kube::Client, pod_name: &str) -> JobResult<String> {
+    let log_params = LogParams {
+        follow: false,
+        pretty: true,
+        ..Default::default()
+    };
+    let pod_api: Api<Pod> = Api::namespaced(k8s_client, NAMESPACE);
+
+    pod_api
+        .logs(pod_name, &log_params)
+        .await
+        .context(error::NoLogsSnafu { pod: pod_name })
+}
+
+/// Archives the logs of the pod belonging to job `job_name` in CloudWatch Logs.
+///
+/// Archiving only happens when the `TESTSYS_CONTROLLER_ARCHIVE_LOGS` environment variable is set
+/// to `true`; otherwise this is a no-op. The logs are written as a single event to a new log
+/// stream named `<job_name>-<unix seconds>` in the `testsys` log group, which is created if it
+/// does not exist yet.
+///
+/// # Errors
+///
+/// Returns an error if the log group or log stream cannot be created, the pod or its logs cannot
+/// be fetched, or the log event cannot be uploaded.
+pub(crate) async fn archive_logs(k8s_client: kube::Client, job_name: &str) -> JobResult<()> {
+    const LOG_GROUP_NAME: &str = "testsys";
+
+    let archive_logs = match env::var(TESTSYS_CONTROLLER_ARCHIVE_LOGS) {
+        Ok(s) => s == true.to_string(),
+        Err(e) => {
+            warn!(
+                "Unable to read environment variable '{}': {}",
+                TESTSYS_CONTROLLER_ARCHIVE_LOGS, e
+            );
+            false
+        }
+    };
+
+    if !archive_logs {
+        return Ok(());
+    }
+    let config = aws_config::from_env().load().await;
+    let client = aws_sdk_cloudwatchlogs::Client::new(&config);
+
+    match client
+        .create_log_group()
+        .log_group_name(LOG_GROUP_NAME)
+        .send()
+        .await
+    {
+        Ok(_) => info!("Creating log group"),
+        Err(e) => {
+            let service_error = e.into_service_error();
+            // The group already exists after the first archive; that is expected and must not
+            // abort the upload. Any other failure is reported to the caller.
+            if !service_error.is_resource_already_exists_exception() {
+                return Err(error::JobError::CreateLogGroup {
+                    message: service_error.to_string(),
+                    log_group: LOG_GROUP_NAME.to_string(),
+                });
+            }
+            info!("Log group already exists.");
+        }
+    }
+
+    let pod_name = get_pod(k8s_client.clone(), job_name).await?;
+    let logs = pod_logs(k8s_client, &pod_name).await?;
+    let name = format!(
+        "{}-{}",
+        job_name,
+        SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
+    );
+
+    client
+        .create_log_stream()
+        .log_group_name(LOG_GROUP_NAME)
+        .log_stream_name(&name)
+        .send()
+        .await
+        .context(error::CreateLogStreamSnafu {
+            log_stream: name.to_string(),
+        })?;
+
+    // NOTE(review): the whole pod log is uploaded as one event. CloudWatch Logs limits the size
+    // of a single event, so very large logs may be rejected here -- consider chunking.
+    client
+        .put_log_events()
+        .log_group_name(LOG_GROUP_NAME)
+        .log_stream_name(&name)
+        .log_events(
+            InputLogEvent::builder()
+                .message(logs)
+                .timestamp(
+                    SystemTime::now()
+                        .duration_since(UNIX_EPOCH)?
+                        .as_millis()
+                        .try_into()
+                        .unwrap_or_default(),
+                )
+                .build(),
+        )
+        .send()
+        .await
+        .context(error::CreateLogEventSnafu { log_event: &name })?;
+
+    info!("Archive of '{job_name}' can be found at '{name}'");
+
+    Ok(())
+}
diff --git a/controller/src/resource_controller/context.rs b/controller/src/resource_controller/context.rs
index 3c175cf8..64d4b928 100644
--- a/controller/src/resource_controller/context.rs
+++ b/controller/src/resource_controller/context.rs
@@ -1,8 +1,8 @@
 use crate::error::Result;
-use crate::job::{delete_job, get_job_state, JobBuilder, JobState, JobType};
+use crate::job::{archive_logs, delete_job, get_job_state, JobBuilder, JobState, JobType};
 use anyhow::Context as AnyhowContext;
 use kube::Api;
-use log::debug;
+use log::{debug, error};
 use std::sync::Arc;
 use testsys_model::clients::{CrdClient, ResourceClient};
 use testsys_model::constants::{ENV_RESOURCE_ACTION, ENV_RESOURCE_NAME};
@@ -102,6 +102,13 @@ impl ResourceInterface {
     }
 
     pub(super) async fn remove_job(&self, op: ResourceAction) -> Result<()> {
+        if let Err(e) = archive_logs(self.k8s_client(), self.job_name(op)).await {
+            error!(
+                "Unable to archive logs for job '{}': {}",
+                self.job_name(op),
+                e
+            );
+        }
         delete_job(self.k8s_client(), self.job_name(op))
             .await
             .context(format!("Unable to remove job '{}'", self.job_name(op)))?;
diff --git a/controller/src/test_controller/context.rs b/controller/src/test_controller/context.rs
index 0f592504..86d80d28 100644
--- a/controller/src/test_controller/context.rs
+++
b/controller/src/test_controller/context.rs @@ -1,7 +1,8 @@ use crate::error::Result; -use crate::job::{delete_job, get_job_state, JobState}; +use crate::job::{archive_logs, delete_job, get_job_state, JobState}; use anyhow::Context as AnyhowContext; use kube::{Api, Client}; +use log::error; use std::sync::Arc; use testsys_model::clients::{CrdClient, TestClient}; use testsys_model::Test; @@ -77,6 +78,9 @@ impl TestInterface { } pub(super) async fn delete_job(&self) -> Result<()> { + if let Err(e) = archive_logs(self.k8s_client(), self.name()).await { + error!("Unable to archive logs for test '{}': {}", self.name(), e); + } delete_job(self.k8s_client(), self.name()) .await .with_context(|| format!("Unable to delete job for test '{}'", self.name())) diff --git a/model/src/system/controller.rs b/model/src/system/controller.rs index b84c135c..178b762c 100644 --- a/model/src/system/controller.rs +++ b/model/src/system/controller.rs @@ -5,8 +5,8 @@ use k8s_openapi::api::apps::v1::{ Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment, }; use k8s_openapi::api::core::v1::{ - Affinity, Container, LocalObjectReference, NodeAffinity, NodeSelector, NodeSelectorRequirement, - NodeSelectorTerm, PodSpec, PodTemplateSpec, ServiceAccount, + Affinity, Container, EnvVar, LocalObjectReference, NodeAffinity, NodeSelector, + NodeSelectorRequirement, NodeSelectorTerm, PodSpec, PodTemplateSpec, ServiceAccount, }; use k8s_openapi::api::rbac::v1::{ClusterRole, ClusterRoleBinding, PolicyRule, RoleRef, Subject}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; @@ -16,6 +16,7 @@ use maplit::btreemap; const TESTSYS_CONTROLLER_SERVICE_ACCOUNT: &str = "testsys-controller-service-account"; const TESTSYS_CONTROLLER_CLUSTER_ROLE: &str = "testsys-controller-role"; +pub const TESTSYS_CONTROLLER_ARCHIVE_LOGS: &str = "TESTSYS_CONTROLLER_ARCHIVE_LOGS"; /// Defines the testsys-controller service account pub fn controller_service_account() -> ServiceAccount { @@ -114,6 
+115,12 @@ pub fn controller_cluster_role() -> ClusterRole { .collect(), ..Default::default() }, + PolicyRule { + api_groups: Some(vec!["".to_string()]), + resources: Some(vec!["pods".to_string(), "pods/log".to_string()]), + verbs: ["get", "list"].iter().map(|s| s.to_string()).collect(), + ..Default::default() + }, ]), ..Default::default() } @@ -145,6 +152,7 @@ pub fn controller_cluster_role_binding() -> ClusterRoleBinding { pub fn controller_deployment( controller_image: String, image_pull_secret: Option, + enable_logging: bool, ) -> Deployment { let image_pull_secrets = image_pull_secret.map(|secret| vec![LocalObjectReference { name: Some(secret) }]); @@ -214,6 +222,11 @@ pub fn controller_deployment( image: Some(controller_image), image_pull_policy: None, name: "controller".to_string(), + env: Some(vec![EnvVar { + name: TESTSYS_CONTROLLER_ARCHIVE_LOGS.to_string(), + value: Some(enable_logging.to_string()), + ..Default::default() + }]), ..Default::default() }], image_pull_secrets, diff --git a/model/src/system/mod.rs b/model/src/system/mod.rs index 131928e0..4fd9c0c4 100644 --- a/model/src/system/mod.rs +++ b/model/src/system/mod.rs @@ -6,6 +6,6 @@ mod namespace; pub use agent::{agent_cluster_role, agent_cluster_role_binding, agent_service_account, AgentType}; pub use controller::{ controller_cluster_role, controller_cluster_role_binding, controller_deployment, - controller_service_account, + controller_service_account, TESTSYS_CONTROLLER_ARCHIVE_LOGS, }; pub use namespace::testsys_namespace; diff --git a/model/src/test_manager/install.rs b/model/src/test_manager/install.rs index eca481a9..4f894510 100644 --- a/model/src/test_manager/install.rs +++ b/model/src/test_manager/install.rs @@ -119,8 +119,9 @@ impl TestManager { &self, uri: String, secret: Option, + enable_logging: bool, ) -> Result<()> { - let controller_deployment = controller_deployment(uri, secret); + let controller_deployment = controller_deployment(uri, secret, enable_logging); // If the 
controller deployment already exists, update it with the new one using Patch. If // not create a new controller deployment. diff --git a/model/src/test_manager/manager.rs b/model/src/test_manager/manager.rs index 1cce6dbb..0717a3ff 100644 --- a/model/src/test_manager/manager.rs +++ b/model/src/test_manager/manager.rs @@ -131,7 +131,7 @@ impl TestManager { } /// Install testsys to a cluster. - pub async fn install(&self, controller_config: ImageConfig) -> Result<()> { + pub async fn install(&self, controller_config: ImageConfig, store_logs: bool) -> Result<()> { self.create_namespace().await?; self.create_crd().await?; self.create_roles(AgentType::Test).await?; @@ -145,7 +145,7 @@ impl TestManager { ImageConfig::WithCreds { secret, image } => (image, Some(secret)), ImageConfig::Image(image) => (image, None), }; - self.create_deployment(image, secret).await?; + self.create_deployment(image, secret, store_logs).await?; Ok(()) }