Skip to content

Commit

Permalink
Reduce Complexities 🤪 (#64)
Browse files Browse the repository at this point in the history
* Feat [Language] Update Constants

- [+] feat(init_const.go): add ErrorFailedToCompleteAfterAttempts constant for handling failure after multiple attempts

* Feat [Worker] Update Constants

- [+] chore(worker): add new constants for task, attempt, and maxRetries
- [+] refactor(worker): rename constant policyNamE to policyName
- [+] refactor(worker): rename constant policySpeC to policySpec
- [+] refactor(worker): rename constant retryDelaY to retryDelay

* Feat [Worker] [Helper Function] Retry Policy Tasks

- [+] refactor(worker): remove unused code and rename function withRetries to Execute in RetryPolicy struct
- [+] feat(worker): add RetryPolicy struct and Execute method to handle retrying operations

* Feat [Crew] Syncing Tasks Retries with Retry Policy

- [+] refactor(crew.go): extract retry logic into a separate function
- [+] feat(crew.go): add RetryPolicy struct to encapsulate retry settings
- [+] feat(crew.go): use RetryPolicy's Execute method to perform task with retries
- [+] fix(crew.go): update error message when task fails to complete

* Refactor [Worker] [Error and Retry] Syncing with Retry Policy

- [+] fix(worker): fix function signature of logRetryAttempt to include logFunc parameter
- [+] refactor(worker): refactor handleGenericError to use waitForNextAttempt function instead of checking context cancellation directly

* Chore [Crew] [Task Retries] Update Log Functions

- [+] fix(crew.go): import constant package for ErrorEmoji constant
- [+] chore(crew.go): update log function to use constant.ErrorEmoji
  • Loading branch information
H0llyW00dzZ authored Dec 31, 2023
1 parent 34d8a8e commit 7c6dad5
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 61 deletions.
1 change: 1 addition & 0 deletions language/init_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const (
ErrorSailingShips = "sailing ships failed after %d attempts"
ErrorAttemptFailed = "attempt %d failed: %w"
ErrorTaskFailedAfterAttempts = "task %s failed after %d attempts: %w"
ErrorFailedToCompleteAfterAttempts = "failed to complete after %d attempts: %v"
)

const (
Expand Down
3 changes: 3 additions & 0 deletions worker/cmd_constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
policyNamE = "policyName"
policySpeC = "policySpec"
retryDelay = "retryDelay"
tasK = "task"
attempT = "attempt"
maXRetries = "maxRetries"
)

// defined notice message just like human would type
Expand Down
22 changes: 19 additions & 3 deletions worker/crew.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/H0llyW00dzZ/K8sBlackPearl/language"
"github.com/H0llyW00dzZ/K8sBlackPearl/navigator"
"github.com/H0llyW00dzZ/K8sBlackPearl/worker/configuration"
"github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -111,18 +112,33 @@ func handleSuccessfulTask(task configuration.Task, results chan<- string, worker
//
// error: Error if the task fails after all retry attempts.
func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, results chan<- string, workerIndex int, taskStatus *TaskStatusMap) error {
// Define the operation to be retried.
operation := func() (string, error) {
// Attempt to perform the task.
err := performTask(ctx, clientset, shipsNamespace, task, workerIndex)
return task.Name, err // Return the task name along with the error.
}

// Use the withRetries helper function to perform the operation with retries.
err := withRetries(ctx, task.MaxRetries, task.RetryDelayDuration, operation)
// Create a RetryPolicy instance with the task's retry settings.
retryPolicy := RetryPolicy{
MaxRetries: task.MaxRetries,
RetryDelay: task.RetryDelayDuration,
}

// Use the RetryPolicy's Execute method to perform the operation with retries.
err := retryPolicy.Execute(ctx, operation, func(message string, fields ...zap.Field) {
// This is a placeholder for the actual logging function.
// Replace this with the actual function to log retries.
// For example: navigator.LogInfoWithEmoji or navigator.LogErrorWithEmoji
// Combine emojis with a space for readability.
emojiField := fmt.Sprintf("%s %s", constant.ErrorEmoji, language.PirateEmoji)
navigator.LogErrorWithEmoji(emojiField, message, fields...)
})

if err != nil {
// If the operation failed after retries, handle the failure.
handleFailedTask(task, taskStatus, shipsNamespace, err, results, workerIndex)
return fmt.Errorf(language.ErrorTaskFailedAfterAttempts, task.Name, task.MaxRetries, err)
return fmt.Errorf(language.ErrorFailedToCompleteTask, task.Name, task.MaxRetries)
}

// If the operation was successful, handle the success.
Expand Down
25 changes: 15 additions & 10 deletions worker/error_and_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
// attempt int: The current retry attempt number.
// err error: The error encountered during the task execution that prompted the retry.
// maxRetries int: The maximum number of retry attempts.
func logRetryAttempt(taskName string, attempt int, err error, maxRetries int) {
navigator.LogErrorWithEmojiRateLimited(
constant.ErrorEmoji,
fmt.Sprintf(language.ErrorDuringTaskAttempt, attempt+1, maxRetries, err),
zap.String(language.Task_Name, taskName),
// logFunc func(string, ...zap.Field): The log function to use.
func logRetryAttempt(taskName string, attempt int, err error, maxRetries int, logFunc func(string, ...zap.Field)) {
message := fmt.Sprintf(language.ErrorDuringTaskAttempt, attempt+1, maxRetries, err)
fields := []zap.Field{
zap.String(tasK, taskName),
zap.Int(attempT, attempt+1),
zap.Int(maXRetries, maxRetries),
zap.Error(err),
)
}
logFunc(message, fields...)
}

// logFinalError logs an error message signaling the final failure of a task after all retries.
Expand Down Expand Up @@ -127,10 +130,12 @@ func handleConflictError(ctx context.Context, clientset *kubernetes.Clientset, s
//
// bool: A boolean indicating whether the task should be retried or not.
func handleGenericError(ctx context.Context, err error, attempt int, task *configuration.Task, workerIndex int, maxRetries int, retryDelay time.Duration) bool {
logRetryAttempt(task.Name, attempt, err, maxRetries)
time.Sleep(retryDelay)
if ctx.Err() != nil {
return false // Context is canceled, do not continue.
logRetryAttempt(task.Name, attempt, err, maxRetries, navigator.Logger.Error)

// Wait for the next attempt, respecting the context cancellation.
if !waitForNextAttempt(ctx, retryDelay) {
return false // Context was cancelled during wait, do not continue.
}

return true
}
76 changes: 28 additions & 48 deletions worker/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@ import (
"go.uber.org/zap"
)

// RetryPolicy defines the policy for retrying operations.
// Its zero value performs no attempts (MaxRetries of 0 yields an immediate failure).
type RetryPolicy struct {
	MaxRetries int           // Maximum number of attempts before giving up.
	RetryDelay time.Duration // Pause between consecutive attempts.
}

// Execute runs the given operation according to the retry policy.
//
// It attempts the operation up to r.MaxRetries times, logging every failed
// attempt via logFunc and pausing r.RetryDelay between attempts. The pause
// is interruptible: if ctx is cancelled before an attempt or during the
// wait, Execute returns ctx.Err() immediately instead of blocking for the
// remainder of the delay (a plain time.Sleep would ignore cancellation).
//
// ctx context.Context: Controls cancellation of the retry loop.
// operation func() (string, error): The operation to retry; it returns the task name along with any error.
// logFunc func(string, ...zap.Field): The log function used to record each retry attempt.
//
// Returns nil on success, ctx.Err() on cancellation, or an error wrapping
// the last failure once all attempts are exhausted.
func (r *RetryPolicy) Execute(ctx context.Context, operation func() (string, error), logFunc func(string, ...zap.Field)) error {
	var lastErr error
	for attempt := 0; attempt < r.MaxRetries; attempt++ {
		// Bail out promptly if the context was already cancelled.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		taskName, err := operation()
		if err == nil {
			return nil // The operation was successful, return nil error.
		}
		lastErr = err
		logRetryAttempt(taskName, attempt, err, r.MaxRetries, logFunc)
		// Wait before the next attempt, honoring cancellation; a false
		// return means the context was cancelled mid-wait.
		if attempt < r.MaxRetries-1 && !waitForNextAttempt(ctx, r.RetryDelay) {
			return ctx.Err()
		}
	}
	return fmt.Errorf(language.ErrorFailedToCompleteAfterAttempts, r.MaxRetries, lastErr)
}

// getParamAsString retrieves a string value from a map based on a key.
// It returns an error if the key is not present or the value is not a string.
//
Expand Down Expand Up @@ -138,54 +166,6 @@ func logResultsFromChannel(results chan string, fields []zap.Field) {
}
}

// withRetries runs operation until it succeeds, the context is cancelled,
// or maxRetries attempts have all failed.
//
// Each failed attempt is logged via logRetryAttempt, and the function pauses
// retryDelay between attempts using waitForNextAttempt so that a cancelled
// context cuts the wait short.
//
// ctx context.Context: Controls cancellation of the retry loop.
// maxRetries int: The maximum number of attempts to make.
// retryDelay time.Duration: The pause between consecutive attempts.
// operation func() (string, error): The operation to run; it returns a task name and an error.
//
// Returns nil on success, the context's error on cancellation, or a
// formatted failure error once every attempt has been used up.
func withRetries(ctx context.Context, maxRetries int, retryDelay time.Duration, operation func() (string, error)) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		taskName, attemptErr := attemptOperation(ctx, attempt, operation)
		if attemptErr == nil {
			// Success: no further attempts are needed.
			return nil
		}
		if ctxErr := ctx.Err(); ctxErr != nil {
			// The caller cancelled; surface the context error as-is.
			return ctxErr
		}
		logRetryAttempt(taskName, attempt, attemptErr, maxRetries)
		// Only pause when more attempts remain; a false result from the
		// wait means the context was cancelled mid-pause.
		remaining := attempt < maxRetries-1
		if remaining && !waitForNextAttempt(ctx, retryDelay) {
			return ctx.Err()
		}
	}
	return fmt.Errorf(language.ErrorSailingShips, maxRetries)
}

// attemptOperation performs one execution of operation on behalf of a retry
// loop, tagging any failure with the current attempt number.
//
// ctx context.Context: Accepted for interface symmetry with the retry loop.
// attempt int: The zero-based number of the current attempt.
// operation func() (string, error): The operation to run; it returns a task name and an error.
//
// Returns the task name and, on failure, an error wrapping the original one
// with the attempt number.
func attemptOperation(ctx context.Context, attempt int, operation func() (string, error)) (string, error) {
	// NOTE(review): ctx is not consulted here — cancellation is handled by
	// the caller (withRetries). Confirm before removing the parameter.
	name, opErr := operation()
	if opErr == nil {
		return name, nil
	}
	// Annotate the failure so retry logs can distinguish attempts.
	return name, fmt.Errorf(language.ErrorAttemptFailed, attempt, opErr)
}

// waitForNextAttempt waits for a specified duration or until the context is cancelled, whichever comes first.
// It is used to implement a delay between retry attempts in the withRetries function.
//
Expand Down

0 comments on commit 7c6dad5

Please sign in to comment.