Skip to content

Commit

Permalink
Implement Update Deployment Image (#44)
Browse files Browse the repository at this point in the history
* Chore [Language] Update Constants

- [+] chore(init_const.go): add new error and success messages for updating deployment image
- [+] feat(init_const.go): add new success message for updating deployment image

* Feat [Worker] Update Deployment Image

- [+] feat(worker): add UpdateDeploymentImage function to update deployment image
- [+] feat(worker): add updateImageWithRetry function to retry updating image with conflict handling
- [+] feat(worker): add updateDeploymentImageOnce function to update deployment image once
- [+] feat(worker): add reportSuccess function to report successful image update
- [+] feat(worker): add reportFailure function to report failed image update
- [+] feat(worker): add reportMaxRetriesFailure function to report failed image update after reaching max retries

* Chore [Language] Update Constants

- [+] chore(init_const.go): add missing error messages for required parameters
- [+] feat(init_const.go): add new task for updating deployment image

* Feat [Worker] [Task] Extract Parameter

- [+] fix(worker): fix typo in constant name from deploymentName to deploYmentName
- [+] feat(worker): add new constant contaInerName and newImAge
- [+] feat(worker): add function extractDeploymentParameters to extract deployment parameters from map

* Feat [Worker] [Task Crew] Update Deployment Image

- [+] feat(worker): add support for updating image deployments
- [+] fix(worker): fix typo in variable name 'deploymentName'
- [+] fix(worker): fix typo in variable name 'deploYmentName'
- [+] fix(worker): fix typo in variable name 'deploymentName'
- [+] feat(worker): add support for updating image deployments

* Go Docs [Worker] [Update Image] Update Documentation

- [+] feat(worker): update package documentation to reflect new features and functions
- [+] feat(worker): add UpdateDeploymentImage function to update the image of a container within a deployment
- [+] feat(worker): add updateImageWithRetry function to handle retries on conflicts when updating deployment image
- [+] feat(worker): add updateDeploymentImageOnce function to perform a single attempt to update the deployment image
- [+] feat(worker): add reportSuccess function to send success message and log success when updating deployment image
- [+] feat(worker): add reportFailure function to send error message and log failure when updating deployment image
- [+] feat(worker): add reportMaxRetriesFailure function to send message and log failure after reaching maximum number of retries when updating deployment image
- [+] feat(worker): add extractDeploymentParameters function to extract and validate deploymentName, containerName, and newImage from parameters map
  • Loading branch information
H0llyW00dzZ authored Dec 25, 2023
1 parent cecf8d9 commit 627bdb3
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 28 deletions.
48 changes: 30 additions & 18 deletions language/init_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ const (
ErrorScalingDeployment = "Failed to scale deployment '%s' to '%d': %v"
ErrorParameterDeploymentName = "parameter 'deploymentName' is required and must be a string"
ErrorParameterReplicas = "parameter 'replicas' is required and must be an integer"
ErrorParameterNewImage = "parameter 'newImage' is required and must be a string"
ErrorParameterContainerName = "parameter 'containerName' is required and must be a string"
ErrorConflict = "Conflict detected when scaling deployment '%s', resolving..."
FailedToScaleDeployment = "Failed to scale deployment '%s' to '%d' after %d retries: %v"
FailedTOScallEdDeployment = "Failed to scale deployment '%s' to '%d': %v"
FailedToGetDeployment = "Failed to get deployment '%s': %v"
ErrorFailedtoScalingDeployment = "Failed to scale deployment"
ErrorConflictUpdateImage = "Conflict encountered while updating deployment image for deployment %s Retrying..."
ErrorReachedMaxRetries = "Reached max retries for updating deployment image"
ErrorFailedToUpdateImage = "Failed to update image for deployment %s: %v"
ErrorFailedToUpdateImageAfterRetries = "Failed to update image for deployment %s after %d retries"
ErrorFailedToUpdateDeployImage = "Failed to update deployment image"
)

const (
Expand All @@ -62,24 +69,26 @@ const (
)

const (
TaskLabelKey = "LabelKey"
TaskCheckHealth = "CheckHealth"
TaskGetPod = "GetPod"
TaskFetchPods = "FetchPods"
TaskProcessPod = "ProcessPod"
TaskCreatePod = "CreatePod"
TaskDeletePod = "DeletePod"
TaskCompleteS = "Task '%s' completed successfully."
TaskWorker_Name = "Crew Worker %d: %s"
TaskNumber = "The number of workers and the number of tasks do not match."
RunningTaskBackup = "Running BackupTaskRunner with parameters:"
Task_Name = "task_name"
Worker_Name = "crew_worker"
TaskLabelPods = "WriteLabelPods"
TaskManageDeployments = "ManageDeployments"
TaskScaleDeployment = "ScaleDeployment"
ScalingDeployment = "Crew Worker %d: Scaling deployments"
ManagingDeployments = "Crew Worker %d: Managing deployments"
TaskLabelKey = "LabelKey"
TaskCheckHealth = "CheckHealth"
TaskGetPod = "GetPod"
TaskFetchPods = "FetchPods"
TaskProcessPod = "ProcessPod"
TaskCreatePod = "CreatePod"
TaskDeletePod = "DeletePod"
TaskCompleteS = "Task '%s' completed successfully."
TaskWorker_Name = "Crew Worker %d: %s"
TaskNumber = "The number of workers and the number of tasks do not match."
RunningTaskBackup = "Running BackupTaskRunner with parameters:"
Task_Name = "task_name"
Worker_Name = "crew_worker"
TaskLabelPods = "WriteLabelPods"
TaskManageDeployments = "ManageDeployments"
TaskScaleDeployment = "ScaleDeployment"
TaskUpdateDeploymentImage = "UpdateDeploymentImage"
ScalingDeployment = "Crew Worker %d: Scaling deployments"
ManagingDeployments = "Crew Worker %d: Managing deployments"
UpdatingImage = "Crew Worker %d: Updating deployment image"
)

const (
Expand All @@ -96,6 +105,9 @@ const (
WorkerSucessfully = "Successfully labeled pods %v=%s"
DeploymentScaled = "Deployment '%s' scaled to '%d'"
ScaledDeployment = "Scaled deployment '%s' to '%d' replicas"
ImageSuccessfully = "Image updated successfully for deployment %s to %s"
DeploymentImageUpdated = "Deployment image updated successfully"
UpdatingDeploymentImage = "Updating deployment image"
)

const (
Expand Down
4 changes: 3 additions & 1 deletion worker/cmd_constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ const (
labelSelector = "labelSelector"
fieldSelector = "fieldSelector"
limIt = "limit"
deploymentName = "deploymentName"
deploYmentName = "deploymentName"
contaInerName = "containerName"
newImAge = "newImage"
repliCas = "replicas"
deploymenT = "deployment"
scalE = "scale"
Expand Down
24 changes: 17 additions & 7 deletions worker/docs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Package worker provides a set of tools designed to facilitate the interaction with
// Kubernetes resources from within a cluster. It offers a convenient abstraction for
// managing Kubernetes operations, focusing on pod health checks, pod labeling, structured logging,
// scaling deployments, and task configuration through YAML or JSON files.
// scaling deployments, updating deployment images, and task configuration through YAML or JSON files.
//
// The package is intended for applications running as pods within Kubernetes clusters
// and leverages in-cluster configuration to establish a clientset for API interactions.
Expand All @@ -27,14 +27,18 @@
// - Scaling deployments is now supported with functions that allow for adjusting the
// number of replicas with retry logic to handle conflicts.
//
// - Updating deployment images has been introduced, enabling the change of container images
// within deployments. This includes handling retries on update conflicts and reporting
// the outcome of the operation.
//
// # Functions
//
// - NewKubernetesClient: Creates a new Kubernetes clientset configured for in-cluster
// communication with the Kubernetes API server.
//
// - CrewWorker: Orchestrates a worker process to perform tasks such as health checks,
// labeling of pods, scaling deployments, and other configurable tasks within a specified namespace. It includes
// retry logic to handle transient errors and respects cancellation and timeout contexts.
// labeling of pods, scaling deployments, updating deployment images, and other configurable tasks within a specified namespace.
// It includes retry logic to handle transient errors and respects cancellation and timeout contexts.
// Structured logging is used to provide detailed contextual information.
//
// - LoadTasksFromYAML: Loads task configurations from a YAML file, allowing for
Expand All @@ -56,13 +60,16 @@
// with retries on conflicts. It provides detailed logs and returns success or failure
// messages through a results channel.
//
// - UpdateDeploymentImage: Updates the image of a specified container within a deployment,
// handling retries on conflicts and reporting the outcome through a results channel.
//
// Usage:
//
// Initialize the Kubernetes client using NewKubernetesClient, then leverage the client
// to perform operations such as retrieving and processing pods within a namespace, and
// scaling deployments as required. Contexts are used to manage the lifecycle of the worker
// processes, including graceful shutdowns and cancellation. Task configurations can be loaded
// from a YAML file for enhanced flexibility.
// to perform operations such as retrieving and processing pods within a namespace, scaling
// deployments, and updating deployment images as required. Contexts are used to manage the
// lifecycle of the worker processes, including graceful shutdowns and cancellation. Task
// configurations can be loaded from a YAML file for enhanced flexibility.
//
// Example:
//
Expand Down Expand Up @@ -104,6 +111,9 @@
// - The scaling functionality has been introduced to adjust deployment sizes with
// conflict resolution strategies, catering to dynamic workload requirements.
//
// - Image update functionality has been added to modify the image of a container within
// a deployment, with built-in retry logic for handling update conflicts.
//
// # TODO
//
// - Extend the functionality of the CrewWorker function to support a wider range
Expand Down
6 changes: 5 additions & 1 deletion worker/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func init() {

// Register the new TaskRunner for managing deployments.
RegisterTaskRunner("CrewManageDeployments", func() TaskRunner { return &CrewManageDeployments{} })
// register the new TaskRunner for scaling deployments.

// Register the new TaskRunner for scaling deployments.
RegisterTaskRunner("CrewScaleDeployments", func() TaskRunner { return &CrewScaleDeployments{} })

// Register the new TaskRunner for updating image deployments.
RegisterTaskRunner("CrewUpdateImageDeployments", func() TaskRunner { return &CrewUpdateImageDeployments{} })

}
64 changes: 63 additions & 1 deletion worker/tasks_crew.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (c *CrewScaleDeployments) Run(ctx context.Context, clientset *kubernetes.Cl
)

// Assume parameters contain "deploymentName" and "replicas" for scaling
deploymentName, ok := parameters[deploymentName].(string)
deploymentName, ok := parameters[deploYmentName].(string)
if !ok {
navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, language.InvalidParameters, fields...)
return fmt.Errorf(language.ErrorParameterDeploymentName)
Expand Down Expand Up @@ -248,6 +248,68 @@ func (c *CrewScaleDeployments) Run(ctx context.Context, clientset *kubernetes.Cl
return nil
}

// CrewUpdateImageDeployments contains information required to update the image of a Kubernetes deployment.
type CrewUpdateImageDeployments struct {
// shipsNamespace specifies the Kubernetes namespace where the deployments are located.
shipsNamespace string

// workerIndex is an identifier for the worker that is executing the update operation.
// This can be used for logging and tracking the progress of the update across multiple workers.
workerIndex int
}

// Run performs the update operation for a Kubernetes deployment's container image.
// It extracts the deployment name, container name, and new image from the task parameters,
// and then proceeds with the update using the UpdateDeploymentImage function.
// The method logs the start and end of the update operation and handles any errors encountered.
func (c *CrewUpdateImageDeployments) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, taskName string, parameters map[string]interface{}, workerIndex int) error {
// Define logging fields for structured logging
fields := navigator.CreateLogFields(
language.TaskUpdateDeploymentImage,
shipsNamespace,
navigator.WithAnyZapField(zap.String(language.Task_Name, taskName)),
)

// Log the start of the update operation
navigator.LogInfoWithEmoji(
language.PirateEmoji,
fmt.Sprintf(language.UpdatingImage, workerIndex),
fields...,
)

// Extract deployment parameters from the provided task parameters
deploymentName, containerName, newImage, err := extractDeploymentParameters(parameters)
if err != nil {
// Log the error and return if parameter extraction fails
navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, err.Error(), fields...)
return err
}

// Create a channel to receive results from the update operation
results := make(chan string, 1)
defer close(results)

// Retrieve the logger instance
logger := zap.L()

// Update the deployment image using the extracted parameters
err = UpdateDeploymentImage(ctx, clientset, shipsNamespace, deploymentName, containerName, newImage, results, logger)
if err != nil {
// Log the error and return if the update operation fails
errorFields := append(fields, zap.String(language.Error, err.Error()))
failedMessage := fmt.Sprintf("%v %s", constant.ErrorEmoji, language.ErrorFailedToUpdateDeployImage)
navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, failedMessage, errorFields...)
return err
}

// Process and log the results from the update operation
for updateResult := range results {
navigator.LogInfoWithEmoji(language.PirateEmoji, updateResult, fields...)
}

return nil
}

// getLatestVersionOfPod fetches the latest version of the Pod from the Kubernetes API.
func getLatestVersionOfPod(ctx context.Context, clientset *kubernetes.Clientset, namespace string, podName string) (*corev1.Pod, error) {
// Fetch the latest version of the Pod using the clientset.
Expand Down
132 changes: 132 additions & 0 deletions worker/update_image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package worker

import (
"context"
"fmt"
"time"

"github.com/H0llyW00dzZ/K8sBlackPearl/language"
"github.com/H0llyW00dzZ/K8sBlackPearl/navigator"
"github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
)

// UpdateDeploymentImage attempts to update the image of a specified container within a deployment in Kubernetes.
// It performs retries on conflicts and reports the outcome through a results channel. If the image update is successful,
// a success message is sent to the results channel. In case of errors other than conflicts or after exceeding the maximum
// number of retries, it reports the failure.
//
// Parameters:
// - ctx: Context for cancellation and timeout.
// - clientset: A Kubernetes clientset to interact with the Kubernetes API.
// - namespace: The Kubernetes namespace containing the deployment.
// - deploymentName: The name of the deployment to update.
// - containerName: The name of the container within the deployment to update.
// - newImage: The new image to apply to the container.
// - results: A channel to send operation results for logging.
// - logger: A logger for structured logging.
//
// Returns an error if the operation fails after the maximum number of retries or if a non-conflict error is encountered.
func UpdateDeploymentImage(ctx context.Context, clientset *kubernetes.Clientset, namespace, deploymentName, containerName, newImage string, results chan<- string, logger *zap.Logger) error {
for attempt := 0; attempt < maxRetries; attempt++ {
err := updateImageWithRetry(ctx, clientset, namespace, deploymentName, containerName, newImage)
if err == nil {
reportSuccess(results, logger, deploymentName, newImage)
return nil
}

if !errors.IsConflict(err) {
reportFailure(results, logger, deploymentName, newImage, err)
return err
}

navigator.LogInfoWithEmoji(language.SwordEmoji, fmt.Sprintf(language.ErrorConflictUpdateImage, deploymentName))
time.Sleep(retryDelay)
}

reportMaxRetriesFailure(results, logger, deploymentName, newImage)
return fmt.Errorf(language.ErrorReachedMaxRetries)
}

// updateImageWithRetry attempts to update the deployment image, retrying on conflicts.
// It uses the Kubernetes client-go utility 'RetryOnConflict' to handle retries.
//
// This function is unexported and used internally by UpdateDeploymentImage.
func updateImageWithRetry(ctx context.Context, clientset *kubernetes.Clientset, namespace, deploymentName, containerName, newImage string) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
return updateDeploymentImageOnce(ctx, clientset, namespace, deploymentName, containerName, newImage)
})
}

// updateDeploymentImageOnce performs a single attempt to update the deployment image.
// It fetches the current deployment, updates the image for the specified container, and applies the changes.
//
// This function is unexported and used internally by updateImageWithRetry.
func updateDeploymentImageOnce(ctx context.Context, clientset *kubernetes.Clientset, namespace, deploymentName, containerName, newImage string) error {
deployment, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deploymentName, v1.GetOptions{})
if err != nil {
return err
}

for i, container := range deployment.Spec.Template.Spec.Containers {
if container.Name == containerName {
deployment.Spec.Template.Spec.Containers[i].Image = newImage
break
}
}

_, err = clientset.AppsV1().Deployments(namespace).Update(ctx, deployment, v1.UpdateOptions{})
return err
}

// reportSuccess sends a success message to the results channel and logs the success.
//
// This function is unexported and used internally by UpdateDeploymentImage.
func reportSuccess(results chan<- string, logger *zap.Logger, deploymentName, newImage string) {
successMsg := fmt.Sprintf(language.ImageSuccessfully, deploymentName, newImage)
results <- successMsg
navigator.LogInfoWithEmoji(constant.SuccessEmoji, successMsg)
}

// reportFailure sends an error message to the results channel and logs the failure.
//
// This function is unexported and used internally by UpdateDeploymentImage.
func reportFailure(results chan<- string, logger *zap.Logger, deploymentName, newImage string, err error) {
errorMessage := fmt.Sprintf(language.ErrorFailedToUpdateImage, deploymentName, err)
results <- errorMessage
navigator.LogErrorWithEmojiRateLimited(constant.ErrorEmoji, errorMessage)
}

// reportMaxRetriesFailure sends a message to the results channel and logs the failure after reaching the maximum number of retries.
//
// This function is unexported and used internally by UpdateDeploymentImage.
func reportMaxRetriesFailure(results chan<- string, logger *zap.Logger, deploymentName, newImage string) {
failMessage := fmt.Sprintf(language.ErrorFailedToUpdateImageAfterRetries, deploymentName, maxRetries)
results <- failMessage
navigator.LogErrorWithEmojiRateLimited(constant.ErrorEmoji, failMessage)
}

// extractDeploymentParameters extracts and validates the deploymentName, containerName, and newImage from a map of parameters.
// It returns an error if any of the parameters are missing or not a string type.
//
// This function is unexported and used internally by other functions within the package.
func extractDeploymentParameters(parameters map[string]interface{}) (deploymentName, containerName, newImage string, err error) {
var ok bool
if deploymentName, ok = parameters[deploYmentName].(string); !ok {
err = fmt.Errorf(language.ErrorParameterDeploymentName)
return
}
if containerName, ok = parameters[contaInerName].(string); !ok {
err = fmt.Errorf(language.ErrorParameterContainerName)
return
}
if newImage, ok = parameters[newImAge].(string); !ok {
err = fmt.Errorf(language.ErrorParameterNewImage)
return
}
return
}

0 comments on commit 627bdb3

Please sign in to comment.