Skip to content

Commit

Permalink
Gopher Crew Writing Labels Pods (#32)
Browse files Browse the repository at this point in the history
* Feat [Worker] Writing Labels Pods

- [+] feat(worker): register CrewWriteLabelPods TaskRunner
- [+] feat(worker): add CrewLabelPodsTaskRunner implementation

* Feat [Language] [Constant] write labels pods

- [+] chore(init_const.go): reformat code and add missing constant for labeling pods
- [+] feat(init_const.go): add new constant for starting to write label pods and successfully labeled pods

* Fix [Language] Constant

- [+] fix(init_const.go): remove unnecessary variable substitution in ErrorFailedToWriteLabel constant
- [+] feat(init_const.go): add variable substitution in WorkerSucessfully constant to include label key and value

* Feat [Language] [Constant] add new constant 'Error'

- [+] feat(init_const.go): add new constant 'Error'

* Feat [Docs] [README] docs docs docs

- [+] chore(README.md): update task list and descriptions
- [+] feat(README.md): add structured logging integration
- [+] feat(README.md): implement dynamic task execution model
- [+] feat(README.md): develop support for additional Kubernetes resources
- [+] feat(README.md): design and integrate metrics collection framework

* Feat [Worker] implementation writing labels pods

- [+] feat(worker): add CrewLabelPodsTaskRunner implementation
- [+] feat(worker): add extractLabelParameters function
- [+] feat(worker): add Run method to CrewLabelPodsTaskRunner

* Go Docs [Worker] Update package documentation

- [+] chore(docs.go): update package documentation with latest enhancements and TODOs
  • Loading branch information
H0llyW00dzZ authored Dec 23, 2023
1 parent 80b4bb4 commit 69a556f
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 31 deletions.
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,24 @@ In real-world applications, the complexity and cost can escalate quickly. `K8sBl
- [x] **Error Handling and Retry Logic**: Successfully integrated error handling and retry mechanisms within the `CrewWorker` function to manage transient errors gracefully.

- [ ] **Function Versatility and Configurability**:
- Enhance the versatility of the `CrewWorker` function. It currently processes tasks in a generic manner, but it could be extended to handle a wider variety of tasks with different complexities.
- Improve the configurability of task processing. The `CrewGetPodsTaskRunner.Run` method is specialized in listing pods; however, it should be adaptable to accommodate different parameters and settings for various task types.
- ~~Enhance the versatility of the `CrewWorker` function. It currently processes tasks in a generic manner, but it could be extended to handle a wider variety of tasks with different complexities.~~
- ~~Improve the configurability of task processing. The `CrewGetPodsTaskRunner.Run` method is specialized in listing pods; however, it should be adaptable to accommodate different parameters and settings for various task types.~~
- Plan to enhance the versatility of the `CrewWorker` function to handle tasks with varying complexities, not limited to pod health checks.
- Aim to improve the configurability of task processing to allow `CrewWorker` and other related functions to accommodate different parameters and settings for a variety of task types.

## Structured Logging Integration
- [x] **Structured Logging**: Integrated structured logging throughout the package, providing clear and consistent logs with additional context for debugging.

## Task Execution Model
- [x] **Dynamic Task Execution Model**: Implemented a dynamic task execution model that allows for registering and retrieving `TaskRunner` implementations based on task types, enhancing extensibility.

## Package Extension
- [ ] **Support for Additional Kubernetes Resources**:
- Develop the capability to manage and interact with a broader range of Kubernetes resources beyond pods, such as services, deployments, and stateful sets.
- Implement operations that cater to specific resource requirements, enabling a more comprehensive management toolset within the Kubernetes ecosystem.
- Develop the capability to manage and interact with a broader range of Kubernetes resources, including services, deployments, and stateful sets.
- Plan to implement operations that cater to specific resource requirements, enabling a more comprehensive management toolset within the Kubernetes ecosystem.

## Monitoring and Metrics
- [ ] **Metrics Collection Framework**:
- Design and integrate a metrics collection system to monitor the health and efficiency of the worker processes.
- Metrics should provide insights into the success rates of tasks, resource usage, processing times, and error rates.
- Consider using existing monitoring tools that can be integrated with Kubernetes to streamline the collection and visualization of metrics.
- Explore the possibility of leveraging existing monitoring tools that can be integrated with Kubernetes for streamlined metrics collection and visualization.
59 changes: 33 additions & 26 deletions language/init_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,35 @@ package language

// Note: This constant used for translation.
const (
ErrorListingPods = "error listing pods: %w"
ErrorUpdatingPodLabels = "error updating pod labels: %w"
ErrorCreatingPod = "error creating pod: %w"
ErrorDeletingPod = "error deleting pod: %w"
ErrorGettingPod = "error getting pod: %w"
ErrorPodNotFound = "pod not found"
ErrorUpdatingPod = "Error updating pod: %w"
ErrorRetrievingPods = "Error retrieving pods: %w"
PodAndStatus = "Pod: %s, Status: %s"
PodAndStatusAndHealth = "Pod: %s, Status: %s, Health: %s"
errconfig = "cannot load kubeconfig: %w"
cannotcreatek8s = "cannot create kubernetes client: %w"
ErrorLoggerIsNotSet = "Logger is not set! Cannot log info: %s\n"
ErrorLogger = "cannot create logger: %w"
ErrorFailedToComplete = "Failed to complete task after %d attempts"
ContextCancelledAbort = "Context cancelled, aborting retries."
ContextCancelled = "Context cancelled"
ErrorDuringTaskAttempt = "Error during task, attempt %d/%d: %v"
UnknownTaskType = "unknown task type: %s"
InvalidParameters = "invalid parameters"
InvalidparametersL = "invalid parameters: labelSelector, fieldSelector, or limit"
ErrorPodsCancelled = "Pod processing was cancelled."
ErrorPailedtoListPods = "Failed to list pods: %w"
ErrorParamLabelSelector = "parameter 'labelSelector' is required and must be a string"
ErrorParamFieldSelector = "parameter 'fieldSelector' is required and must be a string"
ErrorParamLimit = "parameter 'limit' is required and must be an integer"
ErrorListingPods = "error listing pods: %w"
ErrorUpdatingPodLabels = "error updating pod labels: %w"
ErrorCreatingPod = "error creating pod: %w"
ErrorDeletingPod = "error deleting pod: %w"
ErrorGettingPod = "error getting pod: %w"
ErrorPodNotFound = "pod not found"
ErrorUpdatingPod = "Error updating pod: %w"
ErrorRetrievingPods = "Error retrieving pods: %w"
PodAndStatus = "Pod: %s, Status: %s"
PodAndStatusAndHealth = "Pod: %s, Status: %s, Health: %s"
errconfig = "cannot load kubeconfig: %w"
cannotcreatek8s = "cannot create kubernetes client: %w"
ErrorLoggerIsNotSet = "Logger is not set! Cannot log info: %s\n"
ErrorLogger = "cannot create logger: %w"
ErrorFailedToComplete = "Failed to complete task after %d attempts"
ContextCancelledAbort = "Context cancelled, aborting retries."
ContextCancelled = "Context cancelled"
ErrorDuringTaskAttempt = "Error during task, attempt %d/%d: %v"
UnknownTaskType = "unknown task type: %s"
InvalidParameters = "invalid parameters"
InvalidparametersL = "invalid parameters: labelSelector, fieldSelector, or limit"
ErrorPodsCancelled = "Pod processing was cancelled."
ErrorPailedtoListPods = "Failed to list pods: %w"
ErrorParamLabelSelector = "parameter 'labelSelector' is required and must be a string"
ErrorParamFieldSelector = "parameter 'fieldSelector' is required and must be a string"
ErrorParamLimit = "parameter 'limit' is required and must be an integer"
ErrorParamLabelKey = "parameter 'labelKey' is required and must be a string"
ErrorParamLabelabelValue = "parameter 'labelValue' is required and must be a string"
ErrorFailedToWriteLabel = "Failed to write label pods"
)

const (
Expand Down Expand Up @@ -60,6 +63,7 @@ const (
RunningTaskBackup = "Running BackupTaskRunner with parameters:"
Task_Name = "task_name"
Worker_Name = "crew_worker"
TaskLabelPods = "WriteLabelPods"
)

const (
Expand All @@ -72,6 +76,8 @@ const (
WorkerCountPods = "Count pods"
WorkerCheckingHealth = "Checking health pods"
CrewWorkerUnit = "crew_worker_unit"
StartWritingLabelPods = "Starting to writing label pods with %s=%s"
WorkerSucessfully = "Successfully labeled pods %v=%s"
)

const (
Expand All @@ -86,6 +92,7 @@ const (
const (
Attempt = "attempt"
Max_Retries = "max_retries"
Error = "error"
)

const (
Expand Down
14 changes: 14 additions & 0 deletions worker/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@
// The package is intended for applications running as pods within Kubernetes clusters
// and leverages in-cluster configuration to establish a clientset for API interactions.
//
// Enhancements in the latest version:
//
// - Structured logging has been integrated throughout the package, providing clear
// and consistent logging messages that are easier to read and debug. Logging now
// includes emojis for quick visual parsing and additional context such as task names
// and worker indices.
//
// - The dynamic task execution model allows for registering and retrieving TaskRunner
// implementations based on task types. This extensibility makes it possible to easily
// add new task handling logic without modifying the core package code.
//
// # Functions
//
// - NewKubernetesClient: Creates a new Kubernetes clientset configured for in-cluster
Expand Down Expand Up @@ -57,6 +68,9 @@
// - Logging functionality is customizable, allowing different workers to provide
// unique contextual information, such as worker indices or specific namespaces.
//
// - The dynamic task execution model supports adding new tasks and task runners
// without changing existing code, facilitating scalability and extensibility.
//
// # TODO
//
// - Extend the functionality of the CrewWorker function to support a wider range
Expand Down
3 changes: 3 additions & 0 deletions worker/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ func init() {
// Registers a TaskRunner for an alternate method of retrieving Kubernetes pods.
RegisterTaskRunner("CrewGetPodsTaskRunner", func() TaskRunner { return &CrewGetPodsTaskRunner{} })

// Registers a TaskRunner for labeling Kubernetes pods.
RegisterTaskRunner("CrewWriteLabelPods", func() TaskRunner { return &CrewLabelPodsTaskRunner{} })

}
33 changes: 33 additions & 0 deletions worker/labels_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,36 @@ func labelSinglePod(ctx context.Context, clientset *kubernetes.Clientset, pod *c
}
return nil
}

// extractLabelParameters extracts and validates the label key and value from the parameters.
// It is used to ensure that the parameters provided for labeling operations are of the correct
// type and are present before proceeding with the operation. This function is crucial for
// maintaining type safety and preventing runtime errors that could occur when accessing the
// map directly. It acts as a safeguard, checking the existence and type of the label parameters
// before they are used to label pods.
//
// Parameters:
// - parameters: A map of interface{} values that should contain the labeling parameters.
//
// Returns:
// - labelKey: The extracted label key as a string if present and of type string.
// - labelValue: The extracted label value as a string if present and of type string.
// - err: An error if either the label key or value is missing from the parameters or is not a string.
//
// The function will return an error if the required parameters ("labelKey" and "labelValue") are
// not found in the input map, or if they are not of type string. This error can then be handled
// by the caller to ensure the labeling operation does not proceed with invalid parameters.
func extractLabelParameters(parameters map[string]interface{}) (labelKey string, labelValue string, err error) {
var ok bool
labelKey, ok = parameters["labelKey"].(string)
if !ok {
return "", "", fmt.Errorf(language.ErrorParamLabelKey)
}

labelValue, ok = parameters["labelValue"].(string)
if !ok {
return "", "", fmt.Errorf(language.ErrorParamLabelabelValue)
}

return labelKey, labelValue, nil
}
39 changes: 39 additions & 0 deletions worker/tasks_crew.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/H0llyW00dzZ/K8sBlackPearl/language"
"github.com/H0llyW00dzZ/K8sBlackPearl/navigator"
"github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
)
Expand Down Expand Up @@ -133,6 +134,44 @@ func (c *CrewProcessCheckHealthTask) Run(ctx context.Context, clientset *kuberne
return c.logResults(ctx, results)
}

// CrewLabelPodsTaskRunner is an implementation of TaskRunner that labels all pods
// in a given Kubernetes namespace with a specific label.
type CrewLabelPodsTaskRunner struct {
workerIndex int
}

// CrewLabelPodsTaskRunner is an implementation of the TaskRunner interface that applies a set of labels
// to all pods within a given Kubernetes namespace. It is responsible for parsing the label parameters,
// invoking the labeling operation, and logging the process. The Run method orchestrates these steps,
// handling any errors that occur during the execution and ensuring that the task's intent is
// fulfilled effectively.
func (c *CrewLabelPodsTaskRunner) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, taskName string, parameters map[string]interface{}, workerIndex int) error {
fields := navigator.CreateLogFields(
language.TaskLabelPods,
shipsnamespace,
navigator.WithAnyZapField(zap.String(language.Task_Name, taskName)),
)

labelKey, labelValue, err := extractLabelParameters(parameters)
if err != nil {
navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, language.InvalidParameters, fields...)
return err
}

navigator.LogInfoWithEmoji(language.PirateEmoji, fmt.Sprintf(language.StartWritingLabelPods, labelKey, labelValue), fields...)

err = LabelPods(ctx, clientset, shipsnamespace, labelKey, labelValue)
if err != nil {
errorFields := append(fields, zap.String(language.Error, err.Error()))
failedMessage := fmt.Sprintf("%v %s", constant.ErrorEmoji, language.ErrorFailedToWriteLabel)
navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, failedMessage, errorFields...)
return err
}
successMessage := fmt.Sprintf(language.WorkerSucessfully, labelKey, labelValue)
navigator.LogInfoWithEmoji(language.PirateEmoji, successMessage, fields...)
return nil
}

// performTask runs the specified task by finding the appropriate TaskRunner from the registry
// and invoking its Run method with the task's parameters.
func performTask(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task Task, workerIndex int) error {
Expand Down

0 comments on commit 69a556f

Please sign in to comment.