From 441531e57e7a1a0a12226e620f51c568956055bc Mon Sep 17 00:00:00 2001 From: ChandraNarreddy <16383732+ChandraNarreddy@users.noreply.github.com> Date: Fri, 17 Nov 2023 20:59:14 +0530 Subject: [PATCH] quick fixes and clean up --- main.go | 3 ++- readme.md | 26 +++++++++++++------------- task/task.go | 21 ++++++--------------- 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/main.go b/main.go index 4b02ea2..e0d4951 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "sqsd/task" "sqsd/taskQ" "sqsd/workerpool" + "syscall" ) func main() { @@ -103,7 +104,7 @@ func main() { //create a channel for OS interrupt events and listen for them quitSig := make(chan os.Signal, 1) - signal.Notify(quitSig, os.Interrupt, os.Kill) + signal.Notify(quitSig, os.Interrupt, syscall.SIGTERM) <-quitSig pollerQuitChan <- 1 diff --git a/readme.md b/readme.md index f0cf1c9..5bb2cbf 100644 --- a/readme.md +++ b/readme.md @@ -4,21 +4,22 @@ SQSD is an open source implementation of AWS' SQS Daemon used in their Elastic BeanStalk worker tier environments. -## Salient Features - +## Salient Features -* Employs a task queue model backed by go's channel-goroutines concurrency mechanism. Task queue's buffer and number of goroutines are both configurable. +* Employs a task queue worker pool model backed by go's channel-goroutines concurrency mechanism. +* Task queue's buffer size and number of goroutines are both configurable. * Upstream worker endpoint can be any legal URL as opposed to a locally bound server endpoint. -* +## How to run -## How to run - +* Ensure that AWS credentials are available as environment variables or in shared configuration files or as ec2 role credentials. +* sqsd requires name of the sqs queue and URL address of the upstream as mandatory parameters. -* Ensure that AWS credentials are available as environment variables or as ec2 role credentials. -* sqsd requires the name of the sqs queue and the URL address to the workers as mandatory parameters. ```bash -$ sqsd -sqs zensqsd_test -forward http://localhost:8080/ +% sqsd -sqs zensqsd_test -forward http://localhost:8080/ ``` -#### Options + +### Options | **Argument** | **Default** | **Required** | **Description** | |---|--------------|------------------------|----------------------------------------------------------------------------------------------------------------------| @@ -38,11 +39,10 @@ $ sqsd -sqs zensqsd_test -forward http://localhost:8080/ | -workersCount | `20` | no | Number of concurrent workers to spawn; default 20 | | -logLevel | `0` | no | logging level. Pass -4 for DEBUG, 0 for INFO, 4 for WARN, 8 for ERROR; default 0 - INFO. | -## Docker version - +## Docker version + +You can use docker images published here to invoke SQSD like so - -You can use docker images published here to invoke SQSD like so - ```bash -$ docker run -it --entrypoint /sqsd.cmd ghcr.io/chandranarreddy/sqsd:main -sqs -forward +% docker run -it --entrypoint /sqsd.cmd ghcr.io/chandranarreddy/sqsd:main -sqs -forward ``` - - diff --git a/task/task.go b/task/task.go index fcfd0bf..5019404 100644 --- a/task/task.go +++ b/task/task.go @@ -20,15 +20,6 @@ import ( type SQSTaskStatus uint8 -const ( - ReceivedFromSQS SQSTaskStatus = iota - PushedToTaskQueue - PoppedFromTaskQueue - WaitingForAPIResponse - WorkerFailed - WorkerAbandoned -) - type TaskIfc interface { GetSQSMessage() SQSTypes.Message SetSQSMessage(SQSTypes.Message) @@ -112,13 +103,13 @@ func (c *SQSDTaskPerformer) PerformTask(task TaskIfc) error { SQSDTask, ok := task.(*SQSDTask) if !ok { slog.Error("Failed to perform task", "taskID", task.GetTaskID(), "error", "Task passed is not of type SQSDTask") - return fmt.Errorf("Task passed is not of type SQSDTask") + return fmt.Errorf("task passed is not of type SQSDTask") } if recvCount, ok := SQSDTask.sqsMessage.Attributes["ApproximateReceiveCount"]; ok { count, convErr := strconv.Atoi(recvCount) if convErr != nil { slog.Error("Error performing task", "taskID", task.GetTaskID(), "error", fmt.Sprintf("ApproximateReceiveCount attribute in the message is not a valid integer - %#v", convErr.Error())) - return fmt.Errorf("Error while parsing ApproximateReceiveCount attribute in the message to an integer") + return fmt.Errorf("error while parsing ApproximateReceiveCount attribute in the message to an integer") } if count > int(c.maxRetries) { slog.Debug("Max retries already attempted on this message, not attempting further", "messageID", *task.GetSQSMessage().MessageId) @@ -129,7 +120,7 @@ func (c *SQSDTaskPerformer) PerformTask(task TaskIfc) error { if reqErr != nil { //error logging here slog.Error("Error performing task", "taskID", task.GetTaskID(), "error", "Error while creating request to downstream") - return fmt.Errorf("Error while creating a request to the downstream") + return fmt.Errorf("error while creating a request to the downstream") } // add SQS message attributes as headers req.Header.Add("User-Agent", "ZenSQSD") @@ -167,7 +158,7 @@ func (c *SQSDTaskPerformer) PerformTask(task TaskIfc) error { if opErr != nil { slog.Error("Error changing visibility timeout on message", "messageID", *task.GetSQSMessage().MessageId, "error", opErr.Error()) } - return fmt.Errorf("Error obtaining response from upstream for message ID %#v", *task.GetSQSMessage().MessageId) + return fmt.Errorf("error obtaining response from upstream for message ID %#v", *task.GetSQSMessage().MessageId) } _, readBodyErr := io.ReadAll(resp.Body) if readBodyErr != nil { @@ -216,8 +207,8 @@ func (c *SQSDTaskPerformer) PerformTask(task TaskIfc) error { break } } - if messageSuccessfullyDeleted == false { - return fmt.Errorf("Failed to delete message with ID %s from SQS", *task.GetSQSMessage().MessageId) + if !messageSuccessfullyDeleted { + return fmt.Errorf("failed to delete message with ID %s from SQS", *task.GetSQSMessage().MessageId) } return nil }