diff --git a/README.md b/README.md index d0c1b2a..dcfc13c 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,41 @@ In real-world applications, the complexity and cost can escalate quickly. `K8sBl "fieldSelector": "status.phase=Running", "limit": 10 } + }, + { + "name": "list-specific-pods-run", + "type": "CrewGetPodsTaskRunner", + "parameters": { + "labelSelector": "app=nginx", + "fieldSelector": "status.phase=Running", + "limit": 10 + } + }, + { + "name": "check-health-pods", + "type": "CrewCheckHealthPods", + "parameters": { + "labelSelector": "app=nginx", + "fieldSelector": "status.phase=Running", + "limit": 10 + } + }, + { + "name": "label-all-pods", + "type": "CrewWriteLabelPods", + "parameters": { + "labelKey": "environment", + "labelValue": "production" + } + }, + { + "name": "update-specific-pod", + "type": "CrewWriteLabelPods", + "parameters": { + "podName": "pod-name", + "labelKey": "environment", + "labelValue": "production" + } } ] ``` diff --git a/language/init_const.go b/language/init_const.go index b4f7b60..02617fb 100644 --- a/language/init_const.go +++ b/language/init_const.go @@ -2,35 +2,38 @@ package language // Note: This constant used for translation. const ( - ErrorListingPods = "error listing pods: %w" - ErrorUpdatingPodLabels = "error updating pod labels: %w" - ErrorCreatingPod = "error creating pod: %w" - ErrorDeletingPod = "error deleting pod: %w" - ErrorGettingPod = "error getting pod: %w" - ErrorPodNotFound = "pod not found" - ErrorUpdatingPod = "Error updating pod: %w" - ErrorRetrievingPods = "Error retrieving pods: %w" - PodAndStatus = "Pod: %s, Status: %s" - PodAndStatusAndHealth = "Pod: %s, Status: %s, Health: %s" - errconfig = "cannot load kubeconfig: %w" - cannotcreatek8s = "cannot create kubernetes client: %w" - ErrorLoggerIsNotSet = "Logger is not set! Cannot log info: %s\n" - ErrorLogger = "cannot create logger: %w" - ErrorFailedToComplete = "Failed to complete task after %d attempts" - ContextCancelledAbort = "Context cancelled, aborting retries." - ContextCancelled = "Context cancelled" - ErrorDuringTaskAttempt = "Error during task, attempt %d/%d: %v" - UnknownTaskType = "unknown task type: %s" - InvalidParameters = "invalid parameters" - InvalidparametersL = "invalid parameters: labelSelector, fieldSelector, or limit" - ErrorPodsCancelled = "Pod processing was cancelled." - ErrorPailedtoListPods = "Failed to list pods: %w" - ErrorParamLabelSelector = "parameter 'labelSelector' is required and must be a string" - ErrorParamFieldSelector = "parameter 'fieldSelector' is required and must be a string" - ErrorParamLimit = "parameter 'limit' is required and must be an integer" - ErrorParamLabelKey = "parameter 'labelKey' is required and must be a string" - ErrorParamLabelabelValue = "parameter 'labelValue' is required and must be a string" - ErrorFailedToWriteLabel = "Failed to write label pods" + ErrorListingPods = "error listing pods: %w" + ErrorUpdatingPodLabels = "error updating pod labels: %w" + ErrorCreatingPod = "error creating pod: %w" + ErrorDeletingPod = "error deleting pod: %w" + ErrorGettingPod = "error getting pod: %w" + ErrorPodNotFound = "pod not found" + ErrorUpdatingPod = "Error updating pod: %w" + ErrorRetrievingPods = "Error retrieving pods: %w" + PodAndStatus = "Pod: %s, Status: %s" + PodAndStatusAndHealth = "Pod: %s, Status: %s, Health: %s" + errconfig = "cannot load kubeconfig: %w" + cannotcreatek8s = "cannot create kubernetes client: %w" + ErrorLoggerIsNotSet = "Logger is not set! Cannot log info: %s\n" + ErrorLogger = "cannot create logger: %w" + ErrorFailedToComplete = "Failed to complete task after %d attempts" + ContextCancelledAbort = "Context cancelled, aborting retries." + ContextCancelled = "Context cancelled" + ErrorDuringTaskAttempt = "Error during task, attempt %d/%d: %v" + UnknownTaskType = "unknown task type: %s" + InvalidParameters = "invalid parameters" + InvalidparametersL = "invalid parameters: labelSelector, fieldSelector, or limit" + ErrorPodsCancelled = "Pod processing was cancelled." + ErrorPailedtoListPods = "Failed to list pods: %w" + ErrorParamLabelSelector = "parameter 'labelSelector' is required and must be a string" + ErrorParamFieldSelector = "parameter 'fieldSelector' is required and must be a string" + ErrorParamLimit = "parameter 'limit' is required and must be an integer" + ErrorParamLabelKey = "parameter 'labelKey' is required and must be a string" + ErrorParamLabelabelValue = "parameter 'labelValue' is required and must be a string" + ErrorFailedToWriteLabel = "Failed to write label pods" + ErrorFailedToCompleteTaskDueToConflict = "Failed to complete task %s after %d attempts due to conflict: %v" + ErrorPodNameParameter = "podName parameter is missing or not a string" + ErrorFailedToUpdateLabelSPods = "Failed to update labels for pod %s: %v" ) const ( @@ -42,6 +45,7 @@ const ( Pods = "pods" Phase = "phase" healthStatus = "healthStatus" + PodName = "podName" ) const ( @@ -90,9 +94,10 @@ const ( ) const ( - Attempt = "attempt" - Max_Retries = "max_retries" - Error = "error" + Attempt = "attempt" + Max_Retries = "max_retries" + Error = "error" + ResourceVersion = "resourceVersion" ) const ( diff --git a/worker/crew.go b/worker/crew.go index 2371259..418204d 100644 --- a/worker/crew.go +++ b/worker/crew.go @@ -7,7 +7,6 @@ import ( "github.com/H0llyW00dzZ/K8sBlackPearl/language" "github.com/H0llyW00dzZ/K8sBlackPearl/navigator" - "github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" @@ -44,7 +43,7 @@ func CrewWorker(ctx context.Context, clientset *kubernetes.Clientset, shipsNames if err != nil { // If the task fails, you can choose to release it for retrying. taskStatus.Release(task.Name) - logFinalError(shipsNamespace, task.Name, err) + logFinalError(shipsNamespace, task.Name, err, maxRetries) results <- err.Error() } else { // If the task is successful, it remains claimed to prevent retries. @@ -71,38 +70,32 @@ func CrewWorker(ctx context.Context, clientset *kubernetes.Clientset, shipsNames func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task Task, results chan<- string, workerIndex int) error { for attempt := 0; attempt < maxRetries; attempt++ { err := performTask(ctx, clientset, shipsnamespace, task, workerIndex) - if err == nil { + if err != nil { + if !handleTaskError(ctx, clientset, shipsnamespace, err, attempt, &task, workerIndex, maxRetries, retryDelay) { + return fmt.Errorf(language.ErrorFailedToCompleteTask, task.Name, maxRetries) + } + } else { results <- fmt.Sprintf(language.TaskWorker_Name, workerIndex, fmt.Sprintf(language.TaskCompleteS, task.Name)) return nil } - - if ctx.Err() != nil { - return fmt.Errorf(language.ContextCancelled) - } - - fieldslog := navigator.CreateLogFields( - language.TaskFetchPods, - shipsnamespace, - navigator.WithAnyZapField(zap.Int(language.Worker_Name, workerIndex)), - navigator.WithAnyZapField(zap.Int(language.Attempt, attempt+1)), - navigator.WithAnyZapField(zap.Int(language.Max_Retries, maxRetries)), - navigator.WithAnyZapField(zap.String(language.Task_Name, task.Name)), - ) - // magic goes here, append fields log ":=" into binaries lmao - retryMessage := fmt.Sprintf("%s %s", constant.ErrorEmoji, fmt.Sprintf(language.RetryingTask, attempt+1, maxRetries)) - navigator.LogInfoWithEmoji( - language.PirateEmoji, - fmt.Sprintf(language.TaskWorker_Name, workerIndex, retryMessage), - fieldslog..., - ) - - logRetryAttempt(task.Name, attempt, err) - time.Sleep(retryDelay) } - return fmt.Errorf(language.ErrorFailedToCompleteTask, task.Name, maxRetries) } +func resolveConflict(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task *Task) error { + podName, ok := task.Parameters[language.PodName].(string) + if !ok { + return fmt.Errorf(language.ErrorPodNameParameter) + } + updatedPod, err := getLatestVersionOfPod(ctx, clientset, shipsnamespace, podName) + if err != nil { + return err // Return the error if we can't get the latest version. + } + // Update task parameters with the new pod information. + task.Parameters[language.ResourceVersion] = updatedPod.ResourceVersion + return nil +} + // CrewProcessPods iterates over a list of pods to evaluate their health. // It sends a health status message for each pod to the results channel. // If the context is cancelled during the process, it logs the cancellation diff --git a/worker/error_and_retry.go b/worker/error_and_retry.go index 6deb2bc..4692d77 100644 --- a/worker/error_and_retry.go +++ b/worker/error_and_retry.go @@ -1,22 +1,28 @@ package worker import ( + "context" "fmt" + "time" "github.com/H0llyW00dzZ/K8sBlackPearl/language" "github.com/H0llyW00dzZ/K8sBlackPearl/navigator" "github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant" "go.uber.org/zap" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/kubernetes" ) // logRetryAttempt logs a warning message indicating a task retry attempt with the current count. -// It includes the task name and the error that prompted the retry. +// It includes the task name and the error that prompted the retry. The maxRetries variable is used +// to indicate the total number of allowed retries. // // Parameters: // - taskName: The name of the task being attempted. // - attempt: The current retry attempt number. // - err: The error encountered during the task execution that prompted the retry. -func logRetryAttempt(taskName string, attempt int, err error) { +// - maxRetries: The maximum number of retry attempts. +func logRetryAttempt(taskName string, attempt int, err error, maxRetries int) { navigator.LogErrorWithEmojiRateLimited( constant.ErrorEmoji, fmt.Sprintf(language.ErrorDuringTaskAttempt, attempt+1, maxRetries, err), @@ -26,13 +32,15 @@ func logRetryAttempt(taskName string, attempt int, err error) { } // logFinalError logs an error message signaling the final failure of a task after all retries. -// It includes the task name and the error returned from the last attempt. +// It includes the task name and the error returned from the last attempt. The maxRetries variable is used +// to indicate the total number of allowed retries. // // Parameters: // - shipsnamespace: The namespace where the task was attempted. // - taskName: The name of the task that failed. // - err: The final error encountered that resulted in the task failure. -func logFinalError(shipsnamespace string, taskName string, err error) { +// - maxRetries: The maximum number of retry attempts. +func logFinalError(shipsnamespace string, taskName string, err error, maxRetries int) { finalErrorMessage := fmt.Sprintf(language.ErrorFailedToCompleteTask, taskName, maxRetries) navigator.LogErrorWithEmojiRateLimited( constant.ErrorEmoji, @@ -42,3 +50,52 @@ func logFinalError(shipsnamespace string, taskName string, err error) { zap.Error(err), ) } + +// handleTaskError processes an error encountered during task execution, determining whether to retry the task. +// It takes into account the context's cancellation state, conflict errors and employs a delay between retries. +// +// Parameters: +// - ctx: The context governing cancellation. +// - clientset: The Kubernetes client set. +// - shipsnamespace: The namespace where the task was attempted. +// - err: The error encountered during the task execution. +// - attempt: The current retry attempt number. +// - task: The task being attempted. +// - workerIndex: The index of the worker processing the task. +// - maxRetries: The maximum number of retry attempts. +// - retryDelay: The duration to wait before retrying the task. +// +// Returns: +// - shouldContinue: A boolean indicating whether the task should be retried. +func handleTaskError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, err error, attempt int, task *Task, workerIndex int, maxRetries int, retryDelay time.Duration) (shouldContinue bool) { + if ctx.Err() != nil { + return false + } + + if apierrors.IsConflict(err) { + if resolveErr := resolveConflict(ctx, clientset, shipsnamespace, task); resolveErr != nil { + return false // Return the error from resolveConflict. + } + return true // Retry immediately after resolving conflict. + } + + fieldslog := navigator.CreateLogFields( + language.TaskFetchPods, + shipsnamespace, + navigator.WithAnyZapField(zap.Int(language.Worker_Name, workerIndex)), + navigator.WithAnyZapField(zap.Int(language.Attempt, attempt+1)), + navigator.WithAnyZapField(zap.Int(language.Max_Retries, maxRetries)), + navigator.WithAnyZapField(zap.String(language.Task_Name, task.Name)), + ) + // magic goes here, append fields log ":=" into binaries lmao + retryMessage := fmt.Sprintf("%s %s", constant.ErrorEmoji, fmt.Sprintf(language.RetryingTask, attempt+1, maxRetries)) + navigator.LogInfoWithEmoji( + language.PirateEmoji, + fmt.Sprintf(language.TaskWorker_Name, workerIndex, retryMessage), + fieldslog..., + ) + + logRetryAttempt(task.Name, attempt, err, maxRetries) + time.Sleep(retryDelay) + return true +} diff --git a/worker/labels_pods.go b/worker/labels_pods.go index 962f274..9497fe7 100644 --- a/worker/labels_pods.go +++ b/worker/labels_pods.go @@ -2,14 +2,137 @@ package worker import ( "context" + "encoding/json" "fmt" "github.com/H0llyW00dzZ/K8sBlackPearl/language" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" ) +// labelSinglePodWithResourceVersion applies the label to a single pod if it doesn't already have it. +// It fetches the latest version of the pod to ensure the update is based on the current state of the pod. +// +// Parameters: +// - ctx: A context.Context for managing cancellation and deadlines. +// - clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. +// - podName: The name of the pod to label. +// - namespace: The namespace in which the pod is located. +// - labelKey: The key of the label to be added or updated. +// - labelValue: The value for the label. +// +// Returns: +// - An error if the pod cannot be retrieved or updated with the new label. +func labelSinglePodWithResourceVersion(ctx context.Context, clientset *kubernetes.Clientset, podName, namespace, labelKey, labelValue string) error { + latestPod, err := fetchLatestPodVersion(ctx, clientset, podName, namespace) + if err != nil { + return wrapPodError(podName, err) + } + + if shouldUpdatePod(latestPod, labelKey, labelValue) { + return updatePodLabels(ctx, clientset, latestPod, namespace, podName, labelKey, labelValue) + } + + return nil +} + +// fetchLatestPodVersion retrieves the most recent version of the pod from the Kubernetes API. +// This is necessary to avoid conflicts when updating the pod's labels. +// +// Parameters: +// - ctx: A context.Context for managing cancellation and deadlines. +// - clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. +// - podName: The name of the pod to retrieve. +// - namespace: The namespace in which the pod is located. +// +// Returns: +// - A pointer to the retrieved corev1.Pod instance. +// - An error if the pod cannot be retrieved. +func fetchLatestPodVersion(ctx context.Context, clientset *kubernetes.Clientset, podName, namespace string) (*corev1.Pod, error) { + return clientset.CoreV1().Pods(namespace).Get(ctx, podName, v1.GetOptions{}) +} + +// shouldUpdatePod determines if the pod's labels need to be updated with the new labelKey and labelValue. +// It compares the existing labels of the pod to the desired label. +// +// Parameters: +// - pod: A pointer to the corev1.Pod instance to check. +// - labelKey: The key of the label to be added or updated. +// - labelValue: The value for the label. +// +// Returns: +// - True if the pod needs to be updated, false otherwise. +func shouldUpdatePod(pod *corev1.Pod, labelKey, labelValue string) bool { + return pod.Labels[labelKey] != labelValue +} + +// updatePodLabels applies the update to the pod's labels using a strategic merge patch. +// It ensures that only the labels are updated, leaving the rest of the pod configuration unchanged. +// +// Parameters: +// - ctx: A context.Context for managing cancellation and deadlines. +// - clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. +// - pod: A pointer to the corev1.Pod instance to update. +// - namespace: The namespace in which the pod is located. +// - podName: The name of the pod to update. +// - labelKey: The key of the label to be added or updated. +// - labelValue: The value for the label. +// +// Returns: +// - An error if the patch cannot be created or applied to the pod. +func updatePodLabels(ctx context.Context, clientset *kubernetes.Clientset, pod *corev1.Pod, namespace, podName, labelKey, labelValue string) error { + pod.Labels = getUpdatedLabels(pod.Labels, labelKey, labelValue) + + patchData, err := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": pod.Labels, + }, + }) + if err != nil { + return wrapPodError(podName, err) + } + + _, err = clientset.CoreV1().Pods(namespace).Patch(ctx, podName, types.StrategicMergePatchType, patchData, v1.PatchOptions{}) + if err != nil { + return wrapPodError(podName, err) + } + + return nil +} + +// getUpdatedLabels constructs a new labels map containing the updated label. +// If the original labels map is nil, it initializes a new map before adding the label. +// +// Parameters: +// - labels: The original map of labels to update. +// - labelKey: The key of the label to be added or updated. +// - labelValue: The value for the label. +// +// Returns: +// - A new map of labels with the updated label included. +func getUpdatedLabels(labels map[string]string, labelKey, labelValue string) map[string]string { + if labels == nil { + labels = make(map[string]string) + } + labels[labelKey] = labelValue + return labels +} + +// wrapPodError enriches the provided error with additional context, specifically mentioning the pod name. +// This helps in identifying which pod encountered the error when multiple pods are being processed. +// +// Parameters: +// - podName: The name of the pod related to the error. +// - err: The original error to wrap with additional context. +// +// Returns: +// - An error that includes the pod name and the original error message. +func wrapPodError(podName string, err error) error { + return fmt.Errorf(language.ErrorFailedToUpdateLabelSPods, podName, err) +} + // LabelPods sets a specific label on all pods within the specified namespace that do not already have it. // This function iterates over all pods in the namespace and delegates the labeling of each individual pod // to the labelSinglePod function. @@ -32,7 +155,7 @@ func LabelPods(ctx context.Context, clientset *kubernetes.Clientset, namespace, // Iterate over the list of pods and update their labels if necessary. for _, pod := range pods.Items { - if err := labelSinglePod(ctx, clientset, &pod, namespace, labelKey, labelValue); err != nil { + if err := labelSinglePodWithResourceVersion(ctx, clientset, pod.Name, namespace, labelKey, labelValue); err != nil { return err } } diff --git a/worker/tasks_crew.go b/worker/tasks_crew.go index b64d099..c27f2a7 100644 --- a/worker/tasks_crew.go +++ b/worker/tasks_crew.go @@ -10,6 +10,8 @@ import ( "github.com/H0llyW00dzZ/K8sBlackPearl/navigator" "github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) @@ -172,6 +174,16 @@ func (c *CrewLabelPodsTaskRunner) Run(ctx context.Context, clientset *kubernetes return nil } +// getLatestVersionOfPod fetches the latest version of the Pod from the Kubernetes API. +func getLatestVersionOfPod(ctx context.Context, clientset *kubernetes.Clientset, namespace string, podName string) (*corev1.Pod, error) { + // Fetch the latest version of the Pod using the clientset. + latestPod, err := clientset.CoreV1().Pods(namespace).Get(ctx, podName, v1.GetOptions{}) + if err != nil { + return nil, err + } + return latestPod, nil +} + // performTask runs the specified task by finding the appropriate TaskRunner from the registry // and invoking its Run method with the task's parameters. func performTask(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task Task, workerIndex int) error {