Merge pull request #3 from ChandraNarreddy/development
quick fixes and clean up
ChandraNarreddy authored Nov 17, 2023
2 parents 481429e + 441531e commit feb8ab1
Showing 3 changed files with 21 additions and 29 deletions.
main.go (3 changes: 2 additions & 1 deletion)
@@ -12,6 +12,7 @@ import (
 	"sqsd/task"
 	"sqsd/taskQ"
 	"sqsd/workerpool"
+	"syscall"
 )

func main() {
@@ -103,7 +104,7 @@ func main() {
 
 	//create a channel for OS interrupt events and listen for them
 	quitSig := make(chan os.Signal, 1)
-	signal.Notify(quitSig, os.Interrupt, os.Kill)
+	signal.Notify(quitSig, os.Interrupt, syscall.SIGTERM)
 	<-quitSig
 
 	pollerQuitChan <- 1
readme.md (26 changes: 13 additions & 13 deletions)
@@ -4,21 +4,22 @@
 
 SQSD is an open source implementation of AWS' SQS Daemon used in their Elastic BeanStalk worker tier environments.
 
-## Salient Features -
+## Salient Features
 
-* Employs a task queue model backed by go's channel-goroutines concurrency mechanism. Task queue's buffer and number of goroutines are both configurable.
+* Employs a task queue worker pool model backed by go's channel-goroutines concurrency mechanism.
+* Task queue's buffer size and number of goroutines are both configurable.
 * Upstream worker endpoint can be any legal URL as opposed to a locally bound server endpoint.
-*
 
-## How to run -
-* Ensure that AWS credentials are available as environment variables or in shared configuration files or as ec2 role credentials.
-* sqsd requires name of the sqs queue and URL address of the upstream as mandatory parameters.
+## How to run
+
+* Ensure that AWS credentials are available as environment variables or as ec2 role credentials.
+* sqsd requires the name of the sqs queue and the URL address to the workers as mandatory parameters.
 
 ```bash
-$ sqsd -sqs zensqsd_test -forward http://localhost:8080/
+% sqsd -sqs zensqsd_test -forward http://localhost:8080/
 ```
 
-#### Options
+### Options
 
 | **Argument** | **Default** | **Required** | **Description** |
 |---|---|---|---|
@@ -38,11 +39,10 @@
 | -workersCount | `20` | no | Number of concurrent workers to spawn; default 20 |
 | -logLevel | `0` | no | logging level. Pass -4 for DEBUG, 0 for INFO, 4 for WARN, 8 for ERROR; default 0 - INFO. |
 
-## Docker version -
+## Docker version
 
 You can use docker images published here to invoke SQSD like so -
 ```bash
-$ docker run -it --entrypoint /sqsd.cmd ghcr.io/chandranarreddy/sqsd:main -sqs <your_sqsq_name> -forward <your_upstream_worker_url>
+% docker run -it --entrypoint /sqsd.cmd ghcr.io/chandranarreddy/sqsd:main -sqs <your_sqsq_name> -forward <your_upstream_worker_url>
 ```


task/task.go (21 changes: 6 additions & 15 deletions)
@@ -20,15 +20,6 @@ import (
 
 type SQSTaskStatus uint8
 
-const (
-	ReceivedFromSQS SQSTaskStatus = iota
-	PushedToTaskQueue
-	PoppedFromTaskQueue
-	WaitingForAPIResponse
-	WorkerFailed
-	WorkerAbandoned
-)
-
 type TaskIfc interface {
 	GetSQSMessage() SQSTypes.Message
 	SetSQSMessage(SQSTypes.Message)
@@ -112,13 +103,13 @@ func (c *SQSDTaskPerformer) PerformTask(task TaskIfc) error {
 	SQSDTask, ok := task.(*SQSDTask)
 	if !ok {
 		slog.Error("Failed to perform task", "taskID", task.GetTaskID(), "error", "Task passed is not of type SQSDTask")
-		return fmt.Errorf("Task passed is not of type SQSDTask")
+		return fmt.Errorf("task passed is not of type SQSDTask")
 	}
 	if recvCount, ok := SQSDTask.sqsMessage.Attributes["ApproximateReceiveCount"]; ok {
 		count, convErr := strconv.Atoi(recvCount)
 		if convErr != nil {
 			slog.Error("Error performing task", "taskID", task.GetTaskID(), "error", fmt.Sprintf("ApproximateReceiveCount attribute in the message is not a valid integer - %#v", convErr.Error()))
-			return fmt.Errorf("Error while parsing ApproximateReceiveCount attribute in the message to an integer")
+			return fmt.Errorf("error while parsing ApproximateReceiveCount attribute in the message to an integer")
 		}
 		if count > int(c.maxRetries) {
 			slog.Debug("Max retries already attempted on this message, not attempting further", "messageID", *task.GetSQSMessage().MessageId)
@@ -129,7 +120,7 @@ func (c *SQSDTaskPerformer) PerformTask(task TaskIfc) error {
 	if reqErr != nil {
 		//error logging here
 		slog.Error("Error performing task", "taskID", task.GetTaskID(), "error", "Error while creating request to downstream")
-		return fmt.Errorf("Error while creating a request to the downstream")
+		return fmt.Errorf("error while creating a request to the downstream")
 	}
 	// add SQS message attributes as headers
 	req.Header.Add("User-Agent", "ZenSQSD")
@@ -167,7 +158,7 @@ func (c *SQSDTaskPerformer) PerformTask(task TaskIfc) error {
 		if opErr != nil {
 			slog.Error("Error changing visibility timeout on message", "messageID", *task.GetSQSMessage().MessageId, "error", opErr.Error())
 		}
-		return fmt.Errorf("Error obtaining response from upstream for message ID %#v", *task.GetSQSMessage().MessageId)
+		return fmt.Errorf("error obtaining response from upstream for message ID %#v", *task.GetSQSMessage().MessageId)
 	}
 	_, readBodyErr := io.ReadAll(resp.Body)
 	if readBodyErr != nil {
@@ -216,8 +207,8 @@ func (c *SQSDTaskPerformer) PerformTask(task TaskIfc) error {
 			break
 		}
 	}
-	if messageSuccessfullyDeleted == false {
-		return fmt.Errorf("Failed to delete message with ID %s from SQS", *task.GetSQSMessage().MessageId)
+	if !messageSuccessfullyDeleted {
+		return fmt.Errorf("failed to delete message with ID %s from SQS", *task.GetSQSMessage().MessageId)
 	}
 	return nil
 }
