Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go Docs] Update Documentation #60

Merged
merged 17 commits into from
Dec 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions worker/captain.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ import (
// The shutdown function ensures all workers are stopped and the results channel is closed.
//
// Parameters:
// - ctx: Parent context to control the lifecycle of the workers.
// - clientset: Kubernetes API client for task operations.
// - shipsNamespace: Namespace in Kubernetes to perform tasks.
// - tasks: Slice of Task structs to be executed by the workers.
// - workerCount: Number of worker goroutines to start.
//
// ctx context.Context: Parent context to control the lifecycle of the workers.
// clientset *kubernetes.Clientset: Kubernetes API client for task operations.
// shipsNamespace string: Namespace in Kubernetes to perform tasks.
// tasks []configuration.Task: Slice of Task structs to be executed by the workers.
// workerCount int: Number of worker goroutines to start.
//
// Returns:
// - <-chan string: A read-only channel to receive task results.
// - func(): A function to call for initiating a graceful shutdown of the workers.
//
// <-chan string,: A read-only channel to receive task results.
// func()): A function to call for initiating a graceful shutdown of the workers.
func CaptainTellWorkers(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, tasks []configuration.Task, workerCount int) (<-chan string, func()) {
results := make(chan string)
var wg sync.WaitGroup
Expand Down
12 changes: 11 additions & 1 deletion worker/configuration/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func LoadTasksFromJSON(filePath string) ([]Task, error) {
return parseTasks(tasks)
}

// unmarshalJSON takes a byte slice of JSON data and unmarshals it into the provided tasks slice.
// It returns an error if the unmarshalling process fails.
func unmarshalJSON(file []byte, tasks *[]Task) error {
return json.Unmarshal(file, tasks)
}
Expand All @@ -83,15 +85,21 @@ func LoadTasksFromYAML(filePath string) ([]Task, error) {
return parseTasks(tasks)
}

// unmarshalYAML takes a byte slice of YAML data and unmarshals it into the provided tasks slice.
// It returns an error if the unmarshalling process fails.
func unmarshalYAML(file []byte, tasks *[]Task) error {
return yaml.Unmarshal(file, tasks)
}

// parseTasks iterates through a slice of tasks, parsing the RetryDelay string into a time.Duration
// and updating the RetryDelayDuration field for each task. It returns the updated slice of tasks
// and any error that occurs during the parsing of the retry delay.
func parseTasks(tasks []Task) ([]Task, error) {
for i, task := range tasks {
duration, err := ParseDuration(task.RetryDelay)
if err != nil {
return nil, fmt.Errorf(language.ErrorFailedToParseRetryDelayFromTask, task.Name, err)
// Wrapping the error with the task name to provide context.
return nil, fmt.Errorf("%s: %w", task.Name, err)
}
tasks[i].RetryDelayDuration = duration
}
Expand All @@ -118,6 +126,8 @@ func LoadTasks(filePath string) ([]Task, error) {
}
}

// ParseDuration converts a duration string into a time.Duration object.
// It returns an error if the string is empty or if the format is not recognized.
func ParseDuration(durationStr string) (time.Duration, error) {
if durationStr == "" {
return 0, fmt.Errorf(language.ErrorDurationstringisEmpty)
Expand Down
64 changes: 34 additions & 30 deletions worker/crew.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func CrewWorker(ctx context.Context, clientset *kubernetes.Clientset, shipsNames
//
// Parameters:
//
// ctx: Context for cancellation and timeout of the task processing.
// clientset: Kubernetes API client for cluster interactions.
// shipsNamespace: Namespace in Kubernetes where the task is executed.
// task: The task to be processed.
// results: Channel to return execution results to the caller.
// logger: Logger for structured logging within the worker.
// taskStatus: Map to track and control the status of tasks.
// workerIndex: Identifier for the worker instance for logging.
// ctx context.Context: Context for cancellation and timeout of the task processing.
// clientset *kubernetes.Clientset: Kubernetes API client for cluster interactions.
// shipsNamespace string: Namespace in Kubernetes where the task is executed.
// task configuration.Task: The task to be processed.
// results chan<- string: Channel to return execution results to the caller.
// logger *zap.Logger: Logger for structured logging within the worker.
// taskStatus *TaskStatusMap: Map to track and control the status of tasks.
// workerIndex int: Identifier for the worker instance for logging.
func processTask(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, results chan<- string, logger *zap.Logger, taskStatus *TaskStatusMap, workerIndex int) {
if !taskStatus.Claim(task.Name) {
return
Expand All @@ -67,12 +67,12 @@ func processTask(ctx context.Context, clientset *kubernetes.Clientset, shipsName
//
// Parameters:
//
// task: The task that has failed.
// taskStatus: Map to track and control the status of tasks.
// shipsNamespace: Namespace in Kubernetes associated with the task.
// err: The error that occurred during task processing.
// results: Channel to return execution results to the caller.
// workerIndex: Identifier for the worker instance for logging.
// task configuration.Task: The task that has failed.
// taskStatus *TaskStatusMap: Map to track and control the status of tasks.
// shipsNamespace string: Namespace in Kubernetes associated with the task.
// err error: The error that occurred during task processing.
// results chan<- string: Channel to return execution results to the caller.
// workerIndex int: Identifier for the worker instance for logging.
func handleFailedTask(task configuration.Task, taskStatus *TaskStatusMap, shipsNamespace string, err error, results chan<- string, workerIndex int) {
taskStatus.Release(task.Name)
logFinalError(shipsNamespace, task.Name, err, task.MaxRetries)
Expand All @@ -84,9 +84,9 @@ func handleFailedTask(task configuration.Task, taskStatus *TaskStatusMap, shipsN
//
// Parameters:
//
// task: The task that has been successfully completed.
// results: Channel to return execution results to the caller.
// workerIndex: Identifier for the worker instance for logging.
// task configuration.Task: The task that has been successfully completed.
// results chan<- string: Channel to return execution results to the caller.
// workerIndex int: Identifier for the worker instance for logging.
func handleSuccessfulTask(task configuration.Task, results chan<- string, workerIndex int) {
successMessage := fmt.Sprintf(language.TaskWorker_Name, workerIndex, fmt.Sprintf(language.TaskCompleteS, task.Name))
results <- successMessage
Expand All @@ -98,15 +98,17 @@ func handleSuccessfulTask(task configuration.Task, results chan<- string, worker
// it returns an error detailing the failure.
//
// Parameters:
// - ctx: Context for task cancellation and timeouts.
// - clientset: Kubernetes API client for executing tasks.
// - shipsNamespace: Kubernetes namespace for task execution.
// - task: Task to be executed.
// - results: Channel for reporting task execution results.
// - workerIndex: Index of the worker for contextual logging.
//
// ctx context.Context: Context for task cancellation and timeouts.
// clientset *kubernetes.Clientset: Kubernetes API client for executing tasks.
// shipsNamespace string: Kubernetes namespace for task execution.
// task configuration.Task: Task to be executed.
// results chan<- string: Channel for reporting task execution results.
// workerIndex int: Index of the worker for contextual logging.
//
// Returns:
// - error: Error if the task fails after all retry attempts.
//
// error: Error if the task fails after all retry attempts.
func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, results chan<- string, workerIndex int) error {
for attempt := 0; attempt < task.MaxRetries; attempt++ {
err := performTask(ctx, clientset, shipsNamespace, task, workerIndex)
Expand All @@ -129,10 +131,10 @@ func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset
//
// Parameters:
//
// ctx: The context governing cancellation.
// clientset: The Kubernetes client set used for interacting with the Kubernetes API.
// shipsnamespace: The Kubernetes namespace where the pod is located.
// task: The task containing the parameters that need to be updated with the latest pod information.
// ctx context.Context: The context governing cancellation.
// clientset *kubernetes.Clientset: The Kubernetes client set used for interacting with the Kubernetes API.
// shipsNamespace string: The Kubernetes namespace where the pod is located.
// task *configuration.Task: The task containing the parameters that need to be updated with the latest pod information.
//
// Returns:
//
Expand Down Expand Up @@ -189,10 +191,12 @@ func CrewProcessPods(ctx context.Context, pods []corev1.Pod, results chan<- stri
// It returns true if the pod is in the running phase and all its containers are ready.
//
// Parameters:
// - pod: The pod to check for health status.
//
// pod *corev1.Pod: The pod to check for health status.
//
// Returns:
// - bool: True if the pod is considered healthy, false otherwise.
//
// bool: True if the pod is considered healthy, false otherwise.
func CrewCheckingisPodHealthy(pod *corev1.Pod) bool {
// Check if the pod is in the running phase.
if pod.Status.Phase != corev1.PodRunning {
Expand Down
64 changes: 33 additions & 31 deletions worker/error_and_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
// to indicate the total number of allowed retries.
//
// Parameters:
// - taskName: The name of the task being attempted.
// - attempt: The current retry attempt number.
// - err: The error encountered during the task execution that prompted the retry.
// - maxRetries: The maximum number of retry attempts.
//
// taskName string: The name of the task being attempted.
// attempt int: The current retry attempt number.
// err error: The error encountered during the task execution that prompted the retry.
// maxRetries int: The maximum number of retry attempts.
func logRetryAttempt(taskName string, attempt int, err error, maxRetries int) {
navigator.LogErrorWithEmojiRateLimited(
constant.ErrorEmoji,
Expand All @@ -37,10 +38,11 @@ func logRetryAttempt(taskName string, attempt int, err error, maxRetries int) {
// to indicate the total number of allowed retries.
//
// Parameters:
// - shipsnamespace: The namespace where the task was attempted.
// - taskName: The name of the task that failed.
// - err: The final error encountered that resulted in the task failure.
// - maxRetries: The maximum number of retry attempts.
//
// shipsnamespace string: The namespace where the task was attempted.
// taskName string: The name of the task that failed.
// err error: The final error encountered that resulted in the task failure.
// maxRetries int: The maximum number of retry attempts.
func logFinalError(shipsnamespace string, taskName string, err error, maxRetries int) {
finalErrorMessage := fmt.Sprintf(language.ErrorFailedToCompleteTask, taskName, maxRetries)
navigator.LogErrorWithEmojiRateLimited(
Expand All @@ -59,19 +61,19 @@ func logFinalError(shipsnamespace string, taskName string, err error, maxRetries
//
// Parameters:
//
// ctx: The context governing cancellation.
// clientset: The Kubernetes client set used for task operations.
// shipsnamespace: The Kubernetes namespace where the task was attempted.
// err: The error encountered during the task execution.
// attempt: The current retry attempt number.
// task: The task being attempted.
// workerIndex: The index of the worker processing the task.
// maxRetries: The maximum number of retry attempts allowed.
// retryDelay: The duration to wait before making the next retry attempt.
// ctx context.Context: The context governing cancellation.
// *kubernetes.Clientset: The Kubernetes client set used for task operations.
// shipsnamespace string: The Kubernetes namespace where the task was attempted.
// err error: The error encountered during the task execution.
// attempt int: The current retry attempt number.
// task *configuration.Task: The task being attempted.
// workerIndex int: The index of the worker processing the task.
// maxRetries int: The maximum number of retry attempts allowed.
// retryDelay time.Duration: The duration to wait before making the next retry attempt.
//
// Returns:
//
// shouldContinue: A boolean indicating whether the task should be retried or not.
// shouldContinue bool: A boolean indicating whether the task should be retried or not.
func handleTaskError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, err error, attempt int, task *configuration.Task, workerIndex int, maxRetries int, retryDelay time.Duration) (shouldContinue bool) {
if ctx.Err() != nil {
return false
Expand All @@ -91,14 +93,14 @@ func handleTaskError(ctx context.Context, clientset *kubernetes.Clientset, ships
//
// Parameters:
//
// ctx: The context governing cancellation.
// clientset: The Kubernetes client set used for task operations.
// shipsnamespace: The Kubernetes namespace where the task was attempted.
// task: The task being attempted.
// ctx context.Context: The context governing cancellation.
// clientset *kubernetes.Clientset: The Kubernetes client set used for task operations.
// shipsnamespace string: The Kubernetes namespace where the task was attempted.
// task *configuration.Task: The task being attempted.
//
// Returns:
//
// A boolean indicating whether the task should be retried after conflict resolution.
// bool: A boolean indicating whether the task should be retried after conflict resolution.
func handleConflictError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task *configuration.Task) bool {
if resolveErr := resolveConflict(ctx, clientset, shipsnamespace, task); resolveErr != nil {
return false
Expand All @@ -113,17 +115,17 @@ func handleConflictError(ctx context.Context, clientset *kubernetes.Clientset, s
//
// Parameters:
//
// ctx: The context governing cancellation.
// err: The error encountered during task execution.
// attempt: The current retry attempt number.
// task: The task being attempted.
// workerIndex: The index of the worker processing the task.
// maxRetries: The maximum number of retry attempts allowed.
// retryDelay: The duration to wait before making the next retry attempt.
// ctx context.Context: The context governing cancellation.
// err error: The error encountered during task execution.
// attempt int: The current retry attempt number.
// task *configuration.Task: The task being attempted.
// workerIndex int: The index of the worker processing the task.
// maxRetries int: The maximum number of retry attempts allowed.
// retryDelay time.Duration: The duration to wait before making the next retry attempt.
//
// Returns:
//
// A boolean indicating whether the task should be retried or not.
// bool: A boolean indicating whether the task should be retried or not.
func handleGenericError(ctx context.Context, err error, attempt int, task *configuration.Task, workerIndex int, maxRetries int, retryDelay time.Duration) bool {
logRetryAttempt(task.Name, attempt, err, maxRetries)
time.Sleep(retryDelay)
Expand Down
42 changes: 32 additions & 10 deletions worker/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
// getParamAsString retrieves a string value from a map based on a key.
// It returns an error if the key is not present or the value is not a string.
//
// params - a map of parameters where the key is expected to be associated with a string value.
// key - the key for which to retrieve the string value.
// params map[string]interface{}: a map of parameters where the key is expected to be associated with a string value.
// key string: the key for which to retrieve the string value.
//
// Returns the string value and nil on success, or an empty string and an error on failure.
func getParamAsString(params map[string]interface{}, key string) (string, error) {
Expand All @@ -28,8 +28,8 @@ func getParamAsString(params map[string]interface{}, key string) (string, error)
// It handles both int and float64 data types due to the way JSON and YAML unmarshal numbers.
// It returns an error if the key is not present or the value is not a number.
//
// params - a map of parameters where the key is expected to be associated with an integer value.
// key - the key for which to retrieve the integer value.
// params map[string]interface{}: a map of parameters where the key is expected to be associated with an integer value.
// key string: the key for which to retrieve the integer value.
//
// Returns the int64 value and nil on success, or 0 and an error on failure.
func getParamAsInt64(params map[string]interface{}, key string) (int64, error) {
Expand Down Expand Up @@ -63,12 +63,14 @@ func getParamAsInt64(params map[string]interface{}, key string) (int64, error) {
// is strictly numerical and prevents any unexpected types from causing issues in the application.
//
// Params:
// - params: map[string]interface{} - The map containing parameter keys and values.
// - key: string - The key corresponding to the integer parameter to be retrieved.
//
// params map[string]interface{}: The map containing parameter keys and values.
// key string: The key corresponding to the integer parameter to be retrieved.
//
// Returns:
// - int: The extracted integer value associated with the provided key.
// - error: An error if the key is not found, or if the value is not a type that can be converted to an int.
//
// int: The extracted integer value associated with the provided key.
// error: An error if the key is not found, or if the value is not a type that can be converted to an int.
func getParamAsInt(params map[string]interface{}, key string) (int, error) {
value, ok := params[key]
if !ok {
Expand All @@ -85,12 +87,22 @@ func getParamAsInt(params map[string]interface{}, key string) (int, error) {
}

// logTaskStart logs the start of a task runner with a custom message and additional fields.
// message - the message to log, which should describe the task being started.
// fields - a slice of zap.Field items that provide additional context for the log entry.
// It uses an emoji for visual emphasis in the log.
//
// message string: The message to log, which should describe the task being started.
// fields []zap.Field: A slice of zap.Field items that provide additional context for the log entry.
func logTaskStart(message string, fields []zap.Field) {
navigator.LogInfoWithEmoji(language.PirateEmoji, message, fields...)
}

// createLogFieldsForRunnerTask generates a slice of zap.Field items for structured logging.
// It is used to create log fields that describe a runner task, including the task type and namespace.
//
// task configuration.Task: The task for which to create log fields.
// shipsNamespace string: The namespace associated with the task.
// taskType string: The type of the task being logged.
//
// Returns a slice of zap.Field items that can be used for structured logging.
func createLogFieldsForRunnerTask(task configuration.Task, shipsNamespace string, taskType string) []zap.Field {
return navigator.CreateLogFields(
taskType,
Expand All @@ -99,10 +111,20 @@ func createLogFieldsForRunnerTask(task configuration.Task, shipsNamespace string
)
}

// logErrorWithFields logs an error message with additional fields for context.
// It uses an emoji and rate limiting for logging errors to avoid flooding the log with repetitive messages.
//
// err error: The error to log.
// fields []zap.Field: A slice of zap.Field items that provide additional context for the error log entry.
func logErrorWithFields(err error, fields []zap.Field) {
navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, err.Error(), fields...)
}

// logResultsFromChannel logs messages received from a channel.
// It continues to log until the channel is closed.
//
// results chan string: A channel from which to read result strings to log.
// fields []zap.Field: A slice of zap.Field items that provide additional context for each log entry.
func logResultsFromChannel(results chan string, fields []zap.Field) {
for result := range results {
navigator.LogInfoWithEmoji(language.PirateEmoji, result, fields...)
Expand Down
Loading