Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: Add cloudwatch logging for pods #882

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions cli/src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub(crate) struct Install {
// TODO - add default controller_uri after images are published.
#[clap(long = "controller-uri")]
controller_uri: String,

#[clap(long = "archive-logs")]
archive_logs: bool,
}

impl Install {
Expand All @@ -22,9 +25,12 @@ impl Install {
(Some(secret), image) => ImageConfig::WithCreds { secret, image },
(None, image) => ImageConfig::Image(image),
};
client.install(controller_image).await.context(
"Unable to install testsys to the cluster. (Some artifacts may be left behind)",
)?;
client
.install(controller_image, self.archive_logs)
.await
.context(
"Unable to install testsys to the cluster. (Some artifacts may be left behind)",
)?;

println!("testsys components were successfully installed.");

Expand Down
3 changes: 3 additions & 0 deletions controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ license = "MIT OR Apache-2.0"

[dependencies]
anyhow = "1"
aws-config = "0.54"
aws-types = "0.54"
aws-sdk-cloudwatchlogs = "0.24"
env_logger = "0.10"
futures = "0.3"
http = "0"
Expand Down
28 changes: 28 additions & 0 deletions controller/src/job/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,43 @@ pub(crate) enum JobError {
#[snafu(display("Unable to create job: {}", source))]
Create { source: kube::Error },

#[snafu(display("Unable to create log event '{}': {:?}", log_event, source))]
CreateLogEvent {
log_event: String,
source: aws_sdk_cloudwatchlogs::types::SdkError<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, since we've run into it before, will this unwrap the error fully to something useful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it will show the underlying error since it is using debug.

aws_sdk_cloudwatchlogs::error::PutLogEventsError,
>,
},

#[snafu(display("Unable to create log group '{}': {}", log_group, message))]
CreateLogGroup { log_group: String, message: String },

#[snafu(display("Unable to create log stream '{}': {:?}", log_stream, source))]
CreateLogStream {
log_stream: String,
source: aws_sdk_cloudwatchlogs::types::SdkError<
aws_sdk_cloudwatchlogs::error::CreateLogStreamError,
>,
},

#[snafu(display("Unable to delete job: {}", source))]
Delete { source: kube::Error },

#[snafu(display("Unable to get job: {}", source))]
Get { source: kube::Error },

#[snafu(display("Unable to read logs for pod '{}': {}", pod, source))]
NoLogs { pod: String, source: kube::Error },

#[snafu(display("No pods found for job '{}'", job))]
NoPods { job: String },

#[snafu(display("Job does not exist: {}", source))]
NotFound { source: kube::Error },

#[snafu(display("{}", source), context(false))]
SystemTime { source: std::time::SystemTimeError },

#[snafu(display(
"There should be only one container for job '{}' but found {} running, {} succeeded, and {} failed",
job_name,
Expand Down
126 changes: 122 additions & 4 deletions controller/src/job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ mod error;
mod job_builder;

pub(crate) use crate::job::error::{JobError, JobResult};
use aws_sdk_cloudwatchlogs::model::InputLogEvent;
pub(crate) use job_builder::{JobBuilder, JobType};
use k8s_openapi::api::batch::v1::Job;
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::chrono::{Duration, Utc};
use kube::api::{DeleteParams, PropagationPolicy};
use kube::Api;
use log::debug;
use snafu::ensure;
use kube::api::{DeleteParams, ListParams, LogParams, PropagationPolicy};
use kube::{Api, ResourceExt};
use log::{debug, info, warn};
use snafu::{ensure, OptionExt, ResultExt};
use std::env;
use std::time::{SystemTime, UNIX_EPOCH};
use testsys_model::constants::NAMESPACE;
use testsys_model::system::TESTSYS_CONTROLLER_ARCHIVE_LOGS;

lazy_static::lazy_static! {
/// The maximum amount of time for a test to begin running (in seconds).
Expand Down Expand Up @@ -122,3 +127,116 @@ pub(crate) async fn delete_job(k8s_client: kube::Client, name: &str) -> JobResul
let _ = result?;
Ok(())
}

async fn get_pod(k8s_client: kube::Client, job_name: &str) -> JobResult<String> {
let pod_api: Api<Pod> = Api::namespaced(k8s_client, NAMESPACE);
let name = pod_api
.list(&ListParams {
label_selector: Some(format!("job-name={}", job_name)),
..Default::default()
})
.await
.context(error::NotFoundSnafu {})?
.items
.first()
.context(error::NoPodsSnafu {
job: job_name.to_string(),
})?
.name_any();

Ok(name)
}

async fn pod_logs(k8s_client: kube::Client, pod_name: &str) -> JobResult<String> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't the testsys model already have a way to get logs? Why do we need a new, different way here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are accessing the logs from different interfaces. (ResourceInterface vs TestManager)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should backlog an issue to unify this so that log fetching updates do not require updating 2 places. Not necessary for this change though

let log_params = LogParams {
follow: false,
pretty: true,
..Default::default()
};
let pod_api: Api<Pod> = Api::namespaced(k8s_client, NAMESPACE);

pod_api
.logs(pod_name, &log_params)
.await
.context(error::NoLogsSnafu { pod: pod_name })
}

pub(crate) async fn archive_logs(k8s_client: kube::Client, job_name: &str) -> JobResult<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when does this happen? Is it only after the test has completed or failed? Or can this be kicked off at the start of the pod and the logs stream in realtime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It happens as a job is deleted. I wasn't able to find a solution for realtime logs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might not be something "inside" the system, but there are likely plenty of solutions for sending pod logs in realtime, so we should consider using those as well. Not going to block the PR though as we need to get something going for this.

let archive_logs = match env::var(TESTSYS_CONTROLLER_ARCHIVE_LOGS) {
Ok(s) => s == true.to_string(),
Err(e) => {
warn!(
"Unable to read environment variable '{}': {}",
TESTSYS_CONTROLLER_ARCHIVE_LOGS, e
);
false
}
};

if !archive_logs {
return Ok(());
}
let config = aws_config::from_env().load().await;
let client = aws_sdk_cloudwatchlogs::Client::new(&config);

match client
.create_log_group()
.log_group_name("testsys")
.send()
.await
{
Ok(_) => info!("Creating log group"),
Err(e) => {
let service_error = e.into_service_error();
if service_error.is_resource_already_exists_exception() {
info!("Log group already exists.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a little noisy?

Suggested change
info!("Log group already exists.")
debug!("testsys Cloudwatch Log group already exists.")

}
return Err(error::JobError::CreateLogGroup {
message: service_error.to_string(),
log_group: "testsys".to_string(),
});
}
}

let pod_name = get_pod(k8s_client.clone(), job_name).await?;
let logs = pod_logs(k8s_client, &pod_name).await?;
let name = format!(
"{}-{}",
job_name,
SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs()
);

client
.create_log_stream()
.log_group_name("testsys")
.log_stream_name(&name)
.send()
.await
.context(error::CreateLogStreamSnafu {
log_stream: name.to_string(),
})?;

client
.put_log_events()
.log_group_name("testsys")
.log_stream_name(&name)
.log_events(
InputLogEvent::builder()
.message(logs)
.timestamp(
SystemTime::now()
.duration_since(UNIX_EPOCH)?
.as_millis()
.try_into()
.unwrap_or_default(),
)
.build(),
)
.send()
.await
.context(error::CreateLogEventSnafu { log_event: &name })?;

info!("Archive of '{job_name}' can be found at '{name}'");

Ok(())
}
11 changes: 9 additions & 2 deletions controller/src/resource_controller/context.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::error::Result;
use crate::job::{delete_job, get_job_state, JobBuilder, JobState, JobType};
use crate::job::{archive_logs, delete_job, get_job_state, JobBuilder, JobState, JobType};
use anyhow::Context as AnyhowContext;
use kube::Api;
use log::debug;
use log::{debug, error};
use std::sync::Arc;
use testsys_model::clients::{CrdClient, ResourceClient};
use testsys_model::constants::{ENV_RESOURCE_ACTION, ENV_RESOURCE_NAME};
Expand Down Expand Up @@ -102,6 +102,13 @@ impl ResourceInterface {
}

pub(super) async fn remove_job(&self, op: ResourceAction) -> Result<()> {
if let Err(e) = archive_logs(self.k8s_client(), self.job_name(op)).await {
error!(
"Unable to archive logs for job '{}': {}",
self.job_name(op),
e
);
}
delete_job(self.k8s_client(), self.job_name(op))
.await
.context(format!("Unable to remove job '{}'", self.job_name(op)))?;
Expand Down
6 changes: 5 additions & 1 deletion controller/src/test_controller/context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::error::Result;
use crate::job::{delete_job, get_job_state, JobState};
use crate::job::{archive_logs, delete_job, get_job_state, JobState};
use anyhow::Context as AnyhowContext;
use kube::{Api, Client};
use log::error;
use std::sync::Arc;
use testsys_model::clients::{CrdClient, TestClient};
use testsys_model::Test;
Expand Down Expand Up @@ -77,6 +78,9 @@ impl TestInterface {
}

pub(super) async fn delete_job(&self) -> Result<()> {
if let Err(e) = archive_logs(self.k8s_client(), self.name()).await {
error!("Unable to archive logs for test '{}': {}", self.name(), e);
}
delete_job(self.k8s_client(), self.name())
.await
.with_context(|| format!("Unable to delete job for test '{}'", self.name()))
Expand Down
17 changes: 15 additions & 2 deletions model/src/system/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use k8s_openapi::api::apps::v1::{
Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment,
};
use k8s_openapi::api::core::v1::{
Affinity, Container, LocalObjectReference, NodeAffinity, NodeSelector, NodeSelectorRequirement,
NodeSelectorTerm, PodSpec, PodTemplateSpec, ServiceAccount,
Affinity, Container, EnvVar, LocalObjectReference, NodeAffinity, NodeSelector,
NodeSelectorRequirement, NodeSelectorTerm, PodSpec, PodTemplateSpec, ServiceAccount,
};
use k8s_openapi::api::rbac::v1::{ClusterRole, ClusterRoleBinding, PolicyRule, RoleRef, Subject};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
Expand All @@ -16,6 +16,7 @@ use maplit::btreemap;

const TESTSYS_CONTROLLER_SERVICE_ACCOUNT: &str = "testsys-controller-service-account";
const TESTSYS_CONTROLLER_CLUSTER_ROLE: &str = "testsys-controller-role";
pub const TESTSYS_CONTROLLER_ARCHIVE_LOGS: &str = "TESTSYS_CONTROLLER_ARCHIVE_LOGS";

/// Defines the testsys-controller service account
pub fn controller_service_account() -> ServiceAccount {
Expand Down Expand Up @@ -114,6 +115,12 @@ pub fn controller_cluster_role() -> ClusterRole {
.collect(),
..Default::default()
},
PolicyRule {
api_groups: Some(vec!["".to_string()]),
resources: Some(vec!["pods".to_string(), "pods/log".to_string()]),
verbs: ["get", "list"].iter().map(|s| s.to_string()).collect(),
..Default::default()
},
]),
..Default::default()
}
Expand Down Expand Up @@ -145,6 +152,7 @@ pub fn controller_cluster_role_binding() -> ClusterRoleBinding {
pub fn controller_deployment(
controller_image: String,
image_pull_secret: Option<String>,
enable_logging: bool,
) -> Deployment {
let image_pull_secrets =
image_pull_secret.map(|secret| vec![LocalObjectReference { name: Some(secret) }]);
Expand Down Expand Up @@ -214,6 +222,11 @@ pub fn controller_deployment(
image: Some(controller_image),
image_pull_policy: None,
name: "controller".to_string(),
env: Some(vec![EnvVar {
name: TESTSYS_CONTROLLER_ARCHIVE_LOGS.to_string(),
value: Some(enable_logging.to_string()),
..Default::default()
}]),
..Default::default()
}],
image_pull_secrets,
Expand Down
2 changes: 1 addition & 1 deletion model/src/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ mod namespace;
pub use agent::{agent_cluster_role, agent_cluster_role_binding, agent_service_account, AgentType};
pub use controller::{
controller_cluster_role, controller_cluster_role_binding, controller_deployment,
controller_service_account,
controller_service_account, TESTSYS_CONTROLLER_ARCHIVE_LOGS,
};
pub use namespace::testsys_namespace;
3 changes: 2 additions & 1 deletion model/src/test_manager/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ impl TestManager {
&self,
uri: String,
secret: Option<String>,
enable_logging: bool,
) -> Result<()> {
let controller_deployment = controller_deployment(uri, secret);
let controller_deployment = controller_deployment(uri, secret, enable_logging);

// If the controller deployment already exists, update it with the new one using Patch. If
// not create a new controller deployment.
Expand Down
4 changes: 2 additions & 2 deletions model/src/test_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl TestManager {
}

/// Install testsys to a cluster.
pub async fn install(&self, controller_config: ImageConfig) -> Result<()> {
pub async fn install(&self, controller_config: ImageConfig, store_logs: bool) -> Result<()> {
self.create_namespace().await?;
self.create_crd().await?;
self.create_roles(AgentType::Test).await?;
Expand All @@ -145,7 +145,7 @@ impl TestManager {
ImageConfig::WithCreds { secret, image } => (image, Some(secret)),
ImageConfig::Image(image) => (image, None),
};
self.create_deployment(image, secret).await?;
self.create_deployment(image, secret, store_logs).await?;

Ok(())
}
Expand Down