Skip to content

Commit

Permalink
Handling K8s Version Conflict (#33)
Browse files Browse the repository at this point in the history
* Feat [Language] Constant

- [+] chore(init_const.go): reformat code and add missing constant 'ErrorFailedToCompleteTaskDueToConflict'
- [+] feat(init_const.go): add constant 'PodName' for pod name parameter

* Chore [Language] Update Constant

- [+] chore(init_const.go): add new error constant for failed to update labels for pod

* Feat [Task] Fetch Latest Version of Pods

- [+] feat(tasks_crew.go): add getLatestVersionOfPod function to fetch the latest version of a Pod from the Kubernetes API
- [+] fix(tasks_crew.go): import missing packages for getLatestVersionOfPod function

* Refactor [Worker] [Crew] Reduce gocyclo

- [+] refactor(crew.go): remove unused import statement
- [+] fix(crew.go): fix logFinalError function call by adding maxRetries parameter
- [+] fix(crew.go): fix condition for handling task error in performTaskWithRetries function
- [+] refactor(crew.go): remove unused code and comments
- [+] feat(crew.go): add resolveConflict function to handle pod conflict resolution
- [+] fix(worker): add missing import statements for context, time, apierrors, and kubernetes
- [+] refactor(worker): add maxRetries parameter to logRetryAttempt and logFinalError functions
- [+] feat(worker): add handleTaskError function to handle task errors and retries

* Chore [README] [Docs]

- [+] chore(README.md): add tasks for managing pods in Kubernetes cluster

* Feat [Worker] [Labels Pods] Resource Version

- [+] feat(labels_pods.go): add labelSinglePodWithResourceVersion function to apply label to a single pod if it doesn't already have it
- [+] feat(labels_pods.go): add fetchLatestPodVersion function to retrieve the most recent version of the pod
- [+] feat(labels_pods.go): add shouldUpdatePod function to determine if the pod's labels need to be updated
- [+] feat(labels_pods.go): add updatePodLabels function to apply the update to the pod's labels using a strategic merge patch
- [+] feat(labels_pods.go): add getUpdatedLabels function to construct a new labels map containing the updated label
- [+] feat(labels_pods.go): add wrapPodError function to enrich the provided error with additional context
  • Loading branch information
H0llyW00dzZ authored Dec 24, 2023
1 parent 69a556f commit 39b312e
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 64 deletions.
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,41 @@ In real-world applications, the complexity and cost can escalate quickly. `K8sBl
"fieldSelector": "status.phase=Running",
"limit": 10
}
},
{
"name": "list-specific-pods-run",
"type": "CrewGetPodsTaskRunner",
"parameters": {
"labelSelector": "app=nginx",
"fieldSelector": "status.phase=Running",
"limit": 10
}
},
{
"name": "check-health-pods",
"type": "CrewCheckHealthPods",
"parameters": {
"labelSelector": "app=nginx",
"fieldSelector": "status.phase=Running",
"limit": 10
}
},
{
"name": "label-all-pods",
"type": "CrewWriteLabelPods",
"parameters": {
"labelKey": "environment",
"labelValue": "production"
}
},
{
"name": "update-specific-pod",
"type": "CrewWriteLabelPods",
"parameters": {
"podName": "pod-name",
"labelKey": "environment",
"labelValue": "production"
}
}
]
```
Expand Down
69 changes: 37 additions & 32 deletions language/init_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,38 @@ package language

// Note: This constant used for translation.
const (
ErrorListingPods = "error listing pods: %w"
ErrorUpdatingPodLabels = "error updating pod labels: %w"
ErrorCreatingPod = "error creating pod: %w"
ErrorDeletingPod = "error deleting pod: %w"
ErrorGettingPod = "error getting pod: %w"
ErrorPodNotFound = "pod not found"
ErrorUpdatingPod = "Error updating pod: %w"
ErrorRetrievingPods = "Error retrieving pods: %w"
PodAndStatus = "Pod: %s, Status: %s"
PodAndStatusAndHealth = "Pod: %s, Status: %s, Health: %s"
errconfig = "cannot load kubeconfig: %w"
cannotcreatek8s = "cannot create kubernetes client: %w"
ErrorLoggerIsNotSet = "Logger is not set! Cannot log info: %s\n"
ErrorLogger = "cannot create logger: %w"
ErrorFailedToComplete = "Failed to complete task after %d attempts"
ContextCancelledAbort = "Context cancelled, aborting retries."
ContextCancelled = "Context cancelled"
ErrorDuringTaskAttempt = "Error during task, attempt %d/%d: %v"
UnknownTaskType = "unknown task type: %s"
InvalidParameters = "invalid parameters"
InvalidparametersL = "invalid parameters: labelSelector, fieldSelector, or limit"
ErrorPodsCancelled = "Pod processing was cancelled."
ErrorPailedtoListPods = "Failed to list pods: %w"
ErrorParamLabelSelector = "parameter 'labelSelector' is required and must be a string"
ErrorParamFieldSelector = "parameter 'fieldSelector' is required and must be a string"
ErrorParamLimit = "parameter 'limit' is required and must be an integer"
ErrorParamLabelKey = "parameter 'labelKey' is required and must be a string"
ErrorParamLabelabelValue = "parameter 'labelValue' is required and must be a string"
ErrorFailedToWriteLabel = "Failed to write label pods"
ErrorListingPods = "error listing pods: %w"
ErrorUpdatingPodLabels = "error updating pod labels: %w"
ErrorCreatingPod = "error creating pod: %w"
ErrorDeletingPod = "error deleting pod: %w"
ErrorGettingPod = "error getting pod: %w"
ErrorPodNotFound = "pod not found"
ErrorUpdatingPod = "Error updating pod: %w"
ErrorRetrievingPods = "Error retrieving pods: %w"
PodAndStatus = "Pod: %s, Status: %s"
PodAndStatusAndHealth = "Pod: %s, Status: %s, Health: %s"
errconfig = "cannot load kubeconfig: %w"
cannotcreatek8s = "cannot create kubernetes client: %w"
ErrorLoggerIsNotSet = "Logger is not set! Cannot log info: %s\n"
ErrorLogger = "cannot create logger: %w"
ErrorFailedToComplete = "Failed to complete task after %d attempts"
ContextCancelledAbort = "Context cancelled, aborting retries."
ContextCancelled = "Context cancelled"
ErrorDuringTaskAttempt = "Error during task, attempt %d/%d: %v"
UnknownTaskType = "unknown task type: %s"
InvalidParameters = "invalid parameters"
InvalidparametersL = "invalid parameters: labelSelector, fieldSelector, or limit"
ErrorPodsCancelled = "Pod processing was cancelled."
ErrorPailedtoListPods = "Failed to list pods: %w"
ErrorParamLabelSelector = "parameter 'labelSelector' is required and must be a string"
ErrorParamFieldSelector = "parameter 'fieldSelector' is required and must be a string"
ErrorParamLimit = "parameter 'limit' is required and must be an integer"
ErrorParamLabelKey = "parameter 'labelKey' is required and must be a string"
ErrorParamLabelabelValue = "parameter 'labelValue' is required and must be a string"
ErrorFailedToWriteLabel = "Failed to write label pods"
ErrorFailedToCompleteTaskDueToConflict = "Failed to complete task %s after %d attempts due to conflict: %v"
ErrorPodNameParameter = "podName parameter is missing or not a string"
ErrorFailedToUpdateLabelSPods = "Failed to update labels for pod %s: %v"
)

const (
Expand All @@ -42,6 +45,7 @@ const (
Pods = "pods"
Phase = "phase"
healthStatus = "healthStatus"
PodName = "podName"
)

const (
Expand Down Expand Up @@ -90,9 +94,10 @@ const (
)

const (
Attempt = "attempt"
Max_Retries = "max_retries"
Error = "error"
Attempt = "attempt"
Max_Retries = "max_retries"
Error = "error"
ResourceVersion = "resourceVersion"
)

const (
Expand Down
47 changes: 20 additions & 27 deletions worker/crew.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/H0llyW00dzZ/K8sBlackPearl/language"
"github.com/H0llyW00dzZ/K8sBlackPearl/navigator"
"github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -44,7 +43,7 @@ func CrewWorker(ctx context.Context, clientset *kubernetes.Clientset, shipsNames
if err != nil {
// If the task fails, you can choose to release it for retrying.
taskStatus.Release(task.Name)
logFinalError(shipsNamespace, task.Name, err)
logFinalError(shipsNamespace, task.Name, err, maxRetries)
results <- err.Error()
} else {
// If the task is successful, it remains claimed to prevent retries.
Expand All @@ -71,38 +70,32 @@ func CrewWorker(ctx context.Context, clientset *kubernetes.Clientset, shipsNames
func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task Task, results chan<- string, workerIndex int) error {
for attempt := 0; attempt < maxRetries; attempt++ {
err := performTask(ctx, clientset, shipsnamespace, task, workerIndex)
if err == nil {
if err != nil {
if !handleTaskError(ctx, clientset, shipsnamespace, err, attempt, &task, workerIndex, maxRetries, retryDelay) {
return fmt.Errorf(language.ErrorFailedToCompleteTask, task.Name, maxRetries)
}
} else {
results <- fmt.Sprintf(language.TaskWorker_Name, workerIndex, fmt.Sprintf(language.TaskCompleteS, task.Name))
return nil
}

if ctx.Err() != nil {
return fmt.Errorf(language.ContextCancelled)
}

fieldslog := navigator.CreateLogFields(
language.TaskFetchPods,
shipsnamespace,
navigator.WithAnyZapField(zap.Int(language.Worker_Name, workerIndex)),
navigator.WithAnyZapField(zap.Int(language.Attempt, attempt+1)),
navigator.WithAnyZapField(zap.Int(language.Max_Retries, maxRetries)),
navigator.WithAnyZapField(zap.String(language.Task_Name, task.Name)),
)
// magic goes here, append fields log ":=" into binaries lmao
retryMessage := fmt.Sprintf("%s %s", constant.ErrorEmoji, fmt.Sprintf(language.RetryingTask, attempt+1, maxRetries))
navigator.LogInfoWithEmoji(
language.PirateEmoji,
fmt.Sprintf(language.TaskWorker_Name, workerIndex, retryMessage),
fieldslog...,
)

logRetryAttempt(task.Name, attempt, err)
time.Sleep(retryDelay)
}

return fmt.Errorf(language.ErrorFailedToCompleteTask, task.Name, maxRetries)
}

// resolveConflict refreshes the resource version recorded on a task so that the
// next attempt works against the pod's current state.
//
// It reads the pod name from the task parameters (key language.PodName), asks
// getLatestVersionOfPod for that pod in the given namespace, and stores the
// returned pod's ResourceVersion back into the task parameters under
// language.ResourceVersion.
//
// Parameters:
//   - ctx: The context passed through to the pod lookup.
//   - clientset: The Kubernetes client set used for the lookup.
//   - shipsnamespace: The namespace in which the pod is looked up.
//   - task: The task whose parameters are read and updated in place.
//
// Returns:
//   - error: A non-nil error if the pod name parameter is absent or not a string,
//     or the error reported by getLatestVersionOfPod; nil once the task
//     parameters have been updated.
func resolveConflict(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task *Task) error {
	// The pod name must be present in the task parameters as a string.
	name, isString := task.Parameters[language.PodName].(string)
	if !isString {
		return fmt.Errorf(language.ErrorPodNameParameter)
	}

	latest, fetchErr := getLatestVersionOfPod(ctx, clientset, shipsnamespace, name)
	if fetchErr != nil {
		// Without the latest pod there is nothing to refresh; hand the error back unchanged.
		return fetchErr
	}

	// Record the fresh resource version on the task for the next attempt.
	task.Parameters[language.ResourceVersion] = latest.ResourceVersion
	return nil
}

// CrewProcessPods iterates over a list of pods to evaluate their health.
// It sends a health status message for each pod to the results channel.
// If the context is cancelled during the process, it logs the cancellation
Expand Down
65 changes: 61 additions & 4 deletions worker/error_and_retry.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
package worker

import (
"context"
"fmt"
"time"

"github.com/H0llyW00dzZ/K8sBlackPearl/language"
"github.com/H0llyW00dzZ/K8sBlackPearl/navigator"
"github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant"
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
)

// logRetryAttempt logs a warning message indicating a task retry attempt with the current count.
// It includes the task name and the error that prompted the retry.
// It includes the task name and the error that prompted the retry. The maxRetries variable is used
// to indicate the total number of allowed retries.
//
// Parameters:
// - taskName: The name of the task being attempted.
// - attempt: The current retry attempt number.
// - err: The error encountered during the task execution that prompted the retry.
func logRetryAttempt(taskName string, attempt int, err error) {
// - maxRetries: The maximum number of retry attempts.
func logRetryAttempt(taskName string, attempt int, err error, maxRetries int) {
navigator.LogErrorWithEmojiRateLimited(
constant.ErrorEmoji,
fmt.Sprintf(language.ErrorDuringTaskAttempt, attempt+1, maxRetries, err),
Expand All @@ -26,13 +32,15 @@ func logRetryAttempt(taskName string, attempt int, err error) {
}

// logFinalError logs an error message signaling the final failure of a task after all retries.
// It includes the task name and the error returned from the last attempt.
// It includes the task name and the error returned from the last attempt. The maxRetries variable is used
// to indicate the total number of allowed retries.
//
// Parameters:
// - shipsnamespace: The namespace where the task was attempted.
// - taskName: The name of the task that failed.
// - err: The final error encountered that resulted in the task failure.
func logFinalError(shipsnamespace string, taskName string, err error) {
// - maxRetries: The maximum number of retry attempts.
func logFinalError(shipsnamespace string, taskName string, err error, maxRetries int) {
finalErrorMessage := fmt.Sprintf(language.ErrorFailedToCompleteTask, taskName, maxRetries)
navigator.LogErrorWithEmojiRateLimited(
constant.ErrorEmoji,
Expand All @@ -42,3 +50,52 @@ func logFinalError(shipsnamespace string, taskName string, err error) {
zap.Error(err),
)
}

// handleTaskError processes an error encountered during task execution and decides
// whether the caller should retry the task.
//
// The decision is made in this order:
//  1. If the context is already cancelled, no retry is attempted.
//  2. If the error is a Kubernetes conflict error, resolveConflict is called to
//     refresh the task's resource version. On success the task is retried
//     immediately (no delay); on failure the resolve error is logged and no
//     retry is attempted.
//  3. For any other error, the retry is logged and the function waits for
//     retryDelay before signalling a retry. The wait is abandoned as soon as
//     the context is cancelled, in which case no retry is attempted.
//
// Parameters:
//   - ctx: The context governing cancellation.
//   - clientset: The Kubernetes client set.
//   - shipsnamespace: The namespace where the task was attempted.
//   - err: The error encountered during the task execution.
//   - attempt: The current retry attempt number (zero-based; logged as attempt+1).
//   - task: The task being attempted; its parameters may be updated by conflict resolution.
//   - workerIndex: The index of the worker processing the task.
//   - maxRetries: The maximum number of retry attempts.
//   - retryDelay: The duration to wait before retrying the task.
//
// Returns:
//   - shouldContinue: A boolean indicating whether the task should be retried.
func handleTaskError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, err error, attempt int, task *Task, workerIndex int, maxRetries int, retryDelay time.Duration) (shouldContinue bool) {
	if ctx.Err() != nil {
		return false
	}

	if apierrors.IsConflict(err) {
		if resolveErr := resolveConflict(ctx, clientset, shipsnamespace, task); resolveErr != nil {
			// The bool return cannot carry the error, so log it here rather
			// than dropping it silently before giving up on the task.
			logRetryAttempt(task.Name, attempt, resolveErr, maxRetries)
			return false
		}
		return true // Retry immediately after resolving conflict.
	}

	// Structured fields attached to the retry log entry.
	fieldslog := navigator.CreateLogFields(
		language.TaskFetchPods,
		shipsnamespace,
		navigator.WithAnyZapField(zap.Int(language.Worker_Name, workerIndex)),
		navigator.WithAnyZapField(zap.Int(language.Attempt, attempt+1)),
		navigator.WithAnyZapField(zap.Int(language.Max_Retries, maxRetries)),
		navigator.WithAnyZapField(zap.String(language.Task_Name, task.Name)),
	)
	retryMessage := fmt.Sprintf("%s %s", constant.ErrorEmoji, fmt.Sprintf(language.RetryingTask, attempt+1, maxRetries))
	navigator.LogInfoWithEmoji(
		language.PirateEmoji,
		fmt.Sprintf(language.TaskWorker_Name, workerIndex, retryMessage),
		fieldslog...,
	)

	logRetryAttempt(task.Name, attempt, err, maxRetries)

	// Wait out the retry delay, but stop waiting as soon as the context is
	// cancelled so a shutting-down worker is not held up for the full delay.
	timer := time.NewTimer(retryDelay)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}
Loading

0 comments on commit 39b312e

Please sign in to comment.