Reduce Cyclomatic and Update Documentation (#38)
* Reduce [Crew] Cyclomatic

- [+] feat(crew.go): add processTask function to handle individual task processing
- [+] feat(crew.go): add handleFailedTask function to handle failed tasks
- [+] feat(crew.go): add handleSuccessfulTask function to handle successful tasks

* Reduce [Error and Retry] Cyclomatic

- [+] fix(worker/error_and_retry.go): improve function and variable names for clarity
- [+] feat(worker/error_and_retry.go): add support for handling conflict errors and generic errors during task execution

* Feat [Go Docs] Resolve Conflict Documentation

- [+] chore(crew.go): add resolveConflict function to handle conflict errors during task execution

* Reduce [Setup] [K8S] Cyclomatic

- [+] refactor(worker/setup.go): simplify NewKubernetesClient function and extract buildOutOfClusterConfig function
- [+] feat(worker/setup.go): add buildOutOfClusterConfig function to build configuration from kubeconfig file

* Reduce [Task Check Health Pods] Cyclomatic

- [+] refactor(logs_pods.go): improve readability and maintainability by separating concerns of iterating over pod list and logging pod information
- [+] feat(logs_pods.go): add function to log individual pod information
- [+] feat(logs_pods.go): add function to initiate concurrent health checks for pods
- [+] feat(logs_pods.go): add function to log health status messages for pods
H0llyW00dzZ authored Dec 25, 2023
1 parent e9f1e89 commit ba99954
Showing 4 changed files with 229 additions and 126 deletions.
114 changes: 87 additions & 27 deletions worker/crew.go
@@ -18,39 +18,84 @@ const (
retryDelay = 2 * time.Second // Delay between retries
)

// CrewWorker orchestrates the execution of tasks within a Kubernetes namespace.
// It utilizes performTaskWithRetries to attempt each task with built-in retry logic.
// If a task fails after the maximum number of retries, it logs the error and sends
// a failure message through the results channel. Tasks are claimed to prevent duplicate
// executions, and they can be released if necessary for subsequent retries.
// CrewWorker orchestrates the execution of tasks within a Kubernetes namespace by utilizing
// performTaskWithRetries to attempt each task with built-in retry logic. If a task fails
// after the maximum number of retries, it logs the error and sends a failure message through
// the results channel. Tasks are claimed to prevent duplicate executions, and they can be
// released if necessary for subsequent retries.
//
// Parameters:
// - ctx: Context for cancellation and timeout of the worker process.
// - clientset: Kubernetes API client for cluster interactions.
// - shipsNamespace: Namespace in Kubernetes for task operations.
// - tasks: List of Task structs, each representing an executable task.
// - results: Channel to return execution results to the caller.
// - logger: Logger for structured logging within the worker.
// - taskStatus: Map to track and control the status of tasks.
// - workerIndex: Identifier for the worker instance for logging.
//
// ctx: Context for cancellation and timeout of the worker process.
// clientset: Kubernetes API client for cluster interactions.
// shipsNamespace: Namespace in Kubernetes for task operations.
// tasks: List of Task structs, each representing an executable task.
// results: Channel to return execution results to the caller.
// logger: Logger for structured logging within the worker.
// taskStatus: Map to track and control the status of tasks.
// workerIndex: Identifier for the worker instance for logging.
func CrewWorker(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, tasks []configuration.Task, results chan<- string, logger *zap.Logger, taskStatus *TaskStatusMap, workerIndex int) {
for _, task := range tasks {
// Try to claim the task. If it's already claimed, skip it.
if !taskStatus.Claim(task.Name) {
continue
}
processTask(ctx, clientset, shipsNamespace, task, results, logger, taskStatus, workerIndex)
}
}

err := performTaskWithRetries(ctx, clientset, shipsNamespace, task, results, workerIndex)
if err != nil {
// If the task fails, you can choose to release it for retrying.
taskStatus.Release(task.Name)
logFinalError(shipsNamespace, task.Name, err, maxRetries)
results <- err.Error()
} else {
// If the task is successful, it remains claimed to prevent retries.
results <- fmt.Sprintf(language.TaskWorker_Name, workerIndex, fmt.Sprintf(language.TaskCompleteS, task.Name))
}
// processTask processes an individual task within a Kubernetes namespace. It first attempts to
// claim the task to prevent duplicate processing. If the claim is successful, it then attempts
// to perform the task with retries. Depending on the outcome, it either handles a failed task
// or reports a successful completion.
//
// Parameters:
//
// ctx: Context for cancellation and timeout of the task processing.
// clientset: Kubernetes API client for cluster interactions.
// shipsNamespace: Namespace in Kubernetes where the task is executed.
// task: The task to be processed.
// results: Channel to return execution results to the caller.
// logger: Logger for structured logging within the worker.
// taskStatus: Map to track and control the status of tasks.
// workerIndex: Identifier for the worker instance for logging.
func processTask(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, results chan<- string, logger *zap.Logger, taskStatus *TaskStatusMap, workerIndex int) {
if !taskStatus.Claim(task.Name) {
return
}

err := performTaskWithRetries(ctx, clientset, shipsNamespace, task, results, workerIndex)
if err != nil {
handleFailedTask(task, taskStatus, shipsNamespace, err, results, workerIndex)
} else {
handleSuccessfulTask(task, results, workerIndex)
}
}

// handleFailedTask handles the scenario when a task fails to complete after retries. It releases
// the claim on the task, logs the final error, and sends an error message through the results channel.
//
// Parameters:
//
// task: The task that has failed.
// taskStatus: Map to track and control the status of tasks.
// shipsNamespace: Namespace in Kubernetes associated with the task.
// err: The error that occurred during task processing.
// results: Channel to return execution results to the caller.
// workerIndex: Identifier for the worker instance for logging.
func handleFailedTask(task configuration.Task, taskStatus *TaskStatusMap, shipsNamespace string, err error, results chan<- string, workerIndex int) {
taskStatus.Release(task.Name)
logFinalError(shipsNamespace, task.Name, err, maxRetries)
results <- err.Error()
}

// handleSuccessfulTask reports a task's successful completion by sending a success message
// through the results channel.
//
// Parameters:
//
// task: The task that has been successfully completed.
// results: Channel to return execution results to the caller.
// workerIndex: Identifier for the worker instance for logging.
func handleSuccessfulTask(task configuration.Task, results chan<- string, workerIndex int) {
successMessage := fmt.Sprintf(language.TaskWorker_Name, workerIndex, fmt.Sprintf(language.TaskCompleteS, task.Name))
results <- successMessage
}
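
The claim/release behaviour that processTask and handleFailedTask rely on can be sketched as follows. TaskStatusMap itself is not part of this diff, so claimMap and newClaimMap below are hypothetical stand-ins, written under the assumption that a claim is a mutex-guarded boolean per task name.

```go
package main

import (
	"fmt"
	"sync"
)

// claimMap is a hypothetical stand-in for TaskStatusMap; the real type is
// not shown in this commit.
type claimMap struct {
	mu      sync.Mutex
	claimed map[string]bool
}

func newClaimMap() *claimMap {
	return &claimMap{claimed: make(map[string]bool)}
}

// Claim marks name as claimed and reports whether this caller won the claim.
func (m *claimMap) Claim(name string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.claimed[name] {
		return false
	}
	m.claimed[name] = true
	return true
}

// Release clears the claim so that another worker may pick the task up again.
func (m *claimMap) Release(name string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.claimed, name)
}

func main() {
	status := newClaimMap()
	fmt.Println(status.Claim("check-pods")) // first claim succeeds
	fmt.Println(status.Claim("check-pods")) // duplicate claim is rejected
	status.Release("check-pods")            // what a failed task would do
	fmt.Println(status.Claim("check-pods")) // claimable again after release
}
```

With this shape, a successful task simply never calls Release, which is what keeps it from being picked up a second time.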

// performTaskWithRetries tries to execute a task, with retries on failure.
@@ -83,6 +128,21 @@ func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset
return fmt.Errorf(language.ErrorFailedToCompleteTask, task.Name, maxRetries)
}

// resolveConflict attempts to resolve a conflict error by retrieving the latest version of a pod involved in the task.
// It updates the task's parameters with the new pod information, particularly the resource version, to mitigate
// the conflict error. This function is typically called when a conflict error is detected during task execution,
// such as when a resource has been modified concurrently.
//
// Parameters:
//
// ctx: The context governing cancellation.
// clientset: The Kubernetes client set used for interacting with the Kubernetes API.
// shipsnamespace: The Kubernetes namespace where the pod is located.
// task: The task containing the parameters that need to be updated with the latest pod information.
//
// Returns:
//
// error: An error if retrieving the latest version of the pod fails or if the pod name is not found in the task parameters.
func resolveConflict(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task *configuration.Task) error {
podName, ok := task.Parameters[language.PodName].(string)
if !ok {
96 changes: 64 additions & 32 deletions worker/error_and_retry.go
@@ -52,51 +52,83 @@ func logFinalError(shipsnamespace string, taskName string, err error, maxRetries
)
}

// handleTaskError processes an error encountered during task execution, determining whether to retry the task.
// It takes into account the context's cancellation state, conflict errors and employs a delay between retries.
// handleTaskError evaluates an error encountered during task execution to determine whether a retry is appropriate.
// It first checks the context's cancellation state, then dispatches on the kind of error: conflict errors go to
// handleConflictError, and all other errors go to handleGenericError, which logs the attempt and waits for the
// fixed retryDelay before the next attempt.
//
// Parameters:
// - ctx: The context governing cancellation.
// - clientset: The Kubernetes client set.
// - shipsnamespace: The namespace where the task was attempted.
// - err: The error encountered during the task execution.
// - attempt: The current retry attempt number.
// - task: The task being attempted.
// - workerIndex: The index of the worker processing the task.
// - maxRetries: The maximum number of retry attempts.
// - retryDelay: The duration to wait before retrying the task.
//
// ctx: The context governing cancellation.
// clientset: The Kubernetes client set used for task operations.
// shipsnamespace: The Kubernetes namespace where the task was attempted.
// err: The error encountered during the task execution.
// attempt: The current retry attempt number.
// task: The task being attempted.
// workerIndex: The index of the worker processing the task.
// maxRetries: The maximum number of retry attempts allowed.
// retryDelay: The duration to wait before making the next retry attempt.
//
// Returns:
// - shouldContinue: A boolean indicating whether the task should be retried.
//
// shouldContinue: A boolean indicating whether the task should be retried or not.
func handleTaskError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, err error, attempt int, task *configuration.Task, workerIndex int, maxRetries int, retryDelay time.Duration) (shouldContinue bool) {
if ctx.Err() != nil {
return false
}

if apierrors.IsConflict(err) {
if resolveErr := resolveConflict(ctx, clientset, shipsnamespace, task); resolveErr != nil {
return false // The conflict could not be resolved; do not retry.
}
return true // Retry immediately after resolving conflict.
switch {
case apierrors.IsConflict(err):
return handleConflictError(ctx, clientset, shipsnamespace, task)
default:
return handleGenericError(ctx, err, attempt, task, workerIndex, maxRetries, retryDelay)
}
}

fieldslog := navigator.CreateLogFields(
language.TaskFetchPods,
shipsnamespace,
navigator.WithAnyZapField(zap.Int(language.Worker_Name, workerIndex)),
navigator.WithAnyZapField(zap.Int(language.Attempt, attempt+1)),
navigator.WithAnyZapField(zap.Int(language.Max_Retries, maxRetries)),
navigator.WithAnyZapField(zap.String(language.Task_Name, task.Name)),
)
// magic goes here, append fields log ":=" into binaries lmao
retryMessage := fmt.Sprintf("%s %s", constant.ErrorEmoji, fmt.Sprintf(language.RetryingTask, attempt+1, maxRetries))
navigator.LogInfoWithEmoji(
language.PirateEmoji,
fmt.Sprintf(language.TaskWorker_Name, workerIndex, retryMessage),
fieldslog...,
)
// handleConflictError is called when a conflict error is detected during task execution. It attempts to resolve
// the conflict by calling resolveConflict. If resolving the conflict fails, it returns false to indicate that the
// task should not be retried. Otherwise, it returns true, suggesting that the task may be retried.
//
// Parameters:
//
// ctx: The context governing cancellation.
// clientset: The Kubernetes client set used for task operations.
// shipsnamespace: The Kubernetes namespace where the task was attempted.
// task: The task being attempted.
//
// Returns:
//
// A boolean indicating whether the task should be retried after conflict resolution.
func handleConflictError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task *configuration.Task) bool {
if resolveErr := resolveConflict(ctx, clientset, shipsnamespace, task); resolveErr != nil {
return false
}
return true
}

// handleGenericError handles non-conflict errors encountered during task execution. It logs the retry attempt
// and then sleeps for retryDelay before the next attempt. If the context has been canceled by the time the delay
// ends, it returns false to indicate that the task should not be retried. Otherwise, it returns true to suggest
// that the task may be retried.
//
// Parameters:
//
// ctx: The context governing cancellation.
// err: The error encountered during task execution.
// attempt: The current retry attempt number.
// task: The task being attempted.
// workerIndex: The index of the worker processing the task.
// maxRetries: The maximum number of retry attempts allowed.
// retryDelay: The duration to wait before making the next retry attempt.
//
// Returns:
//
// A boolean indicating whether the task should be retried or not.
func handleGenericError(ctx context.Context, err error, attempt int, task *configuration.Task, workerIndex int, maxRetries int, retryDelay time.Duration) bool {
logRetryAttempt(task.Name, attempt, err, maxRetries)
time.Sleep(retryDelay)
if ctx.Err() != nil {
return false // Context is canceled, do not continue.
}
return true
}
107 changes: 58 additions & 49 deletions worker/logs_pods.go
@@ -10,76 +10,85 @@ import (
corev1 "k8s.io/api/core/v1"
)

// logPods logs information about the pods that have been fetched from the Kubernetes API.
// It uses structured logging to provide a consistent and searchable log format. Each log entry
// will include additional fields provided by the `fields` parameter, as well as fields specific
// to each pod such as its name and status.
// logPods iterates through the list of pods and delegates the task of logging
// each individual pod's information to the logPod function. Separating the
// iteration over the pod list from the per-pod logging keeps both functions
// short and easier to maintain.
//
// Parameters:
// - fields: A slice of zap.Field structs that provide additional context for logging,
// such as the operation being performed or metadata about the request.
// - podList: A pointer to a corev1.PodList containing the list of Pods to be logged.
//
// This function first logs a summary message indicating the total number of pods fetched.
// It then iterates over each pod in the list and logs its name and status. The logs are
// decorated with an emoji for better visual distinction in log outputs.
// - baseFields: A slice of zap.Field structs providing contextual logging information.
// - podList: A pointer to a corev1.PodList containing the list of pods to log.
func logPods(baseFields []zap.Field, podList *corev1.PodList) {
for _, pod := range podList.Items {
// Create a copy of baseFields for each pod to avoid appending to the same slice
podFields := make([]zap.Field, len(baseFields))
copy(podFields, baseFields)
podFields = append(podFields, zap.String(language.PodsName, pod.Name), zap.String(language.PodStatus, string(pod.Status.Phase)))
navigator.LogInfoWithEmoji(language.PirateEmoji, fmt.Sprintf(language.ProcessingPods, pod.Name), podFields...)
logPod(baseFields, &pod)
}
}

// checkPodsHealth spawns a goroutine to check the health of each pod in the provided list.
// It sends the health status messages to a channel which can be used for further processing.
// logPod constructs a log entry for a single pod, combining base contextual fields
// with pod-specific information such as its name and status. This function encapsulates
// the logic for logging a single pod, which simplifies the logPods function and allows
// for potential reuse in other contexts where individual pod logging is required.
//
// Parameters:
// - baseFields: A slice of zap.Field structs providing contextual logging information.
// - pod: A pointer to a corev1.Pod representing the pod to log information about.
func logPod(baseFields []zap.Field, pod *corev1.Pod) {
podFields := append([]zap.Field(nil), baseFields...)
podFields = append(podFields, zap.String(language.PodsName, pod.Name), zap.String(language.PodStatus, string(pod.Status.Phase)))
navigator.LogInfoWithEmoji(language.PirateEmoji, fmt.Sprintf(language.ProcessingPods, pod.Name), podFields...)
}
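
Both the old loop body and the new logPod copy baseFields before appending the pod-specific fields. The reason is that append may write into the spare capacity of the slice it is given, so two appends to the same base can overwrite each other. The sketch below shows the effect with plain strings instead of zap.Field; withExtra is a hypothetical helper.

```go
package main

import "fmt"

// withExtra returns base plus extra without ever writing into base's
// backing array.
func withExtra(base []string, extra ...string) []string {
	out := append([]string(nil), base...) // fresh backing array
	return append(out, extra...)
}

func main() {
	base := make([]string, 1, 4) // spare capacity makes aliasing possible
	base[0] = "namespace"

	// Appending directly reuses base's backing array, so the second append
	// overwrites what the first one wrote.
	a := append(base, "pod-a")
	b := append(base, "pod-b")
	fmt.Println(a[1], b[1]) // prints "pod-b pod-b"

	// Copying first gives each result its own storage.
	c := withExtra(base, "pod-a")
	d := withExtra(base, "pod-b")
	fmt.Println(c[1], d[1]) // prints "pod-a pod-b"
}
```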

// checkPodsHealth starts a background goroutine that checks the health of every pod in the provided list.
// It returns a channel that communicates each pod's health status back to the caller,
// allowing for asynchronous processing of the results.
//
// Parameters:
// - ctx: The context to control the cancellation of the health check operation.
// - ctx: A context.Context to allow for cancellation of the health checks.
// - podList: A pointer to a corev1.PodList containing the pods to be checked.
//
// Returns:
// - A channel of strings, where each string is a message about a pod's health status.
//
// The function creates a channel to send the health status messages. It then iterates over
// each pod, checks its health, and sends a status message to the channel. If the context
// is cancelled, the function stops processing and exits the goroutine.
// - A channel of strings, where each string represents a pod's health status message.
func (c *CrewProcessCheckHealthTask) checkPodsHealth(ctx context.Context, podList *corev1.PodList) chan string {
results := make(chan string, len(podList.Items))
go func() {
defer close(results)
for _, pod := range podList.Items {
select {
case <-ctx.Done():
return
default:
healthStatus := language.NotHealthyStatus
if CrewCheckingisPodHealthy(&pod) {
healthStatus = language.HealthyStatus
}
statusMsg := fmt.Sprintf(language.PodAndStatusAndHealth, pod.Name, pod.Status.Phase, healthStatus)
results <- statusMsg
}
}
}()
go c.checkHealthWorker(ctx, podList, results)
return results
}

// logResults listens on a channel for pod health status messages and logs them.
// It continues to log messages until the channel is closed or the context is cancelled.
// checkHealthWorker conducts health checks on each pod in the list, one pod at a time.
// It reports each pod's health status back to the caller via the provided results channel
// and closes the channel when it finishes or when the context is cancelled. It is designed to
// run in its own goroutine so that the caller can consume results while checks are in progress.
//
// Parameters:
// - ctx: A context.Context to allow for cancellation of the health checks.
// - podList: A pointer to a corev1.PodList containing the pods to be checked.
// - results: A channel for sending back health status messages.
func (c *CrewProcessCheckHealthTask) checkHealthWorker(ctx context.Context, podList *corev1.PodList, results chan<- string) {
defer close(results)
for _, pod := range podList.Items {
if ctx.Err() != nil {
return
}
healthStatus := language.NotHealthyStatus
if CrewCheckingisPodHealthy(&pod) {
healthStatus = language.HealthyStatus
}
statusMsg := fmt.Sprintf(language.PodAndStatusAndHealth, pod.Name, pod.Status.Phase, healthStatus)
results <- statusMsg
}
}

// logResults continuously listens for health status messages on the results channel
// and logs them. The function will keep logging until there are no more messages to
// process or until the context is cancelled, whichever comes first. This function
// effectively decouples the logging of results from the health checking process.
//
// Parameters:
// - ctx: The context to control the cancellation of the logging operation.
// - results: A channel of strings containing the health status messages of pods.
// - ctx: A context.Context to allow for cancellation of the logging process.
// - results: A channel from which to read health status messages.
//
// Returns:
// - An error if the context is cancelled, indicating that logging was not completed.
//
// The function selects on the context's Done channel and the results channel.
// If the context is cancelled, it logs an error message and returns the context's error.
// Otherwise, it logs the health status messages until the results channel is closed.
// - An error if the context is cancelled, signaling incomplete logging.
func (c *CrewProcessCheckHealthTask) logResults(ctx context.Context, results chan string) error {
for {
select {