Skip to content

Commit

Permalink
[Go Docs] Update Documentation (#60)
Browse files Browse the repository at this point in the history
* Go Docs [Worker] [Helper Function] Fix Formatting

- [+] refactor(helper.go): improve function and parameter descriptions in getParamAsString, getParamAsInt64, getParamAsInt, and logTaskStart functions

* Go Docs [Worker] [Helper Function] Update Documentation

- [+] refactor(helper.go): improve comments and function names for clarity and consistency
- [+] feat(helper.go): add logErrorWithFields function to log errors with additional context
- [+] feat(helper.go): add logResultsFromChannel function to log messages received from a channel

* Go Docs [Worker] [Crew] Fix Formatting

- [+] feat(crew.go): add function signatures and parameter descriptions for CrewWorker, processTask, handleFailedTask, handleSuccessfulTask, performTaskWithRetries, and CrewProcessPods functions
- [+] fix(crew.go): fix parameter type for handleFailedTask function
- [+] fix(crew.go): fix parameter type for performTaskWithRetries function
- [+] fix(crew.go): fix parameter type for CrewProcessPods function
- [+] fix(crew.go): fix return type for performTaskWithRetries function
- [+] fix(crew.go): fix return type for CrewCheckingisPodHealthy function

* Go Docs [Worker] [Captain] Fix Formatting

- [+] refactor(captain.go): update function signature and add parameter descriptions
- [+] chore(captain.go): remove unused imports

* Go Docs [Worker] [Error and Retry] Fix Formatting

- [+] chore(worker/error_and_retry.go): refactor function signatures and add parameter descriptions
- [+] feat(worker/error_and_retry.go): add logRetryAttempt function to log retry attempts
- [+] feat(worker/error_and_retry.go): add logFinalError function to log final error
- [+] feat(worker/error_and_retry.go): add handleTaskError function to handle task errors
- [+] feat(worker/error_and_retry.go): add handleConflictError function to handle conflict errors
- [+] feat(worker/error_and_retry.go): add handleGenericError function to handle generic errors

* Go Docs [Worker] [Labels Pods] Fix Formatting

- [+] chore(labels_pods.go): fix formatting and add missing parameter descriptions in function comments

* Go Docs [Worker] [List Options] Fix Formatting

- [+] docs(list_options.go): update function signature and add parameter description

* Go Docs [Worker] [List Pods] Fix Formatting

- [+] docs(list_pods.go): update function signature and parameter descriptions for listPods function

* Go Docs [Worker] [Logs Pods] Fix Formatting

- [+] chore(worker): update function signatures and add comments for clarity
- [+] refactor(worker): rename variables for better readability

* Go Docs [Worker] [PVC Storage] Fix Formatting

- [+] docs(pvc_storage.go): update function signature and parameter descriptions for createPVC function

* Go Docs [Worker] [Scale Deployment] Fix Formatting

- [+] refactor(scale.go): improve comments and parameter names for ScaleDeployment and scaleDeploymentOnce functions
- [+] chore(scale.go): fix formatting and indentation in int32Ptr function

* Go Docs [Worker] [Setup K8S] Fix Formatting

- [+] refactor(worker): update comments in setup.go
- [+] The comments in the `setup.go` file have been updated to provide more clarity and consistency.

* Go Docs [Worker] [Task Crew] Fix Missing Docs

- [+] feat(tasks_crew.go): add InitializeTasks function to load tasks from configuration file
- [+] feat(tasks_crew.go): add extractScaleParameters function to extract scaling parameters from task
- [+] feat(tasks_crew.go): add performScaling function to carry out scaling operation for Kubernetes deployment

* Go Docs [Worker] [Track Task] Fix Formatting

- [+] chore(track_task.go): update comments and formatting in TaskStatusMap struct and methods
- [+] chore(track_task.go): remove deadcode comments and update parameter comments in TaskStatusMap methods

* Go Docs [Worker] [Update Image Deployments] Fix Formatting

- [+] docs(update_image.go): update function signature and parameter descriptions in UpdateDeploymentImage function

* Go Docs [Worker] [Update Network Policy] Fix Formatting

- [+] chore(worker): update function signatures and add parameter descriptions in update_network_policy.go
- [+] chore(worker): update function signature and add parameter description in unmarshalPolicySpec function

* Go Docs [Worker] [Configuration [Task] Fix Missing Docs

- [+] fix(task.go): add comments explaining unmarshalJSON and unmarshalYAML functions
- [+] fix(task.go): add comments explaining parseTasks function
- [+] fix(task.go): add comments explaining ParseDuration function
  • Loading branch information
H0llyW00dzZ authored Dec 31, 2023
1 parent 3c98ee2 commit 39f0c22
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 206 deletions.
16 changes: 9 additions & 7 deletions worker/captain.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ import (
// The shutdown function ensures all workers are stopped and the results channel is closed.
//
// Parameters:
// - ctx: Parent context to control the lifecycle of the workers.
// - clientset: Kubernetes API client for task operations.
// - shipsNamespace: Namespace in Kubernetes to perform tasks.
// - tasks: Slice of Task structs to be executed by the workers.
// - workerCount: Number of worker goroutines to start.
//
// ctx context.Context: Parent context to control the lifecycle of the workers.
// clientset *kubernetes.Clientset: Kubernetes API client for task operations.
// shipsNamespace string: Namespace in Kubernetes to perform tasks.
// tasks []configuration.Task: Slice of Task structs to be executed by the workers.
// workerCount int: Number of worker goroutines to start.
//
// Returns:
// - <-chan string: A read-only channel to receive task results.
// - func(): A function to call for initiating a graceful shutdown of the workers.
//
// <-chan string,: A read-only channel to receive task results.
// func()): A function to call for initiating a graceful shutdown of the workers.
func CaptainTellWorkers(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, tasks []configuration.Task, workerCount int) (<-chan string, func()) {
results := make(chan string)
var wg sync.WaitGroup
Expand Down
12 changes: 11 additions & 1 deletion worker/configuration/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func LoadTasksFromJSON(filePath string) ([]Task, error) {
return parseTasks(tasks)
}

// unmarshalJSON takes a byte slice of JSON data and unmarshals it into the provided tasks slice.
// It returns an error if the unmarshalling process fails.
func unmarshalJSON(file []byte, tasks *[]Task) error {
return json.Unmarshal(file, tasks)
}
Expand All @@ -83,15 +85,21 @@ func LoadTasksFromYAML(filePath string) ([]Task, error) {
return parseTasks(tasks)
}

// unmarshalYAML takes a byte slice of YAML data and unmarshals it into the provided tasks slice.
// It returns an error if the unmarshalling process fails.
func unmarshalYAML(file []byte, tasks *[]Task) error {
return yaml.Unmarshal(file, tasks)
}

// parseTasks iterates through a slice of tasks, parsing the RetryDelay string into a time.Duration
// and updating the RetryDelayDuration field for each task. It returns the updated slice of tasks
// and any error that occurs during the parsing of the retry delay.
func parseTasks(tasks []Task) ([]Task, error) {
for i, task := range tasks {
duration, err := ParseDuration(task.RetryDelay)
if err != nil {
return nil, fmt.Errorf(language.ErrorFailedToParseRetryDelayFromTask, task.Name, err)
// Wrapping the error with the task name to provide context.
return nil, fmt.Errorf("%s: %w", task.Name, err)
}
tasks[i].RetryDelayDuration = duration
}
Expand All @@ -118,6 +126,8 @@ func LoadTasks(filePath string) ([]Task, error) {
}
}

// ParseDuration converts a duration string into a time.Duration object.
// It returns an error if the string is empty or if the format is not recognized.
func ParseDuration(durationStr string) (time.Duration, error) {
if durationStr == "" {
return 0, fmt.Errorf(language.ErrorDurationstringisEmpty)
Expand Down
64 changes: 34 additions & 30 deletions worker/crew.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func CrewWorker(ctx context.Context, clientset *kubernetes.Clientset, shipsNames
//
// Parameters:
//
// ctx: Context for cancellation and timeout of the task processing.
// clientset: Kubernetes API client for cluster interactions.
// shipsNamespace: Namespace in Kubernetes where the task is executed.
// task: The task to be processed.
// results: Channel to return execution results to the caller.
// logger: Logger for structured logging within the worker.
// taskStatus: Map to track and control the status of tasks.
// workerIndex: Identifier for the worker instance for logging.
// ctx context.Context: Context for cancellation and timeout of the task processing.
// clientset *kubernetes.Clientset: Kubernetes API client for cluster interactions.
// shipsNamespace string: Namespace in Kubernetes where the task is executed.
// task configuration.Task: The task to be processed.
// results chan<- string: Channel to return execution results to the caller.
// logger *zap.Logger: Logger for structured logging within the worker.
// taskStatus *TaskStatusMap: Map to track and control the status of tasks.
// workerIndex int: Identifier for the worker instance for logging.
func processTask(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, results chan<- string, logger *zap.Logger, taskStatus *TaskStatusMap, workerIndex int) {
if !taskStatus.Claim(task.Name) {
return
Expand All @@ -67,12 +67,12 @@ func processTask(ctx context.Context, clientset *kubernetes.Clientset, shipsName
//
// Parameters:
//
// task: The task that has failed.
// taskStatus: Map to track and control the status of tasks.
// shipsNamespace: Namespace in Kubernetes associated with the task.
// err: The error that occurred during task processing.
// results: Channel to return execution results to the caller.
// workerIndex: Identifier for the worker instance for logging.
// task configuration.Task: The task that has failed.
// taskStatus *TaskStatusMap: Map to track and control the status of tasks.
// shipsNamespace string: Namespace in Kubernetes associated with the task.
// err error: The error that occurred during task processing.
// results chan<- string: Channel to return execution results to the caller.
// workerIndex int: Identifier for the worker instance for logging.
func handleFailedTask(task configuration.Task, taskStatus *TaskStatusMap, shipsNamespace string, err error, results chan<- string, workerIndex int) {
taskStatus.Release(task.Name)
logFinalError(shipsNamespace, task.Name, err, task.MaxRetries)
Expand All @@ -84,9 +84,9 @@ func handleFailedTask(task configuration.Task, taskStatus *TaskStatusMap, shipsN
//
// Parameters:
//
// task: The task that has been successfully completed.
// results: Channel to return execution results to the caller.
// workerIndex: Identifier for the worker instance for logging.
// task configuration.Task: The task that has been successfully completed.
// results chan<- string: Channel to return execution results to the caller.
// workerIndex int: Identifier for the worker instance for logging.
func handleSuccessfulTask(task configuration.Task, results chan<- string, workerIndex int) {
successMessage := fmt.Sprintf(language.TaskWorker_Name, workerIndex, fmt.Sprintf(language.TaskCompleteS, task.Name))
results <- successMessage
Expand All @@ -98,15 +98,17 @@ func handleSuccessfulTask(task configuration.Task, results chan<- string, worker
// it returns an error detailing the failure.
//
// Parameters:
// - ctx: Context for task cancellation and timeouts.
// - clientset: Kubernetes API client for executing tasks.
// - shipsNamespace: Kubernetes namespace for task execution.
// - task: Task to be executed.
// - results: Channel for reporting task execution results.
// - workerIndex: Index of the worker for contextual logging.
//
// ctx context.Context: Context for task cancellation and timeouts.
// clientset *kubernetes.Clientset: Kubernetes API client for executing tasks.
// shipsNamespace string: Kubernetes namespace for task execution.
// task configuration.Task: Task to be executed.
// results chan<- string: Channel for reporting task execution results.
// workerIndex int: Index of the worker for contextual logging.
//
// Returns:
// - error: Error if the task fails after all retry attempts.
//
// error: Error if the task fails after all retry attempts.
func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, results chan<- string, workerIndex int) error {
for attempt := 0; attempt < task.MaxRetries; attempt++ {
err := performTask(ctx, clientset, shipsNamespace, task, workerIndex)
Expand All @@ -129,10 +131,10 @@ func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset
//
// Parameters:
//
// ctx: The context governing cancellation.
// clientset: The Kubernetes client set used for interacting with the Kubernetes API.
// shipsnamespace: The Kubernetes namespace where the pod is located.
// task: The task containing the parameters that need to be updated with the latest pod information.
// ctx context.Context: The context governing cancellation.
// clientset *kubernetes.Clientset: The Kubernetes client set used for interacting with the Kubernetes API.
// shipsNamespace string: The Kubernetes namespace where the pod is located.
// task *configuration.Task: The task containing the parameters that need to be updated with the latest pod information.
//
// Returns:
//
Expand Down Expand Up @@ -189,10 +191,12 @@ func CrewProcessPods(ctx context.Context, pods []corev1.Pod, results chan<- stri
// It returns true if the pod is in the running phase and all its containers are ready.
//
// Parameters:
// - pod: The pod to check for health status.
//
// pod *corev1.Pod: The pod to check for health status.
//
// Returns:
// - bool: True if the pod is considered healthy, false otherwise.
//
// bool: True if the pod is considered healthy, false otherwise.
func CrewCheckingisPodHealthy(pod *corev1.Pod) bool {
// Check if the pod is in the running phase.
if pod.Status.Phase != corev1.PodRunning {
Expand Down
64 changes: 33 additions & 31 deletions worker/error_and_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
// to indicate the total number of allowed retries.
//
// Parameters:
// - taskName: The name of the task being attempted.
// - attempt: The current retry attempt number.
// - err: The error encountered during the task execution that prompted the retry.
// - maxRetries: The maximum number of retry attempts.
//
// taskName string: The name of the task being attempted.
// attempt int: The current retry attempt number.
// err error: The error encountered during the task execution that prompted the retry.
// maxRetries int: The maximum number of retry attempts.
func logRetryAttempt(taskName string, attempt int, err error, maxRetries int) {
navigator.LogErrorWithEmojiRateLimited(
constant.ErrorEmoji,
Expand All @@ -37,10 +38,11 @@ func logRetryAttempt(taskName string, attempt int, err error, maxRetries int) {
// to indicate the total number of allowed retries.
//
// Parameters:
// - shipsnamespace: The namespace where the task was attempted.
// - taskName: The name of the task that failed.
// - err: The final error encountered that resulted in the task failure.
// - maxRetries: The maximum number of retry attempts.
//
// shipsnamespace string: The namespace where the task was attempted.
// taskName string: The name of the task that failed.
// err error: The final error encountered that resulted in the task failure.
// maxRetries int: The maximum number of retry attempts.
func logFinalError(shipsnamespace string, taskName string, err error, maxRetries int) {
finalErrorMessage := fmt.Sprintf(language.ErrorFailedToCompleteTask, taskName, maxRetries)
navigator.LogErrorWithEmojiRateLimited(
Expand All @@ -59,19 +61,19 @@ func logFinalError(shipsnamespace string, taskName string, err error, maxRetries
//
// Parameters:
//
// ctx: The context governing cancellation.
// clientset: The Kubernetes client set used for task operations.
// shipsnamespace: The Kubernetes namespace where the task was attempted.
// err: The error encountered during the task execution.
// attempt: The current retry attempt number.
// task: The task being attempted.
// workerIndex: The index of the worker processing the task.
// maxRetries: The maximum number of retry attempts allowed.
// retryDelay: The duration to wait before making the next retry attempt.
// ctx context.Context: The context governing cancellation.
// *kubernetes.Clientset: The Kubernetes client set used for task operations.
// shipsnamespace string: The Kubernetes namespace where the task was attempted.
// err error: The error encountered during the task execution.
// attempt int: The current retry attempt number.
// task *configuration.Task: The task being attempted.
// workerIndex int: The index of the worker processing the task.
// maxRetries int: The maximum number of retry attempts allowed.
// retryDelay time.Duration: The duration to wait before making the next retry attempt.
//
// Returns:
//
// shouldContinue: A boolean indicating whether the task should be retried or not.
// shouldContinue bool: A boolean indicating whether the task should be retried or not.
func handleTaskError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, err error, attempt int, task *configuration.Task, workerIndex int, maxRetries int, retryDelay time.Duration) (shouldContinue bool) {
if ctx.Err() != nil {
return false
Expand All @@ -91,14 +93,14 @@ func handleTaskError(ctx context.Context, clientset *kubernetes.Clientset, ships
//
// Parameters:
//
// ctx: The context governing cancellation.
// clientset: The Kubernetes client set used for task operations.
// shipsnamespace: The Kubernetes namespace where the task was attempted.
// task: The task being attempted.
// ctx context.Context: The context governing cancellation.
// clientset *kubernetes.Clientset: The Kubernetes client set used for task operations.
// shipsnamespace string: The Kubernetes namespace where the task was attempted.
// task *configuration.Task: The task being attempted.
//
// Returns:
//
// A boolean indicating whether the task should be retried after conflict resolution.
// bool: A boolean indicating whether the task should be retried after conflict resolution.
func handleConflictError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task *configuration.Task) bool {
if resolveErr := resolveConflict(ctx, clientset, shipsnamespace, task); resolveErr != nil {
return false
Expand All @@ -113,17 +115,17 @@ func handleConflictError(ctx context.Context, clientset *kubernetes.Clientset, s
//
// Parameters:
//
// ctx: The context governing cancellation.
// err: The error encountered during task execution.
// attempt: The current retry attempt number.
// task: The task being attempted.
// workerIndex: The index of the worker processing the task.
// maxRetries: The maximum number of retry attempts allowed.
// retryDelay: The duration to wait before making the next retry attempt.
// ctx context.Context: The context governing cancellation.
// err error: The error encountered during task execution.
// attempt int: The current retry attempt number.
// task *configuration.Task: The task being attempted.
// workerIndex int: The index of the worker processing the task.
// maxRetries int: The maximum number of retry attempts allowed.
// retryDelay time.Duration: The duration to wait before making the next retry attempt.
//
// Returns:
//
// A boolean indicating whether the task should be retried or not.
// bool: A boolean indicating whether the task should be retried or not.
func handleGenericError(ctx context.Context, err error, attempt int, task *configuration.Task, workerIndex int, maxRetries int, retryDelay time.Duration) bool {
logRetryAttempt(task.Name, attempt, err, maxRetries)
time.Sleep(retryDelay)
Expand Down
42 changes: 32 additions & 10 deletions worker/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
// getParamAsString retrieves a string value from a map based on a key.
// It returns an error if the key is not present or the value is not a string.
//
// params - a map of parameters where the key is expected to be associated with a string value.
// key - the key for which to retrieve the string value.
// params map[string]interface{}: a map of parameters where the key is expected to be associated with a string value.
// key string: the key for which to retrieve the string value.
//
// Returns the string value and nil on success, or an empty string and an error on failure.
func getParamAsString(params map[string]interface{}, key string) (string, error) {
Expand All @@ -28,8 +28,8 @@ func getParamAsString(params map[string]interface{}, key string) (string, error)
// It handles both int and float64 data types due to the way JSON and YAML unmarshal numbers.
// It returns an error if the key is not present or the value is not a number.
//
// params - a map of parameters where the key is expected to be associated with an integer value.
// key - the key for which to retrieve the integer value.
// params map[string]interface{}: a map of parameters where the key is expected to be associated with an integer value.
// key string: the key for which to retrieve the integer value.
//
// Returns the int64 value and nil on success, or 0 and an error on failure.
func getParamAsInt64(params map[string]interface{}, key string) (int64, error) {
Expand Down Expand Up @@ -63,12 +63,14 @@ func getParamAsInt64(params map[string]interface{}, key string) (int64, error) {
// is strictly numerical and prevents any unexpected types from causing issues in the application.
//
// Params:
// - params: map[string]interface{} - The map containing parameter keys and values.
// - key: string - The key corresponding to the integer parameter to be retrieved.
//
// params map[string]interface{}: The map containing parameter keys and values.
// key string: The key corresponding to the integer parameter to be retrieved.
//
// Returns:
// - int: The extracted integer value associated with the provided key.
// - error: An error if the key is not found, or if the value is not a type that can be converted to an int.
//
// int: The extracted integer value associated with the provided key.
// error: An error if the key is not found, or if the value is not a type that can be converted to an int.
func getParamAsInt(params map[string]interface{}, key string) (int, error) {
value, ok := params[key]
if !ok {
Expand All @@ -85,12 +87,22 @@ func getParamAsInt(params map[string]interface{}, key string) (int, error) {
}

// logTaskStart logs the start of a task runner with a custom message and additional fields.
// message - the message to log, which should describe the task being started.
// fields - a slice of zap.Field items that provide additional context for the log entry.
// It uses an emoji for visual emphasis in the log.
//
// message string: The message to log, which should describe the task being started.
// fields []zap.Field: A slice of zap.Field items that provide additional context for the log entry.
func logTaskStart(message string, fields []zap.Field) {
navigator.LogInfoWithEmoji(language.PirateEmoji, message, fields...)
}

// createLogFieldsForRunnerTask generates a slice of zap.Field items for structured logging.
// It is used to create log fields that describe a runner task, including the task type and namespace.
//
// task configuration.Task: The task for which to create log fields.
// shipsNamespace string: The namespace associated with the task.
// taskType string: The type of the task being logged.
//
// Returns a slice of zap.Field items that can be used for structured logging.
func createLogFieldsForRunnerTask(task configuration.Task, shipsNamespace string, taskType string) []zap.Field {
return navigator.CreateLogFields(
taskType,
Expand All @@ -99,10 +111,20 @@ func createLogFieldsForRunnerTask(task configuration.Task, shipsNamespace string
)
}

// logErrorWithFields logs an error message with additional fields for context.
// It uses an emoji and rate limiting for logging errors to avoid flooding the log with repetitive messages.
//
// err error: The error to log.
// fields []zap.Field: A slice of zap.Field items that provide additional context for the error log entry.
func logErrorWithFields(err error, fields []zap.Field) {
navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, err.Error(), fields...)
}

// logResultsFromChannel logs messages received from a channel.
// It continues to log until the channel is closed.
//
// results chan string: A channel from which to read result strings to log.
// fields []zap.Field: A slice of zap.Field items that provide additional context for each log entry.
func logResultsFromChannel(results chan string, fields []zap.Field) {
for result := range results {
navigator.LogInfoWithEmoji(language.PirateEmoji, result, fields...)
Expand Down
Loading

0 comments on commit 39f0c22

Please sign in to comment.