diff --git a/worker/captain.go b/worker/captain.go index e61694d..034b481 100644 --- a/worker/captain.go +++ b/worker/captain.go @@ -15,15 +15,17 @@ import ( // The shutdown function ensures all workers are stopped and the results channel is closed. // // Parameters: -// - ctx: Parent context to control the lifecycle of the workers. -// - clientset: Kubernetes API client for task operations. -// - shipsNamespace: Namespace in Kubernetes to perform tasks. -// - tasks: Slice of Task structs to be executed by the workers. -// - workerCount: Number of worker goroutines to start. +// +// ctx context.Context: Parent context to control the lifecycle of the workers. +// clientset *kubernetes.Clientset: Kubernetes API client for task operations. +// shipsNamespace string: Namespace in Kubernetes to perform tasks. +// tasks []configuration.Task: Slice of Task structs to be executed by the workers. +// workerCount int: Number of worker goroutines to start. // // Returns: -// - <-chan string: A read-only channel to receive task results. -// - func(): A function to call for initiating a graceful shutdown of the workers. +// +// <-chan string,: A read-only channel to receive task results. +// func()): A function to call for initiating a graceful shutdown of the workers. func CaptainTellWorkers(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, tasks []configuration.Task, workerCount int) (<-chan string, func()) { results := make(chan string) var wg sync.WaitGroup diff --git a/worker/configuration/task.go b/worker/configuration/task.go index 352bb1c..7b29865 100644 --- a/worker/configuration/task.go +++ b/worker/configuration/task.go @@ -58,6 +58,8 @@ func LoadTasksFromJSON(filePath string) ([]Task, error) { return parseTasks(tasks) } +// unmarshalJSON takes a byte slice of JSON data and unmarshals it into the provided tasks slice. +// It returns an error if the unmarshalling process fails. func unmarshalJSON(file []byte, tasks *[]Task) error { return json.Unmarshal(file, tasks) } @@ -83,15 +85,21 @@ func LoadTasksFromYAML(filePath string) ([]Task, error) { return parseTasks(tasks) } +// unmarshalYAML takes a byte slice of YAML data and unmarshals it into the provided tasks slice. +// It returns an error if the unmarshalling process fails. func unmarshalYAML(file []byte, tasks *[]Task) error { return yaml.Unmarshal(file, tasks) } +// parseTasks iterates through a slice of tasks, parsing the RetryDelay string into a time.Duration +// and updating the RetryDelayDuration field for each task. It returns the updated slice of tasks +// and any error that occurs during the parsing of the retry delay. func parseTasks(tasks []Task) ([]Task, error) { for i, task := range tasks { duration, err := ParseDuration(task.RetryDelay) if err != nil { - return nil, fmt.Errorf(language.ErrorFailedToParseRetryDelayFromTask, task.Name, err) + // Wrapping the error with the task name to provide context. + return nil, fmt.Errorf("%s: %w", task.Name, err) } tasks[i].RetryDelayDuration = duration } @@ -118,6 +126,8 @@ func LoadTasks(filePath string) ([]Task, error) { } } +// ParseDuration converts a duration string into a time.Duration object. +// It returns an error if the string is empty or if the format is not recognized. func ParseDuration(durationStr string) (time.Duration, error) { if durationStr == "" { return 0, fmt.Errorf(language.ErrorDurationstringisEmpty) diff --git a/worker/crew.go b/worker/crew.go index b988655..27b8ce0 100644 --- a/worker/crew.go +++ b/worker/crew.go @@ -41,14 +41,14 @@ func CrewWorker(ctx context.Context, clientset *kubernetes.Clientset, shipsNames // // Parameters: // -// ctx: Context for cancellation and timeout of the task processing. -// clientset: Kubernetes API client for cluster interactions. -// shipsNamespace: Namespace in Kubernetes where the task is executed. -// task: The task to be processed. -// results: Channel to return execution results to the caller. -// logger: Logger for structured logging within the worker. -// taskStatus: Map to track and control the status of tasks. -// workerIndex: Identifier for the worker instance for logging. +// ctx context.Context: Context for cancellation and timeout of the task processing. +// clientset *kubernetes.Clientset: Kubernetes API client for cluster interactions. +// shipsNamespace string: Namespace in Kubernetes where the task is executed. +// task configuration.Task: The task to be processed. +// results chan<- string: Channel to return execution results to the caller. +// logger *zap.Logger: Logger for structured logging within the worker. +// taskStatus *TaskStatusMap: Map to track and control the status of tasks. +// workerIndex int: Identifier for the worker instance for logging. func processTask(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, results chan<- string, logger *zap.Logger, taskStatus *TaskStatusMap, workerIndex int) { if !taskStatus.Claim(task.Name) { return @@ -67,12 +67,12 @@ func processTask(ctx context.Context, clientset *kubernetes.Clientset, shipsName // // Parameters: // -// task: The task that has failed. -// taskStatus: Map to track and control the status of tasks. -// shipsNamespace: Namespace in Kubernetes associated with the task. -// err: The error that occurred during task processing. -// results: Channel to return execution results to the caller. -// workerIndex: Identifier for the worker instance for logging. +// task configuration.Task: The task that has failed. +// taskStatus *TaskStatusMap: Map to track and control the status of tasks. +// shipsNamespace string: Namespace in Kubernetes associated with the task. +// err error: The error that occurred during task processing. +// results chan<- string: Channel to return execution results to the caller. +// workerIndex int: Identifier for the worker instance for logging. func handleFailedTask(task configuration.Task, taskStatus *TaskStatusMap, shipsNamespace string, err error, results chan<- string, workerIndex int) { taskStatus.Release(task.Name) logFinalError(shipsNamespace, task.Name, err, task.MaxRetries) @@ -84,9 +84,9 @@ func handleFailedTask(task configuration.Task, taskStatus *TaskStatusMap, shipsN // // Parameters: // -// task: The task that has been successfully completed. -// results: Channel to return execution results to the caller. -// workerIndex: Identifier for the worker instance for logging. +// task configuration.Task: The task that has been successfully completed. +// results chan<- string: Channel to return execution results to the caller. +// workerIndex int: Identifier for the worker instance for logging. func handleSuccessfulTask(task configuration.Task, results chan<- string, workerIndex int) { successMessage := fmt.Sprintf(language.TaskWorker_Name, workerIndex, fmt.Sprintf(language.TaskCompleteS, task.Name)) results <- successMessage @@ -98,15 +98,17 @@ func handleSuccessfulTask(task configuration.Task, results chan<- string, worker // it returns an error detailing the failure. // // Parameters: -// - ctx: Context for task cancellation and timeouts. -// - clientset: Kubernetes API client for executing tasks. -// - shipsNamespace: Kubernetes namespace for task execution. -// - task: Task to be executed. -// - results: Channel for reporting task execution results. -// - workerIndex: Index of the worker for contextual logging. +// +// ctx context.Context: Context for task cancellation and timeouts. +// clientset *kubernetes.Clientset: Kubernetes API client for executing tasks. +// shipsNamespace string: Kubernetes namespace for task execution. +// task configuration.Task: Task to be executed. +// results chan<- string: Channel for reporting task execution results. +// workerIndex int: Index of the worker for contextual logging. // // Returns: -// - error: Error if the task fails after all retry attempts. +// +// error: Error if the task fails after all retry attempts. func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, results chan<- string, workerIndex int) error { for attempt := 0; attempt < task.MaxRetries; attempt++ { err := performTask(ctx, clientset, shipsNamespace, task, workerIndex) @@ -129,10 +131,10 @@ func performTaskWithRetries(ctx context.Context, clientset *kubernetes.Clientset // // Parameters: // -// ctx: The context governing cancellation. -// clientset: The Kubernetes client set used for interacting with the Kubernetes API. -// shipsnamespace: The Kubernetes namespace where the pod is located. -// task: The task containing the parameters that need to be updated with the latest pod information. +// ctx context.Context: The context governing cancellation. +// clientset *kubernetes.Clientset: The Kubernetes client set used for interacting with the Kubernetes API. +// shipsNamespace string: The Kubernetes namespace where the pod is located. +// task *configuration.Task: The task containing the parameters that need to be updated with the latest pod information. // // Returns: // @@ -189,10 +191,12 @@ func CrewProcessPods(ctx context.Context, pods []corev1.Pod, results chan<- stri // It returns true if the pod is in the running phase and all its containers are ready. // // Parameters: -// - pod: The pod to check for health status. +// +// pod *corev1.Pod: The pod to check for health status. // // Returns: -// - bool: True if the pod is considered healthy, false otherwise. +// +// bool: True if the pod is considered healthy, false otherwise. func CrewCheckingisPodHealthy(pod *corev1.Pod) bool { // Check if the pod is in the running phase. if pod.Status.Phase != corev1.PodRunning { diff --git a/worker/error_and_retry.go b/worker/error_and_retry.go index fc48a97..8771108 100644 --- a/worker/error_and_retry.go +++ b/worker/error_and_retry.go @@ -19,10 +19,11 @@ import ( // to indicate the total number of allowed retries. // // Parameters: -// - taskName: The name of the task being attempted. -// - attempt: The current retry attempt number. -// - err: The error encountered during the task execution that prompted the retry. -// - maxRetries: The maximum number of retry attempts. +// +// taskName string: The name of the task being attempted. +// attempt int: The current retry attempt number. +// err error: The error encountered during the task execution that prompted the retry. +// maxRetries int: The maximum number of retry attempts. func logRetryAttempt(taskName string, attempt int, err error, maxRetries int) { navigator.LogErrorWithEmojiRateLimited( constant.ErrorEmoji, @@ -37,10 +38,11 @@ func logRetryAttempt(taskName string, attempt int, err error, maxRetries int) { // to indicate the total number of allowed retries. // // Parameters: -// - shipsnamespace: The namespace where the task was attempted. -// - taskName: The name of the task that failed. -// - err: The final error encountered that resulted in the task failure. -// - maxRetries: The maximum number of retry attempts. +// +// shipsnamespace string: The namespace where the task was attempted. +// taskName string: The name of the task that failed. +// err error: The final error encountered that resulted in the task failure. +// maxRetries int: The maximum number of retry attempts. func logFinalError(shipsnamespace string, taskName string, err error, maxRetries int) { finalErrorMessage := fmt.Sprintf(language.ErrorFailedToCompleteTask, taskName, maxRetries) navigator.LogErrorWithEmojiRateLimited( @@ -59,19 +61,19 @@ func logFinalError(shipsnamespace string, taskName string, err error, maxRetries // // Parameters: // -// ctx: The context governing cancellation. -// clientset: The Kubernetes client set used for task operations. -// shipsnamespace: The Kubernetes namespace where the task was attempted. -// err: The error encountered during the task execution. -// attempt: The current retry attempt number. -// task: The task being attempted. -// workerIndex: The index of the worker processing the task. -// maxRetries: The maximum number of retry attempts allowed. -// retryDelay: The duration to wait before making the next retry attempt. +// ctx context.Context: The context governing cancellation. +// *kubernetes.Clientset: The Kubernetes client set used for task operations. +// shipsnamespace string: The Kubernetes namespace where the task was attempted. +// err error: The error encountered during the task execution. +// attempt int: The current retry attempt number. +// task *configuration.Task: The task being attempted. +// workerIndex int: The index of the worker processing the task. +// maxRetries int: The maximum number of retry attempts allowed. +// retryDelay time.Duration: The duration to wait before making the next retry attempt. // // Returns: // -// shouldContinue: A boolean indicating whether the task should be retried or not. +// shouldContinue bool: A boolean indicating whether the task should be retried or not. func handleTaskError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, err error, attempt int, task *configuration.Task, workerIndex int, maxRetries int, retryDelay time.Duration) (shouldContinue bool) { if ctx.Err() != nil { return false @@ -91,14 +93,14 @@ func handleTaskError(ctx context.Context, clientset *kubernetes.Clientset, ships // // Parameters: // -// ctx: The context governing cancellation. -// clientset: The Kubernetes client set used for task operations. -// shipsnamespace: The Kubernetes namespace where the task was attempted. -// task: The task being attempted. +// ctx context.Context: The context governing cancellation. +// clientset *kubernetes.Clientset: The Kubernetes client set used for task operations. +// shipsnamespace string: The Kubernetes namespace where the task was attempted. +// task *configuration.Task: The task being attempted. // // Returns: // -// A boolean indicating whether the task should be retried after conflict resolution. +// bool: A boolean indicating whether the task should be retried after conflict resolution. func handleConflictError(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task *configuration.Task) bool { if resolveErr := resolveConflict(ctx, clientset, shipsnamespace, task); resolveErr != nil { return false @@ -113,17 +115,17 @@ func handleConflictError(ctx context.Context, clientset *kubernetes.Clientset, s // // Parameters: // -// ctx: The context governing cancellation. -// err: The error encountered during task execution. -// attempt: The current retry attempt number. -// task: The task being attempted. -// workerIndex: The index of the worker processing the task. -// maxRetries: The maximum number of retry attempts allowed. -// retryDelay: The duration to wait before making the next retry attempt. +// ctx context.Context: The context governing cancellation. +// err error: The error encountered during task execution. +// attempt int: The current retry attempt number. +// task *configuration.Task: The task being attempted. +// workerIndex int: The index of the worker processing the task. +// maxRetries int: The maximum number of retry attempts allowed. +// retryDelay time.Duration: The duration to wait before making the next retry attempt. // // Returns: // -// A boolean indicating whether the task should be retried or not. +// bool: A boolean indicating whether the task should be retried or not. func handleGenericError(ctx context.Context, err error, attempt int, task *configuration.Task, workerIndex int, maxRetries int, retryDelay time.Duration) bool { logRetryAttempt(task.Name, attempt, err, maxRetries) time.Sleep(retryDelay) diff --git a/worker/helper.go b/worker/helper.go index 6b3d729..fd33702 100644 --- a/worker/helper.go +++ b/worker/helper.go @@ -12,8 +12,8 @@ import ( // getParamAsString retrieves a string value from a map based on a key. // It returns an error if the key is not present or the value is not a string. // -// params - a map of parameters where the key is expected to be associated with a string value. -// key - the key for which to retrieve the string value. +// params map[string]interface{}: a map of parameters where the key is expected to be associated with a string value. +// key string: the key for which to retrieve the string value. // // Returns the string value and nil on success, or an empty string and an error on failure. func getParamAsString(params map[string]interface{}, key string) (string, error) { @@ -28,8 +28,8 @@ func getParamAsString(params map[string]interface{}, key string) (string, error) // It handles both int and float64 data types due to the way JSON and YAML unmarshal numbers. // It returns an error if the key is not present or the value is not a number. // -// params - a map of parameters where the key is expected to be associated with an integer value. -// key - the key for which to retrieve the integer value. +// params map[string]interface{}: a map of parameters where the key is expected to be associated with an integer value. +// key string: the key for which to retrieve the integer value. // // Returns the int64 value and nil on success, or 0 and an error on failure. func getParamAsInt64(params map[string]interface{}, key string) (int64, error) { @@ -63,12 +63,14 @@ func getParamAsInt64(params map[string]interface{}, key string) (int64, error) { // is strictly numerical and prevents any unexpected types from causing issues in the application. // // Params: -// - params: map[string]interface{} - The map containing parameter keys and values. -// - key: string - The key corresponding to the integer parameter to be retrieved. +// +// params map[string]interface{}: The map containing parameter keys and values. +// key string: The key corresponding to the integer parameter to be retrieved. // // Returns: -// - int: The extracted integer value associated with the provided key. -// - error: An error if the key is not found, or if the value is not a type that can be converted to an int. +// +// int: The extracted integer value associated with the provided key. +// error: An error if the key is not found, or if the value is not a type that can be converted to an int. func getParamAsInt(params map[string]interface{}, key string) (int, error) { value, ok := params[key] if !ok { @@ -85,12 +87,22 @@ func getParamAsInt(params map[string]interface{}, key string) (int, error) { } // logTaskStart logs the start of a task runner with a custom message and additional fields. -// message - the message to log, which should describe the task being started. -// fields - a slice of zap.Field items that provide additional context for the log entry. +// It uses an emoji for visual emphasis in the log. +// +// message string: The message to log, which should describe the task being started. +// fields []zap.Field: A slice of zap.Field items that provide additional context for the log entry. func logTaskStart(message string, fields []zap.Field) { navigator.LogInfoWithEmoji(language.PirateEmoji, message, fields...) } +// createLogFieldsForRunnerTask generates a slice of zap.Field items for structured logging. +// It is used to create log fields that describe a runner task, including the task type and namespace. +// +// task configuration.Task: The task for which to create log fields. +// shipsNamespace string: The namespace associated with the task. +// taskType string: The type of the task being logged. +// +// Returns a slice of zap.Field items that can be used for structured logging. func createLogFieldsForRunnerTask(task configuration.Task, shipsNamespace string, taskType string) []zap.Field { return navigator.CreateLogFields( taskType, @@ -99,10 +111,20 @@ func createLogFieldsForRunnerTask(task configuration.Task, shipsNamespace string ) } +// logErrorWithFields logs an error message with additional fields for context. +// It uses an emoji and rate limiting for logging errors to avoid flooding the log with repetitive messages. +// +// err error: The error to log. +// fields []zap.Field: A slice of zap.Field items that provide additional context for the error log entry. func logErrorWithFields(err error, fields []zap.Field) { navigator.LogErrorWithEmojiRateLimited(language.PirateEmoji, err.Error(), fields...) } +// logResultsFromChannel logs messages received from a channel. +// It continues to log until the channel is closed. +// +// results chan string: A channel from which to read result strings to log. +// fields []zap.Field: A slice of zap.Field items that provide additional context for each log entry. func logResultsFromChannel(results chan string, fields []zap.Field) { for result := range results { navigator.LogInfoWithEmoji(language.PirateEmoji, result, fields...) diff --git a/worker/labels_pods.go b/worker/labels_pods.go index 7e3bf9d..edef736 100644 --- a/worker/labels_pods.go +++ b/worker/labels_pods.go @@ -16,15 +16,17 @@ import ( // It fetches the latest version of the pod to ensure the update is based on the current state of the pod. // // Parameters: -// - ctx: A context.Context for managing cancellation and deadlines. -// - clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. -// - podName: The name of the pod to label. -// - namespace: The namespace in which the pod is located. -// - labelKey: The key of the label to be added or updated. -// - labelValue: The value for the label. +// +// ctx context.Context: A context.Context for managing cancellation and deadlines. +// clientset *kubernetes.Clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. +// podName: The name of the pod to label. +// namespace: The namespace in which the pod is located. +// labelKey: The key of the label to be added or updated. +// labelValue string: The value for the label. // // Returns: -// - An error if the pod cannot be retrieved or updated with the new label. +// +// error: An error if the pod cannot be retrieved or updated with the new label. func labelSinglePodWithResourceVersion(ctx context.Context, clientset *kubernetes.Clientset, podName, namespace, labelKey, labelValue string) error { latestPod, err := fetchLatestPodVersion(ctx, clientset, podName, namespace) if err != nil { @@ -42,14 +44,16 @@ func labelSinglePodWithResourceVersion(ctx context.Context, clientset *kubernete // This is necessary to avoid conflicts when updating the pod's labels. // // Parameters: -// - ctx: A context.Context for managing cancellation and deadlines. -// - clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. -// - podName: The name of the pod to retrieve. -// - namespace: The namespace in which the pod is located. +// +// ctx context.Context: A context.Context for managing cancellation and deadlines. +// clientset *kubernetes.Clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. +// podName: The name of the pod to retrieve. +// namespace string: The namespace in which the pod is located. // // Returns: -// - A pointer to the retrieved corev1.Pod instance. -// - An error if the pod cannot be retrieved. +// +// *corev1.Pod: A pointer to the retrieved corev1.Pod instance. +// error: An error if the pod cannot be retrieved. func fetchLatestPodVersion(ctx context.Context, clientset *kubernetes.Clientset, podName, namespace string) (*corev1.Pod, error) { return clientset.CoreV1().Pods(namespace).Get(ctx, podName, v1.GetOptions{}) } @@ -58,12 +62,14 @@ func fetchLatestPodVersion(ctx context.Context, clientset *kubernetes.Clientset, // It compares the existing labels of the pod to the desired label. // // Parameters: -// - pod: A pointer to the corev1.Pod instance to check. -// - labelKey: The key of the label to be added or updated. -// - labelValue: The value for the label. +// +// pod *corev1.Pod: A pointer to the corev1.Pod instance to check. +// labelKey: The key of the label to be added or updated. +// labelValue string: The value for the label. // // Returns: -// - True if the pod needs to be updated, false otherwise. +// +// bool: True if the pod needs to be updated, false otherwise. func shouldUpdatePod(pod *corev1.Pod, labelKey, labelValue string) bool { return pod.Labels[labelKey] != labelValue } @@ -72,16 +78,18 @@ func shouldUpdatePod(pod *corev1.Pod, labelKey, labelValue string) bool { // It ensures that only the labels are updated, leaving the rest of the pod configuration unchanged. // // Parameters: -// - ctx: A context.Context for managing cancellation and deadlines. -// - clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. -// - pod: A pointer to the corev1.Pod instance to update. -// - namespace: The namespace in which the pod is located. -// - podName: The name of the pod to update. -// - labelKey: The key of the label to be added or updated. -// - labelValue: The value for the label. +// +// ctx context.Context: A context.Context for managing cancellation and deadlines. +// clientset *kubernetes.Clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. +// pod *corev1.Pod: A pointer to the corev1.Pod instance to update. +// namespace: The namespace in which the pod is located. +// podName: The name of the pod to update. +// labelKey: The key of the label to be added or updated. +// labelValue string: The value for the label. // // Returns: -// - An error if the patch cannot be created or applied to the pod. +// +// error: An error if the patch cannot be created or applied to the pod. func updatePodLabels(ctx context.Context, clientset *kubernetes.Clientset, pod *corev1.Pod, namespace, podName, labelKey, labelValue string) error { pod.Labels = getUpdatedLabels(pod.Labels, labelKey, labelValue) @@ -106,12 +114,14 @@ func updatePodLabels(ctx context.Context, clientset *kubernetes.Clientset, pod * // If the original labels map is nil, it initializes a new map before adding the label. // // Parameters: -// - labels: The original map of labels to update. -// - labelKey: The key of the label to be added or updated. -// - labelValue: The value for the label. +// +// labels map[string]string: The original map of labels to update. +// labelKey: The key of the label to be added or updated. +// labelValue string: The value for the label. // // Returns: -// - A new map of labels with the updated label included. +// +// map[string]string: A new map of labels with the updated label included. func getUpdatedLabels(labels map[string]string, labelKey, labelValue string) map[string]string { if labels == nil { labels = make(map[string]string) @@ -124,11 +134,13 @@ func getUpdatedLabels(labels map[string]string, labelKey, labelValue string) map // This helps in identifying which pod encountered the error when multiple pods are being processed. // // Parameters: -// - podName: The name of the pod related to the error. -// - err: The original error to wrap with additional context. +// +// podName string: The name of the pod related to the error. +// err error: The original error to wrap with additional context. // // Returns: -// - An error that includes the pod name and the original error message. +// +// error: An error that includes the pod name and the original error message. func wrapPodError(podName string, err error) error { return fmt.Errorf(language.ErrorFailedToUpdateLabelSPods, podName, err) } @@ -138,14 +150,16 @@ func wrapPodError(podName string, err error) error { // to the labelSinglePod function. // // Parameters: -// - ctx: A context.Context for managing cancellation and deadlines. -// - clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. -// - namespace: The namespace in which the pods are located. -// - labelKey: The key of the label to be added or updated. -// - labelValue: The value for the label. +// +// ctx context.Context: A context.Context for managing cancellation and deadlines. +// clientset *kubernetes.Clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. +// namespace: The namespace in which the pods are located. +// labelKey: The key of the label to be added or updated. +// labelValue string: The value for the label. // // Returns: -// - An error if listing pods or updating any pod's labels fails. +// +// error: An error if listing pods or updating any pod's labels fails. func LabelPods(ctx context.Context, clientset *kubernetes.Clientset, namespace, labelKey, labelValue string) error { // Retrieve a list of all pods in the given namespace using the provided context. pods, err := clientset.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{}) @@ -167,15 +181,17 @@ func LabelPods(ctx context.Context, clientset *kubernetes.Clientset, namespace, // if the label is not already set to the desired value. // // Parameters: -// - ctx: A context.Context for managing cancellation and deadlines. -// - clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. -// - pod: A pointer to the corev1.Pod instance to label. -// - namespace: The namespace in which the pod is located. -// - labelKey: The key of the label to be added or updated. -// - labelValue: The value for the label. +// +// ctx context.Context: A context.Context for managing cancellation and deadlines. +// clientset *kubernetes.Clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API. +// pod *corev1.Pod: A pointer to the corev1.Pod instance to label. +// namespace: The namespace in which the pod is located. +// labelKey: The key of the label to be added or updated. +// labelValue string: The value for the label. // // Returns: -// - An error if the pod's labels cannot be updated. +// +// error: An error if the pod's labels cannot be updated. func labelSinglePod(ctx context.Context, clientset *kubernetes.Clientset, pod *corev1.Pod, namespace, labelKey, labelValue string) error { // If the pod already has the label with the correct value, skip updating. if pod.Labels[labelKey] == labelValue { @@ -205,12 +221,14 @@ func labelSinglePod(ctx context.Context, clientset *kubernetes.Clientset, pod *c // before they are used to label pods. // // Parameters: -// - parameters: A map of interface{} values that should contain the labeling parameters. +// +// parameters map[string]interface{}: A map of interface{} values that should contain the labeling parameters. // // Returns: -// - labelKey: The extracted label key as a string if present and of type string. -// - labelValue: The extracted label value as a string if present and of type string. -// - err: An error if either the label key or value is missing from the parameters or is not a string. +// +// labelKey string: The extracted label key as a string if present and of type string. +// labelValue string: The extracted label value as a string if present and of type string. +// err error: An error if either the label key or value is missing from the parameters or is not a string. // // The function will return an error if the required parameters ("labelKey" and "labelValue") are // not found in the input map, or if they are not of type string. This error can then be handled diff --git a/worker/list_options.go b/worker/list_options.go index 8693a83..01f2d14 100644 --- a/worker/list_options.go +++ b/worker/list_options.go @@ -11,7 +11,7 @@ import ( // It extracts 'labelSelector', 'fieldSelector', and 'limit' from the map. // This function is designed to parse and validate the parameters required for listing Kubernetes resources. // -// params - a map containing the keys and values for constructing the ListOptions. +// params map[string]interface{}: a map containing the keys and values for constructing the ListOptions. // // Expected keys are 'labelSelector', 'fieldSelector', and 'limit'. // diff --git a/worker/list_pods.go b/worker/list_pods.go index 72d11b4..ca453c0 100644 --- a/worker/list_pods.go +++ b/worker/list_pods.go @@ -16,16 +16,16 @@ import ( // // Parameters: // -// - ctx: A context.Context object, which governs the lifetime of the request to the Kubernetes API. -// It can be used to cancel the request, set deadlines, or pass request-scoped values. -// - clientset: A *kubernetes.Clientset that provides access to the Kubernetes API. -// - namespace: A string specifying the namespace from which to list the Pods. Namespaces are a way to divide cluster resources. -// - listOptions: A v1.ListOptions struct that defines the conditions and limits for the API query, such as label and field selectors. +// ctx context.Context: A context.Context object, which governs the lifetime of the request to the Kubernetes API. +// It can be used to cancel the request, set deadlines, or pass request-scoped values. +// clientset *kubernetes.Clientset: A *kubernetes.Clientset that provides access to the Kubernetes API. +// namespace string: A string specifying the namespace from which to list the Pods. Namespaces are a way to divide cluster resources. +// listOptions v1.ListOptions: A v1.ListOptions struct that defines the conditions and limits for the API query, such as label and field selectors. // // Returns: // -// - A pointer to a corev1.PodList containing the Pods that match the list options, along with metadata about the list. -// - An error if the call to the Kubernetes API fails, otherwise nil. +// *corev1.PodList: A pointer to a corev1.PodList containing the Pods that match the list options, along with metadata about the list. +// error: An error if the call to the Kubernetes API fails, otherwise nil. func listPods(ctx context.Context, clientset *kubernetes.Clientset, namespace string, listOptions v1.ListOptions) (*corev1.PodList, error) { pods, err := clientset.CoreV1().Pods(namespace).List(ctx, listOptions) if err != nil { diff --git a/worker/logs_pods.go b/worker/logs_pods.go index 07b2ea3..9237203 100644 --- a/worker/logs_pods.go +++ b/worker/logs_pods.go @@ -16,8 +16,9 @@ import ( // pod list and the actual logging of pod information. // // Parameters: -// - baseFields: A slice of zap.Field structs providing contextual logging information. -// - podList: A pointer to a corev1.PodList containing the list of pods to log. +// +// baseFields []zap.Field: A slice of zap.Field structs providing contextual logging information. +// podList *corev1.PodList: A pointer to a corev1.PodList containing the list of pods to log. func logPods(baseFields []zap.Field, podList *corev1.PodList) { for _, pod := range podList.Items { logPod(baseFields, &pod) @@ -30,8 +31,9 @@ func logPods(baseFields []zap.Field, podList *corev1.PodList) { // for potential reuse in other contexts where individual pod logging is required. // // Parameters: -// - baseFields: A slice of zap.Field structs providing contextual logging information. -// - pod: A pointer to a corev1.Pod representing the pod to log information about. +// +// baseFields []zap.Field: A slice of zap.Field structs providing contextual logging information. +// pod *corev1.Pod: A pointer to a corev1.Pod representing the pod to log information about. func logPod(baseFields []zap.Field, pod *corev1.Pod) { podFields := append([]zap.Field(nil), baseFields...) podFields = append(podFields, zap.String(language.PodsName, pod.Name), zap.String(language.PodStatus, string(pod.Status.Phase))) @@ -43,11 +45,13 @@ func logPod(baseFields []zap.Field, pod *corev1.Pod) { // allowing for asynchronous processing of the results. // // Parameters: -// - ctx: A context.Context to allow for cancellation of the health checks. -// - podList: A pointer to a corev1.PodList containing the pods to be checked. +// +// ctx context.Context: A context.Context to allow for cancellation of the health checks. +// podList *corev1.PodList: A pointer to a corev1.PodList containing the pods to be checked. // // Returns: -// - A channel of strings, where each string represents a pod's health status message. +// +// chan string: A channel of strings, where each string represents a pods health status message. func (c *CrewProcessCheckHealthTask) checkPodsHealth(ctx context.Context, podList *corev1.PodList) chan string { results := make(chan string, len(podList.Items)) go c.checkHealthWorker(ctx, podList, results) @@ -60,9 +64,10 @@ func (c *CrewProcessCheckHealthTask) checkPodsHealth(ctx context.Context, podLis // concurrently for efficiency. // // Parameters: -// - ctx: A context.Context to allow for cancellation of the health checks. -// - podList: A pointer to a corev1.PodList containing the pods to be checked. -// - results: A channel for sending back health status messages. +// +// ctx context.Context: A context.Context to allow for cancellation of the health checks. +// podList *corev1.PodList: A pointer to a corev1.PodList containing the pods to be checked. +// results chan<- string: A channel for sending back health status messages. func (c *CrewProcessCheckHealthTask) checkHealthWorker(ctx context.Context, podList *corev1.PodList, results chan<- string) { defer close(results) for _, pod := range podList.Items { @@ -84,11 +89,13 @@ func (c *CrewProcessCheckHealthTask) checkHealthWorker(ctx context.Context, podL // effectively decouples the logging of results from the health checking process. // // Parameters: -// - ctx: A context.Context to allow for cancellation of the logging process. -// - results: A channel from which to read health status messages. +// +// ctx context.Context: A context.Context to allow for cancellation of the logging process. +// results chan string: A channel from which to read health status messages. // // Returns: -// - An error if the context is cancelled, signaling incomplete logging. +// +// error: An error if the context is cancelled, signaling incomplete logging. func (c *CrewProcessCheckHealthTask) logResults(ctx context.Context, results chan string) error { for { select { diff --git a/worker/pvc_storage.go b/worker/pvc_storage.go index 5a533b6..7008f00 100644 --- a/worker/pvc_storage.go +++ b/worker/pvc_storage.go @@ -14,12 +14,13 @@ import ( // createPVC creates a persistent volume claim (PVC) in the specified namespace. // // Parameters: -// - ctx: Context for cancellation and timeout. -// - clientset: A Kubernetes clientset to interact with the Kubernetes API. -// - shipsNamespace: The Kubernetes namespace in which to create the PVC. -// - storageClassName: The name of the storage class to use for the PVC. -// - pvcName: The name of the PVC to create. -// - storageSize: The size of the PVC in gigabytes. +// +// ctx context.Context: Context for cancellation and timeout. +// clientset *kubernetes.Clientset: A Kubernetes clientset to interact with the Kubernetes API. +// shipsNamespace: The Kubernetes namespace in which to create the PVC. +// storageClassName: The name of the storage class to use for the PVC. +// pvcName: The name of the PVC to create. +// storageSize string: The size of the PVC in gigabytes. // // Returns an error if the PVC cannot be created. func createPVC(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace, storageClassName, pvcName, storageSize string) error { diff --git a/worker/scale.go b/worker/scale.go index 15b71c5..4c18b07 100644 --- a/worker/scale.go +++ b/worker/scale.go @@ -20,16 +20,20 @@ import ( // sent through the results channel, and logs are produced accordingly. // // Parameters: -// - ctx: Context for cancellation and timeout of the scaling process. -// - clientset: Kubernetes API client for interacting with the cluster. -// - namespace: The namespace of the deployment. -// - deploymentName: The name of the deployment to scale. -// - scale: The desired number of replicas to scale to. -// - results: A channel for sending the results of the scaling operation. -// - logger: A structured logger for logging information and errors. +// +// ctx context.Context: Context for cancellation and timeout of the scaling process. +// clientset *kubernetes.Clientset: Kubernetes API client for interacting with the cluster. +// namespace string: The namespace of the deployment. +// deploymentName string: The name of the deployment to scale. +// scale int: The desired number of replicas to scale to. +// maxRetries int: The maximum number of retries for the scaling operation. +// retryDelay time.Duration: The duration to wait before retrying the scaling operation. +// results chan<- string: A channel for sending the results of the scaling operation. +// logger *zap.Logger: A structured logger for logging information and errors. // // Returns: -// - error: An error if scaling fails after all retries, or nil on success. +// +// error: An error if scaling fails after all retries, or nil on success. func ScaleDeployment(ctx context.Context, clientset *kubernetes.Clientset, namespace string, deploymentName string, scale int, maxRetries int, retryDelay time.Duration, results chan<- string, logger *zap.Logger) error { var lastScaleErr error for attempt := 0; attempt < maxRetries; attempt++ { @@ -73,14 +77,16 @@ func ScaleDeployment(ctx context.Context, clientset *kubernetes.Clientset, names // It updates the deployment's replica count and handles any errors that occur during the update process. // // Parameters: -// - ctx: Context for cancellation and timeout of the scaling operation. -// - clientset: Kubernetes API client for interacting with the cluster. -// - namespace: The namespace of the deployment. -// - deploymentName: The name of the deployment to scale. -// - scale: The desired number of replicas to scale to. +// +// ctx context.Context: Context for cancellation and timeout of the scaling operation. +// clientset *kubernetes.Clientset: Kubernetes API client for interacting with the cluster. +// namespace string: The namespace of the deployment. +// deploymentName string: The name of the deployment to scale. +// scale int: The desired number of replicas to scale to. // // Returns: -// - error: An error if the scaling operation fails, or nil if the operation is successful. +// +// error: An error if the scaling operation fails, or nil if the operation is successful. func scaleDeploymentOnce(ctx context.Context, clientset *kubernetes.Clientset, namespace string, deploymentName string, scale int) error { // Get the current deployment. deployment, getErr := clientset.AppsV1().Deployments(namespace).Get(ctx, deploymentName, v1.GetOptions{}) @@ -104,10 +110,12 @@ func scaleDeploymentOnce(ctx context.Context, clientset *kubernetes.Clientset, n // This is a helper function used to assign values to fields that expect a pointer to an int32. // // Parameters: -// - i: The int32 value to convert. +// +// i int32: The int32 value to convert. // // Returns: -// - *int32: A pointer to the int32 value. +// +// *int32: A pointer to the int32 value. func int32Ptr(i int32) *int32 { return &i } diff --git a/worker/setup.go b/worker/setup.go index 8249aa5..f2ba3d7 100644 --- a/worker/setup.go +++ b/worker/setup.go @@ -16,8 +16,9 @@ import ( // or the kubeconfig file, depending on the environment. // // Returns: -// - A pointer to a kubernetes.Clientset ready for Kubernetes API interactions. -// - An error if the configuration fails or the client cannot be created. +// +// *kubernetes.Clientset: A pointer to a Kubernetes Clientset ready for API interactions. +// error: An error if the configuration fails or the client cannot be created. func NewKubernetesClient() (*kubernetes.Clientset, error) { config, err := rest.InClusterConfig() if err != nil { @@ -41,8 +42,9 @@ func NewKubernetesClient() (*kubernetes.Clientset, error) { // buildOutOfClusterConfig attempts to build a configuration from the kubeconfig file. // // Returns: -// - A configuration object for the Kubernetes client. -// - An error if the kubeconfig file cannot be found or is invalid. +// +// *rest.Config: A configuration object for the Kubernetes client. +// error: An error if the kubeconfig file cannot be found or is invalid. func buildOutOfClusterConfig() (*rest.Config, error) { homeDir, found := os.LookupEnv(homeEnvVar) if !found { diff --git a/worker/tasks_crew.go b/worker/tasks_crew.go index b22678a..dcdcb00 100644 --- a/worker/tasks_crew.go +++ b/worker/tasks_crew.go @@ -16,6 +16,8 @@ import ( ) // InitializeTasks loads tasks from the specified configuration file. +// filePath is the path to the configuration file that contains the task definitions. +// It returns a slice of Task structs loaded from the configuration file and any error encountered. func InitializeTasks(filePath string) ([]configuration.Task, error) { return configuration.LoadTasks(filePath) } @@ -211,6 +213,11 @@ func (c *CrewScaleDeployments) Run(ctx context.Context, clientset *kubernetes.Cl return nil } +// extractScaleParameters extracts the scaling parameters 'deploymentName' and 'replicas' from the task parameters. +// +// It validates the parameters and returns them along with any error encountered. +// task is the configuration.Task struct containing the parameters. +// Returns the deployment name, the number of replicas, the retry delay duration, and any error encountered. func (c *CrewScaleDeployments) extractScaleParameters(task configuration.Task) (string, int, time.Duration, error) { deploymentName, err := getParamAsString(task.Parameters, deploYmentName) if err != nil { @@ -230,6 +237,20 @@ func (c *CrewScaleDeployments) extractScaleParameters(task configuration.Task) ( return deploymentName, replicas, retryDelayDuration, nil } +// performScaling carries out the scaling operation for a Kubernetes deployment. +// +// It uses the provided Kubernetes clientset to change the number of replicas for the specified deployment. +// The operation is retried up to maxRetries times with a delay of retryDelayDuration between attempts. +// The results of the operation are sent to the provided results channel. +// ctx is the context for cancellation and deadlines. +// clientset is the Kubernetes clientset for API interactions. +// shipsNamespace is the namespace where the deployment resides. +// deploymentName is the name of the deployment to scale. +// replicas is the desired number of replicas. +// maxRetries is the maximum number of retry attempts. +// retryDelayDuration is the duration to wait between retries. +// results is a channel for sending the results of the scaling operation. +// Returns an error if the scaling operation fails. func (c *CrewScaleDeployments) performScaling(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace, deploymentName string, replicas, maxRetries int, retryDelayDuration time.Duration, results chan<- string) error { return ScaleDeployment(ctx, clientset, shipsNamespace, deploymentName, replicas, maxRetries, retryDelayDuration, results, zap.L()) } diff --git a/worker/track_task.go b/worker/track_task.go index 6d238a7..dceda03 100644 --- a/worker/track_task.go +++ b/worker/track_task.go @@ -12,8 +12,9 @@ import ( // coordinating task claims among multiple worker routines in a concurrent environment. // // The struct contains two maps: -// - tasks: A map that stores tasks by their names, allowing quick retrieval and updates. -// - claimed: A map that tracks whether tasks have been claimed, with a boolean indicating the claim status. +// +// tasks map[string]configuration.Task: A map that stores tasks by their names, allowing quick retrieval and updates. +// claimed map[string]bool: A map that tracks whether tasks have been claimed, with a boolean indicating the claim status. // // The methods of TaskStatusMap provide safe manipulation of tasks and their claim status, ensuring // that all operations are atomic and no data races occur. @@ -28,7 +29,8 @@ type TaskStatusMap struct { // structure for task tracking. // // Returns: -// - *TaskStatusMap: A pointer to the newly created TaskStatusMap instance. +// +// *TaskStatusMap: A pointer to the newly created TaskStatusMap instance. func NewTaskStatusMap() *TaskStatusMap { return &TaskStatusMap{ tasks: make(map[string]configuration.Task), @@ -41,7 +43,8 @@ func NewTaskStatusMap() *TaskStatusMap { // does not interfere with other concurrent operations on the TaskStatusMap. // // Parameters: -// - task: The task to add or update in the map. +// +// task configuration.Task: The task to add or update in the map. // // Note: this deadcode is left here for future use. func (s *TaskStatusMap) AddTask(task configuration.Task) { @@ -55,11 +58,13 @@ func (s *TaskStatusMap) AddTask(task configuration.Task) { // existence and details of a task without risking a data race. // // Parameters: -// - name: The name of the task to retrieve. +// +// name string: The name of the task to retrieve. // // Returns: -// - configuration.Task: The retrieved task. -// - bool: A boolean indicating whether the task was found in the map. +// +// configuration.Task: The retrieved task. +// bool: A boolean indicating whether the task was found in the map. // // Note: this deadcode is left here for future use. func (s *TaskStatusMap) GetTask(name string) (configuration.Task, bool) { @@ -74,7 +79,8 @@ func (s *TaskStatusMap) GetTask(name string) (configuration.Task, bool) { // is useful when a task's properties need to be changed during its lifecycle. // // Parameters: -// - task: The task with updated information to be stored in the map. +// +// task configuration.Task: The task with updated information to be stored in the map. // // Note: this deadcode is left here for future use. func (s *TaskStatusMap) UpdateTask(task configuration.Task) { @@ -88,7 +94,8 @@ func (s *TaskStatusMap) UpdateTask(task configuration.Task) { // to a task that is no longer relevant. // // Parameters: -// - name: The name of the task to remove. +// +// name string: The name of the task to remove. // // Note: this deadcode is left here for future use. func (s *TaskStatusMap) DeleteTask(name string) { @@ -104,10 +111,12 @@ func (s *TaskStatusMap) DeleteTask(name string) { // for coordinating task claims among concurrent workers. // // Parameters: -// - taskName: The name of the task to claim. +// +// taskName string: The name of the task to claim. // // Returns: -// - bool: A boolean indicating whether the task was successfully claimed. +// +// bool: A boolean indicating whether the task was successfully claimed. // // Note: this deadcode is left here for future use. func (s *TaskStatusMap) Claim(taskName string) bool { @@ -126,7 +135,8 @@ func (s *TaskStatusMap) Claim(taskName string) bool { // available again. // // Parameters: -// - taskName: The name of the task to unclaim. +// +// taskName string: The name of the task to unclaim. func (s *TaskStatusMap) Release(taskName string) { s.mu.Lock() defer s.mu.Unlock() @@ -138,7 +148,8 @@ func (s *TaskStatusMap) Release(taskName string) { // that further manipulations of the slice do not affect the original tasks in the map. // // Returns: -// - []configuration.Task: A slice containing all tasks from the tasks map. +// +// []configuration.Task: A slice containing all tasks from the tasks map. // // Note: this deadcode is left here for future use. func (s *TaskStatusMap) GetAllTasks() []configuration.Task { @@ -156,10 +167,12 @@ func (s *TaskStatusMap) GetAllTasks() []configuration.Task { // to verify if a task is already being processed by another worker. // // Parameters: -// - taskName: The name of the task to check the claim status for. +// +// taskName string: The name of the task to check the claim status for. // // Returns: -// - bool: A boolean indicating whether the task is currently claimed. +// +// bool: A boolean indicating whether the task is currently claimed. // // Note: this deadcode is left here for future use. func (s *TaskStatusMap) IsClaimed(taskName string) bool { diff --git a/worker/update_image.go b/worker/update_image.go index d98b214..949c042 100644 --- a/worker/update_image.go +++ b/worker/update_image.go @@ -21,14 +21,15 @@ import ( // number of retries, it reports the failure. // // Parameters: -// - ctx: Context for cancellation and timeout. -// - clientset: A Kubernetes clientset to interact with the Kubernetes API. -// - namespace: The Kubernetes namespace containing the deployment. -// - deploymentName: The name of the deployment to update. -// - containerName: The name of the container within the deployment to update. -// - newImage: The new image to apply to the container. -// - results: A channel to send operation results for logging. -// - logger: A logger for structured logging. +// +// ctx context.Context: Context for cancellation and timeout. +// clientset *kubernetes.Clientset: A Kubernetes clientset to interact with the Kubernetes API. +// namespace: The Kubernetes namespace containing the deployment. +// deploymentName: The name of the deployment to update. +// containerName: The name of the container within the deployment to update. +// newImage string: The new image to apply to the container. +// maxRetries int: A channel to send operation results for logging. +// retryDelay time.Duration: A logger for structured logging. // // Returns an error if the operation fails after the maximum number of retries or if a non-conflict error is encountered. func UpdateDeploymentImage(ctx context.Context, clientset *kubernetes.Clientset, namespace, deploymentName, containerName, newImage string, maxRetries int, retryDelay time.Duration, results chan<- string, logger *zap.Logger) error { diff --git a/worker/update_network_policy.go b/worker/update_network_policy.go index 31986d6..74c847c 100644 --- a/worker/update_network_policy.go +++ b/worker/update_network_policy.go @@ -23,13 +23,14 @@ import ( // a failure is reported. // // Parameters: -// - ctx: Context for cancellation and timeout. -// - clientset: A Kubernetes clientset for interacting with the Kubernetes API. -// - namespace: The Kubernetes namespace containing the NetworkPolicy. -// - policyName: The name of the NetworkPolicy to update. -// - policySpec: The new specification for the NetworkPolicy. -// - results: A channel to send operation results for logging. -// - logger: A logger for structured logging. +// +// ctx context.Context: Context for cancellation and timeout. +// clientset *kubernetes.Clientset: A Kubernetes clientset for interacting with the Kubernetes API. +// namespace: The Kubernetes namespace containing the NetworkPolicy. +// policyName string: The name of the NetworkPolicy to update. +// policySpec networkingv1.NetworkPolicySpec: The new specification for the NetworkPolicy. +// esults chan<- string: A channel to send operation results for logging. +// logger *zap.Logger: A logger for structured logging. // // Returns an error if the operation fails after retries or if a non-conflict error is encountered. func UpdateNetworkPolicy(ctx context.Context, clientset *kubernetes.Clientset, namespace, policyName string, policySpec networkingv1.NetworkPolicySpec, results chan<- string, logger *zap.Logger) error { @@ -94,7 +95,8 @@ func extractPolicyName(parameters map[string]interface{}) (string, error) { // into a networkingv1.NetworkPolicySpec struct. // // Parameters: -// - policySpecData: A string containing the NetworkPolicy specification in JSON or YAML format. +// +// policySpecData string: A string containing the NetworkPolicy specification in JSON or YAML format. // // Returns the unmarshaled NetworkPolicySpec and an error if unmarshaling fails. func unmarshalPolicySpec(policySpecData string) (networkingv1.NetworkPolicySpec, error) {