diff --git a/README.md b/README.md index d2eff60..d0c1b2a 100644 --- a/README.md +++ b/README.md @@ -106,16 +106,24 @@ In real-world applications, the complexity and cost can escalate quickly. `K8sBl - [x] **Error Handling and Retry Logic**: Successfully integrated error handling and retry mechanisms within the `CrewWorker` function to manage transient errors gracefully. - [ ] **Function Versatility and Configurability**: - - Enhance the versatility of the `CrewWorker` function. It currently processes tasks in a generic manner, but it could be extended to handle a wider variety of tasks with different complexities. - - Improve the configurability of task processing. The `CrewGetPodsTaskRunner.Run` method is specialized in listing pods; however, it should be adaptable to accommodate different parameters and settings for various task types. + - ~~Enhance the versatility of the `CrewWorker` function. It currently processes tasks in a generic manner, but it could be extended to handle a wider variety of tasks with different complexities.~~ + - ~~Improve the configurability of task processing. The `CrewGetPodsTaskRunner.Run` method is specialized in listing pods; however, it should be adaptable to accommodate different parameters and settings for various task types.~~ + - Plan to enhance the versatility of the `CrewWorker` function to handle tasks with varying complexities, not limited to pod health checks. + - Aim to improve the configurability of task processing to allow `CrewWorker` and other related functions to accommodate different parameters and settings for a variety of task types. + +## Structured Logging Integration +- [x] **Structured Logging**: Integrated structured logging throughout the package, providing clear and consistent logs with additional context for debugging. + +## Task Execution Model +- [x] **Dynamic Task Execution Model**: Implemented a dynamic task execution model that allows for registering and retrieving `TaskRunner` implementations based on task types, enhancing extensibility. ## Package Extension - [ ] **Support for Additional Kubernetes Resources**: - - Develop the capability to manage and interact with a broader range of Kubernetes resources beyond pods, such as services, deployments, and stateful sets. - - Implement operations that cater to specific resource requirements, enabling a more comprehensive management toolset within the Kubernetes ecosystem. + - Develop the capability to manage and interact with a broader range of Kubernetes resources, including services, deployments, and stateful sets. + - Plan to implement operations that cater to specific resource requirements, enabling a more comprehensive management toolset within the Kubernetes ecosystem. ## Monitoring and Metrics - [ ] **Metrics Collection Framework**: - Design and integrate a metrics collection system to monitor the health and efficiency of the worker processes. - Metrics should provide insights into the success rates of tasks, resource usage, processing times, and error rates. - - Consider using existing monitoring tools that can be integrated with Kubernetes to streamline the collection and visualization of metrics. + - Explore the possibility of leveraging existing monitoring tools that can be integrated with Kubernetes for streamlined metrics collection and visualization. diff --git a/language/init_const.go b/language/init_const.go index f10e795..b4f7b60 100644 --- a/language/init_const.go +++ b/language/init_const.go @@ -2,32 +2,35 @@ package language // Note: This constant used for translation. const ( - ErrorListingPods = "error listing pods: %w" - ErrorUpdatingPodLabels = "error updating pod labels: %w" - ErrorCreatingPod = "error creating pod: %w" - ErrorDeletingPod = "error deleting pod: %w" - ErrorGettingPod = "error getting pod: %w" - ErrorPodNotFound = "pod not found" - ErrorUpdatingPod = "Error updating pod: %w" - ErrorRetrievingPods = "Error retrieving pods: %w" - PodAndStatus = "Pod: %s, Status: %s" - PodAndStatusAndHealth = "Pod: %s, Status: %s, Health: %s" - errconfig = "cannot load kubeconfig: %w" - cannotcreatek8s = "cannot create kubernetes client: %w" - ErrorLoggerIsNotSet = "Logger is not set! Cannot log info: %s\n" - ErrorLogger = "cannot create logger: %w" - ErrorFailedToComplete = "Failed to complete task after %d attempts" - ContextCancelledAbort = "Context cancelled, aborting retries." - ContextCancelled = "Context cancelled" - ErrorDuringTaskAttempt = "Error during task, attempt %d/%d: %v" - UnknownTaskType = "unknown task type: %s" - InvalidParameters = "invalid parameters" - InvalidparametersL = "invalid parameters: labelSelector, fieldSelector, or limit" - ErrorPodsCancelled = "Pod processing was cancelled." - ErrorPailedtoListPods = "Failed to list pods: %w" - ErrorParamLabelSelector = "parameter 'labelSelector' is required and must be a string" - ErrorParamFieldSelector = "parameter 'fieldSelector' is required and must be a string" - ErrorParamLimit = "parameter 'limit' is required and must be an integer" + ErrorListingPods = "error listing pods: %w" + ErrorUpdatingPodLabels = "error updating pod labels: %w" + ErrorCreatingPod = "error creating pod: %w" + ErrorDeletingPod = "error deleting pod: %w" + ErrorGettingPod = "error getting pod: %w" + ErrorPodNotFound = "pod not found" + ErrorUpdatingPod = "Error updating pod: %w" + ErrorRetrievingPods = "Error retrieving pods: %w" + PodAndStatus = "Pod: %s, Status: %s" + PodAndStatusAndHealth = "Pod: %s, Status: %s, Health: %s" + errconfig = "cannot load kubeconfig: %w" + cannotcreatek8s = "cannot create kubernetes client: %w" + ErrorLoggerIsNotSet = "Logger is not set! Cannot log info: %s\n" + ErrorLogger = "cannot create logger: %w" + ErrorFailedToComplete = "Failed to complete task after %d attempts" + ContextCancelledAbort = "Context cancelled, aborting retries." + ContextCancelled = "Context cancelled" + ErrorDuringTaskAttempt = "Error during task, attempt %d/%d: %v" + UnknownTaskType = "unknown task type: %s" + InvalidParameters = "invalid parameters" + InvalidparametersL = "invalid parameters: labelSelector, fieldSelector, or limit" + ErrorPodsCancelled = "Pod processing was cancelled." + ErrorPailedtoListPods = "Failed to list pods: %w" + ErrorParamLabelSelector = "parameter 'labelSelector' is required and must be a string" + ErrorParamFieldSelector = "parameter 'fieldSelector' is required and must be a string" + ErrorParamLimit = "parameter 'limit' is required and must be an integer" + ErrorParamLabelKey = "parameter 'labelKey' is required and must be a string" + ErrorParamLabelabelValue = "parameter 'labelValue' is required and must be a string" + ErrorFailedToWriteLabel = "Failed to write label pods" ) const ( @@ -60,6 +63,7 @@ const ( RunningTaskBackup = "Running BackupTaskRunner with parameters:" Task_Name = "task_name" Worker_Name = "crew_worker" + TaskLabelPods = "WriteLabelPods" ) const ( @@ -72,6 +76,8 @@ const ( WorkerCountPods = "Count pods" WorkerCheckingHealth = "Checking health pods" CrewWorkerUnit = "crew_worker_unit" + StartWritingLabelPods = "Starting to writing label pods with %s=%s" + WorkerSucessfully = "Successfully labeled pods %v=%s" ) const ( @@ -86,6 +92,7 @@ const ( const ( Attempt = "attempt" Max_Retries = "max_retries" + Error = "error" ) const ( diff --git a/worker/docs.go b/worker/docs.go index 13a0fba..1a4fb5e 100644 --- a/worker/docs.go +++ b/worker/docs.go @@ -5,6 +5,17 @@ // The package is intended for applications running as pods within Kubernetes clusters // and leverages in-cluster configuration to establish a clientset for API interactions. // +// Enhancements in the latest version: +// +// - Structured logging has been integrated throughout the package, providing clear +// and consistent logging messages that are easier to read and debug. Logging now +// includes emojis for quick visual parsing and additional context such as task names +// and worker indices. +// +// - The dynamic task execution model allows for registering and retrieving TaskRunner +// implementations based on task types. This extensibility makes it possible to easily +// add new task handling logic without modifying the core package code. +// // # Functions // // - NewKubernetesClient: Creates a new Kubernetes clientset configured for in-cluster @@ -57,6 +68,9 @@ // - Logging functionality is customizable, allowing different workers to provide // unique contextual information, such as worker indices or specific namespaces. // +// - The dynamic task execution model supports adding new tasks and task runners +// without changing existing code, facilitating scalability and extensibility. +// // # TODO // // - Extend the functionality of the CrewWorker function to support a wider range diff --git a/worker/init.go b/worker/init.go index cdf2f2f..4952cd5 100644 --- a/worker/init.go +++ b/worker/init.go @@ -17,4 +17,7 @@ func init() { // Registers a TaskRunner for an alternate method of retrieving Kubernetes pods. RegisterTaskRunner("CrewGetPodsTaskRunner", func() TaskRunner { return &CrewGetPodsTaskRunner{} }) + // Registers a TaskRunner for labeling Kubernetes pods. + RegisterTaskRunner("CrewWriteLabelPods", func() TaskRunner { return &CrewLabelPodsTaskRunner{} }) + } diff --git a/worker/labels_pods.go b/worker/labels_pods.go index c11c55c..962f274 100644 --- a/worker/labels_pods.go +++ b/worker/labels_pods.go @@ -73,3 +73,36 @@ func labelSinglePod(ctx context.Context, clientset *kubernetes.Clientset, pod *c } return nil } + +// extractLabelParameters extracts and validates the label key and value from the parameters. +// It is used to ensure that the parameters provided for labeling operations are of the correct +// type and are present before proceeding with the operation. This function is crucial for +// maintaining type safety and preventing runtime errors that could occur when accessing the +// map directly. It acts as a safeguard, checking the existence and type of the label parameters +// before they are used to label pods. +// +// Parameters: +// - parameters: A map of interface{} values that should contain the labeling parameters. +// +// Returns: +// - labelKey: The extracted label key as a string if present and of type string. +// - labelValue: The extracted label value as a string if present and of type string. +// - err: An error if either the label key or value is missing from the parameters or is not a string. +// +// The function will return an error if the required parameters ("labelKey" and "labelValue") are +// not found in the input map, or if they are not of type string. This error can then be handled +// by the caller to ensure the labeling operation does not proceed with invalid parameters. +func extractLabelParameters(parameters map[string]interface{}) (labelKey string, labelValue string, err error) { + var ok bool + labelKey, ok = parameters["labelKey"].(string) + if !ok { + return "", "", fmt.Errorf(language.ErrorParamLabelKey) + } + + labelValue, ok = parameters["labelValue"].(string) + if !ok { + return "", "", fmt.Errorf(language.ErrorParamLabelabelValue) + } + + return labelKey, labelValue, nil +} diff --git a/worker/tasks_crew.go b/worker/tasks_crew.go index d39ee2b..b64d099 100644 --- a/worker/tasks_crew.go +++ b/worker/tasks_crew.go @@ -8,6 +8,7 @@ import ( "github.com/H0llyW00dzZ/K8sBlackPearl/language" "github.com/H0llyW00dzZ/K8sBlackPearl/navigator" + "github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant" "go.uber.org/zap" "k8s.io/client-go/kubernetes" ) @@ -133,6 +134,44 @@ func (c *CrewProcessCheckHealthTask) Run(ctx context.Context, clientset *kuberne return c.logResults(ctx, results) } +// CrewLabelPodsTaskRunner is an implementation of TaskRunner that labels all pods +// in a given Kubernetes namespace with a specific label. +type CrewLabelPodsTaskRunner struct { + workerIndex int +} + +// CrewLabelPodsTaskRunner is an implementation of the TaskRunner interface that applies a set of labels +// to all pods within a given Kubernetes namespace. It is responsible for parsing the label parameters, +// invoking the labeling operation, and logging the process. The Run method orchestrates these steps, +// handling any errors that occur during the execution and ensuring that the task's intent is +// fulfilled effectively. +func (c *CrewLabelPodsTaskRunner) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, taskName string, parameters map[string]interface{}, workerIndex int) error { + fields := navigator.CreateLogFields( + language.TaskLabelPods, + shipsnamespace, + navigator.WithAnyZapField(zap.String(language.Task_Name, taskName)), + ) + + labelKey, labelValue, err := extractLabelParameters(parameters) + if err != nil { + navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, language.InvalidParameters, fields...) + return err + } + + navigator.LogInfoWithEmoji(language.PirateEmoji, fmt.Sprintf(language.StartWritingLabelPods, labelKey, labelValue), fields...) + + err = LabelPods(ctx, clientset, shipsnamespace, labelKey, labelValue) + if err != nil { + errorFields := append(fields, zap.String(language.Error, err.Error())) + failedMessage := fmt.Sprintf("%v %s", constant.ErrorEmoji, language.ErrorFailedToWriteLabel) + navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, failedMessage, errorFields...) + return err + } + successMessage := fmt.Sprintf(language.WorkerSucessfully, labelKey, labelValue) + navigator.LogInfoWithEmoji(language.PirateEmoji, successMessage, fields...) + return nil +} + // performTask runs the specified task by finding the appropriate TaskRunner from the registry // and invoking its Run method with the task's parameters. func performTask(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task Task, workerIndex int) error {