Skip to content

Commit

Permalink
Add new sdk field to tasks that allow users to always pull images
Browse files Browse the repository at this point in the history
Previoulsy if you had a container image that had a tag that was consistent
but the image was constantly updated it was impossible to tell Gofer to
check for a new version of the image(This behavior mostly occurs with image
tags like "latest").

This PR add functionality for telling Gofer to pass along the request
to always check for the latest version of the image to the downstream
scheduler.
  • Loading branch information
clintjedwards committed Dec 31, 2024
1 parent 95dbf02 commit 15ebe7e
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 43 deletions.
5 changes: 4 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ Because of this we should give the user as many tools as we can to make sure the
* Find other ways we can debug and make the user's life easier in this regard. It's possible that if we put a lot of
thought into this feature that it can become a game changer for Gofer as a whole.
* Maybe create a timeline on when each task execution happened for a particular run?
* It should be possible to create a timegraph of what ran and for how long and display that to the user.
* Each pipeline run should have a comprehensive log of what happened in the entire run from start to finish. Each task
that was started and when, everytime a task changes state, so on and so forth. This should enable the user to read
through a run from the perspective of the scheduler.

# Small things I want to keep track of that I definitely need to do.

* Investigate if pipelines have the ability to say "force this container to be downloaded"
* When the user asks for a log and there is no log, we should say so.
* When the user tries to get a run and the status if failed, we should automaticallly print the failed reasonings and
then tell the user how to go about finding more information.
Expand Down
2 changes: 1 addition & 1 deletion examplePipelines/rust/dag/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ fn main() {
tasks(vec![
Task::new("first-task", "ghcr.io/clintjedwards/gofer/debug/wait:latest").
description("This task has no dependencies so it will run immediately").
variable("WAIT_DURATION", "20s"),
variable("WAIT_DURATION", "20s").always_pull_newest_image(true),
]).finish().unwrap();
}
10 changes: 10 additions & 0 deletions gofer/docs/src/assets/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -6140,6 +6140,10 @@
"description": "A task represents a particular workload within a pipeline. Tasks are composable within a larger pipeline, meaning they can be run before, after, or alongside other tasks. Tasks represent the lowest level of the Gofer hierarchy and is what Gofer references to see how a user might want their workload handled.",
"type": "object",
"properties": {
"always_pull_newest_image": {
"description": "Always check for most recent version of the current image before running.",
"type": "boolean"
},
"command": {
"nullable": true,
"description": "Command to run on init of container; follows normal docker convention of command: https://docs.docker.com/reference/dockerfile/#cmd",
Expand Down Expand Up @@ -6197,6 +6201,7 @@
}
},
"required": [
"always_pull_newest_image",
"depends_on",
"description",
"id",
Expand All @@ -6209,6 +6214,10 @@
"description": "Represents a single task within a [`Pipeline`]. A task is a unit of work that operates within its own container. Each task defines the operations to be performed and the container environment in which these operations will run.\n\n# Example Usage ```ignore // Define a new task within a pipeline. let task = Task { id: \"example_task\".to_string(), description: Some(\"This task executes a simple print command in an Ubuntu container.\".to_string()), image: \"ubuntu:latest\".to_string(), registry_auth: None, depends_on: HashMap::new(), // No dependencies, so it starts immediately when the pipeline runs. variables: HashMap::from([(\"KEY\", \"value\".to_string())]), entrypoint: None, // Use the image's default entrypoint. command: Some(vec![\"echo\".to_string(), \"Hello World!\".to_string()]), inject_api_token: false, }; ```",
"type": "object",
"properties": {
"always_pull_newest_image": {
"description": "Always attempt to pull the newest container image.",
"type": "boolean"
},
"command": {
"nullable": true,
"type": "array",
Expand Down Expand Up @@ -6258,6 +6267,7 @@
}
},
"required": [
"always_pull_newest_image",
"depends_on",
"id",
"image",
Expand Down
2 changes: 1 addition & 1 deletion gofer/src/api/run_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ impl Shepherd {
.registry_auth
.clone()
.map(|auth| auth.into()),
always_pull: false,
always_pull: new_task_execution.task.always_pull_newest_image,
networking: None,
entrypoint: new_task_execution.task.entrypoint.clone(),
command: new_task_execution.task.command.clone(),
Expand Down
7 changes: 7 additions & 0 deletions gofer/src/api/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub struct Task {

/// Whether to inject a run specific Gofer API key. Useful for using Gofer API within the container.
pub inject_api_token: bool,

/// Always check for most recent version of the current image before running.
pub always_pull_newest_image: bool,
}

impl From<gofer_sdk::config::Task> for Task {
Expand All @@ -92,6 +95,7 @@ impl From<gofer_sdk::config::Task> for Task {
entrypoint: value.entrypoint,
command: value.command,
inject_api_token: value.inject_api_token,
always_pull_newest_image: value.always_pull_newest_image,
}
}
}
Expand All @@ -116,6 +120,7 @@ impl Task {
entrypoint: serde_json::to_string(&self.entrypoint)?,
command: serde_json::to_string(&self.command)?,
inject_api_token: self.inject_api_token,
always_pull_newest_image: self.always_pull_newest_image,
};

Ok(task)
Expand All @@ -132,6 +137,7 @@ impl Task {
entrypoint: serde_json::from_str(&storage_task.entrypoint)?,
command: serde_json::from_str(&storage_task.command)?,
inject_api_token: storage_task.inject_api_token,
always_pull_newest_image: storage_task.always_pull_newest_image,
};

Ok(task)
Expand All @@ -158,6 +164,7 @@ mod tests {
entrypoint: Some(vec!["/entrypoint.sh".to_string()]),
command: Some(vec!["run".to_string(), "--option".to_string()]),
inject_api_token: true,
always_pull_newest_image: true,
};

let namespace_id = "ns1".to_string();
Expand Down
20 changes: 5 additions & 15 deletions gofer/src/conf/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,15 @@ mod tests {
#[test]
fn load_from_environment_variables() {
env::set_var("GOFER_API_BASE_URL", "http://localhost:3001");
env::set_var("GOFER_ADMIN_KEY", "envoverride");
env::set_var("GOFER_TOKEN", "envoveride");

let config = Configuration::<CliConfig>::load(None).unwrap();

// Cleanup environment variables after test
env::remove_var("GOFER_API_BASE_URL");
env::remove_var("GOFER_ADMIN_KEY");

assert_eq!(
config,
CliConfig {
namespace: "default".to_string(),
detail: false,
token: "example".to_string(),
api_base_url: "http://localhost:3001".to_string(),
output_format: OutputFormat::Plain,
debug: false,
insecure_skip_tls_verify: None,
}
);
env::remove_var("GOFER_TOKEN");

assert_eq!(config.api_base_url, "http://localhost:3001");
assert_eq!(config.token, "envoveride");
}
}
25 changes: 13 additions & 12 deletions gofer/src/storage/migrations/0_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,19 @@ CREATE TABLE IF NOT EXISTS runs (
) STRICT;

CREATE TABLE IF NOT EXISTS tasks (
namespace_id TEXT NOT NULL,
pipeline_id TEXT NOT NULL,
pipeline_config_version INTEGER NOT NULL,
task_id TEXT NOT NULL,
description TEXT NOT NULL,
image TEXT NOT NULL,
registry_auth TEXT NOT NULL,
depends_on TEXT NOT NULL,
variables TEXT NOT NULL,
entrypoint TEXT NOT NULL,
command TEXT NOT NULL,
inject_api_token INTEGER NOT NULL CHECK (inject_api_token IN (0, 1)),
namespace_id TEXT NOT NULL,
pipeline_id TEXT NOT NULL,
pipeline_config_version INTEGER NOT NULL,
task_id TEXT NOT NULL,
description TEXT NOT NULL,
image TEXT NOT NULL,
registry_auth TEXT NOT NULL,
depends_on TEXT NOT NULL,
variables TEXT NOT NULL,
entrypoint TEXT NOT NULL,
command TEXT NOT NULL,
inject_api_token INTEGER NOT NULL CHECK (inject_api_token IN (0, 1)),
always_pull_newest_image INTEGER NOT NULL CHECK (always_pull_newest_image IN (0, 1)),
FOREIGN KEY (namespace_id) REFERENCES namespaces(id) ON DELETE CASCADE,
FOREIGN KEY (namespace_id, pipeline_id) REFERENCES pipeline_metadata(namespace_id, pipeline_id) ON DELETE CASCADE,
FOREIGN KEY (namespace_id, pipeline_id, pipeline_config_version) REFERENCES pipeline_configs(namespace_id, pipeline_id, version) ON DELETE CASCADE,
Expand Down
11 changes: 7 additions & 4 deletions gofer/src/storage/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ pub struct Task {
pub entrypoint: String,
pub command: String,
pub inject_api_token: bool,
pub always_pull_newest_image: bool,
}

pub async fn insert(conn: &mut SqliteConnection, task: &Task) -> Result<(), StorageError> {
let query = sqlx::query(
"INSERT INTO tasks (namespace_id, pipeline_id, pipeline_config_version, task_id, \
description, image, registry_auth, depends_on, variables, entrypoint, command, \
inject_api_token) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
inject_api_token, always_pull_newest_image) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
)
.bind(&task.namespace_id)
.bind(&task.pipeline_id)
Expand All @@ -35,7 +36,8 @@ pub async fn insert(conn: &mut SqliteConnection, task: &Task) -> Result<(), Stor
.bind(&task.variables)
.bind(&task.entrypoint)
.bind(&task.command)
.bind(task.inject_api_token);
.bind(task.inject_api_token)
.bind(task.always_pull_newest_image);

let sql = query.sql();

Expand All @@ -55,7 +57,7 @@ pub async fn list(
) -> Result<Vec<Task>, StorageError> {
let query = sqlx::query_as::<_, Task>(
"SELECT namespace_id, pipeline_id, pipeline_config_version, task_id, description, image, \
registry_auth, depends_on, variables, entrypoint, command, inject_api_token FROM \
registry_auth, depends_on, variables, entrypoint, command, inject_api_token, always_pull_newest_image FROM \
tasks WHERE namespace_id = ? AND pipeline_id = ? AND pipeline_config_version = ?;",
)
.bind(namespace_id)
Expand All @@ -81,7 +83,7 @@ pub async fn get(
task_id: &str,
) -> Result<Task, StorageError> {
let query = sqlx::query_as::<_, Task>("SELECT namespace_id, pipeline_id, pipeline_config_version, task_id, description, image, \
registry_auth, depends_on, variables, entrypoint, command, inject_api_token FROM \
registry_auth, depends_on, variables, entrypoint, command, inject_api_token, always_pull_newest_image FROM \
tasks WHERE namespace_id = ? AND pipeline_id = ? AND pipeline_config_version = ? AND task_id = ?;")
.bind(namespace_id)
.bind(pipeline_id)
Expand Down Expand Up @@ -162,6 +164,7 @@ mod tests {
entrypoint: "/bin/sh".into(),
command: "cargo test".into(),
inject_api_token: false,
always_pull_newest_image: false,
};

insert(&mut conn, &task).await?;
Expand Down
23 changes: 14 additions & 9 deletions sdk/go/sdk.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions sdk/rust/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ pub struct Task {
pub entrypoint: Option<Vec<String>>,
pub command: Option<Vec<String>>,
pub inject_api_token: bool,

/// Always attempt to pull the newest container image.
pub always_pull_newest_image: bool,
}

impl Task {
Expand All @@ -250,6 +253,7 @@ impl Task {
entrypoint: None,
command: None,
inject_api_token: false,
always_pull_newest_image: true,
}
}

Expand Down Expand Up @@ -318,6 +322,12 @@ impl Task {
self.inject_api_token = inject_token;
self
}

/// Always attempt to pull the newest container image for a given tag.
pub fn always_pull_newest_image(mut self, pull: bool) -> Self {
self.always_pull_newest_image = pull;
self
}
}

#[cfg(test)]
Expand Down

0 comments on commit 15ebe7e

Please sign in to comment.