Skip to content

Commit

Permalink
Added badges
Browse files Browse the repository at this point in the history
  • Loading branch information
Joker666 committed Nov 15, 2020
1 parent 59e214e commit 8884fea
Show file tree
Hide file tree
Showing 14 changed files with 82 additions and 50 deletions.
46 changes: 28 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
<h1 align="center"> Cogman </h1>
[![Go Report Card](https://goreportcard.com/badge/Joker666/cogman)](https://goreportcard.com/report/github.com/Joker666/cogman) [![License](https://img.shields.io/badge/license-SATA-blue)](https://github.com/Joker666/cogman/blob/master/LICENSE)

<h1 align="center"> Cogman </h1>

A distributed task runner.

---

### Clone & Build:
```
go get github.com/Tapfury/cogman
go get github.com/Joker666/cogman
cd cogman
./build.sh
```
Expand Down Expand Up @@ -59,7 +61,7 @@ Client & Server have individual config file to use them separately.

#### Client/Server

This Congman api call will start a client and a server.
This Cogman api call will start a client and a server.

```go
if err := cogman.StartBackground(cfg); err != nil {
Expand All @@ -71,24 +73,24 @@ Initiate Client & Server individually:

```go
// Client
clnt, err := cogman.NewSession(cfg)
client, err := cogman.NewSession(cfg)
if err != nil {
log.Fatal(err)
}

if err := clnt.Connect(); err != nil {
if err := client.Connect(); err != nil {
log.Fatal(err)
}

// Server
srvr, err := cogman.NewServer(cfg)
server, err := cogman.NewServer(cfg)
if err != nil {
log.Fatal(err)
}

go func() {
defer srvr.Stop()
if err = srvr.Start(); err != nil {
defer server.Stop()
if err = server.Start(); err != nil {
log.Fatal(err)
}
}()
Expand All @@ -105,7 +107,7 @@ type Task struct {
OriginalTaskID string // a retry task will carry it's parents ID.
PrimaryKey string // optional. Client can set any key to trace a task.
Retry int // default value 0.
Prefetch int // optional. Number of task fetch from queue by consumer at a time.
Prefetch int // optional. Number of task fetch from queue by consumer at a time.
Payload []byte // required
Priority TaskPriority // required. High or Low
Status Status // current task status
Expand All @@ -118,7 +120,7 @@ type Task struct {

#### Worker/Task Handler

Any struct can be passed as a handler it implement below `interface`:
Any struct can be passed as a handler it implements below `interface`:

```go
type Handler interface {
Expand Down Expand Up @@ -173,22 +175,30 @@ Cogman queue type:
- Low_Priority_Queue [lazy Queue]
```

Two type queue Cogman maintain. Default & Lazy queue. All the high priority task will be push to default queue and low priority task will be push to lazy queue. The number of each type of queue can be set by client/server. And queue won't be lost after any sort of connection interruption.
There are two types queues that Cogman maintains. Default & Lazy queue.
High priority tasks would be pushed to default queue and low priority task would be pushed to lazy queue.
The number of each type of queues can be set by client/server through configuration.
Queue won't be lost after any sort of connection interruption.

#### Re-Connection

Cogman Client & Server both handler reconnection. If Client loss connection, it can still take task and those will be processed immediate after Cogman client get back the connection.
After Server reconnect, it will start to consume task without losing any task.
Cogman Client & Server both handles reconnection. If the client loses connection, it can still take tasks,
and those will be processed immediate after Cogman client gets back the connection.
After Server reconnects, it will start to consume tasks without losing any task.

#### Re-Enqueue

Re-enqueue feature to recover all the initiated task those are lost for connection error. If client somehow lost the amqp connection, Cogman can still take the task in offline. All offline task will be re-queue after connection re-established. Cogman fetch all the offline task from mongo logs, and re-initiate them. Mongo connection required here. For re-enqueuing, task retry count wont be changed.
Re-enqueue feature to recover all the initiated task those are lost for connection error.
If client somehow loses the amqp connection, Cogman can still take the task in offline.
All offline task will be re-queue after connection re-established.
Cogman fetches all the offline tasks from mongo logs, and re-initiate them. Mongo connection required here.
For re-enqueuing, task retry count would not change.

#### Implementation
- [Simple use](https://github.com/Tapfury/cogman/tree/master/example/simple)
- [Queue type](https://github.com/Tapfury/cogman/tree/master/example/queue)
- [Client & Server](https://github.com/Tapfury/cogman/tree/master/example/client-server)
- [Task & Task Handler](https://github.com/Tapfury/cogman/tree/master/example/tasks)
- [Simple use](https://github.com/Joker666/cogman/tree/master/example/simple)
- [Queue type](https://github.com/Joker666/cogman/tree/master/example/queue)
- [Client & Server](https://github.com/Joker666/cogman/tree/master/example/client-server)
- [Task & Task Handler](https://github.com/Joker666/cogman/tree/master/example/tasks)


### Feature Comparison
Expand Down
20 changes: 10 additions & 10 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *Server) consume(ctx context.Context, prefetch int) error {
continue
}

wrkr, ok := s.workers[taskName]
worker, ok := s.workers[taskName]
if !ok {
errCh <- errorTaskBody{
taskID,
Expand All @@ -133,12 +133,12 @@ func (s *Server) consume(ctx context.Context, prefetch int) error {

wg.Add(1)
// Start processing task
go func(wrkr *util.Worker, msg *amqp.Delivery) {
go func(worker *util.Worker, msg *amqp.Delivery) {
defer wg.Done()

s.lgr.Info("processing task", util.Object{Key: "taskName", Val: wrkr.Name()}, util.Object{Key: "taskID", Val: taskID})
s.lgr.Info("processing task", util.Object{Key: "taskName", Val: worker.Name()}, util.Object{Key: "taskID", Val: taskID})
startAt := time.Now()
if err := wrkr.Process(msg); err != nil {
if err := worker.Process(msg); err != nil {
errCh <- errorTaskBody{
taskID,
util.StatusFailed,
Expand All @@ -150,7 +150,7 @@ func (s *Server) consume(ctx context.Context, prefetch int) error {

s.taskRepo.UpdateTaskStatus(ctx, taskID, util.StatusSuccess, duration)

}(wrkr, &msg)
}(worker, &msg)
}

wg.Wait()
Expand All @@ -160,22 +160,22 @@ func (s *Server) consume(ctx context.Context, prefetch int) error {

// setConsumer set a consumer for each single queue.
func (s *Server) setConsumer(ctx context.Context, queue, mode string, prefetch int, taskPool chan<- amqp.Delivery) {
chnl, err := s.acon.Channel()
channel, err := s.acon.Channel()
if err != nil {
s.lgr.Error("failed to create channel", err)
return
}

defer chnl.Close()
defer channel.Close()

closeNotification := chnl.NotifyClose(make(chan *amqp.Error, 1))
closeNotification := channel.NotifyClose(make(chan *amqp.Error, 1))

if err := chnl.Qos(prefetch, 0, false); err != nil {
if err := channel.Qos(prefetch, 0, false); err != nil {
s.lgr.Error("failed to set qos", err)
return
}

msg, err := chnl.Consume(
msg, err := channel.Consume(
queue,
"",
false,
Expand Down
2 changes: 2 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cogman

import "errors"

// list of errors
var (
ErrRequestTimeout = errors.New("cogman: request timeout")
ErrInvalidData = errors.New("cogman: invalid data")
Expand All @@ -17,6 +18,7 @@ var (
ErrNoTaskID = errors.New("cogman: no task id")
)

// TaskHandlerMissingError is error when task handler is missing
type TaskHandlerMissingError string

func (t TaskHandlerMissingError) Error() string {
Expand Down
1 change: 1 addition & 0 deletions example/client-server/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func main() {
<-end
}

// SendExampleTask sends example task
func SendExampleTask(client *cogman.Session) error {
log.Printf("========================================>")

Expand Down
3 changes: 3 additions & 0 deletions example/tasks/mul_num.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ import (
"github.com/Joker666/cogman/util"
)

// MulTask holds necessary fields for multiply task
type MulTask struct {
Name string
}

// NewMulTask returns instance of MulTask
func NewMulTask() util.Handler {
return MulTask{
Name: TaskMultiplication,
}
}

// Do handles task
func (t MulTask) Do(ctx context.Context, payload []byte) error {
var body TaskBody
if err := json.Unmarshal(payload, &body); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions example/tasks/sub_num .go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ import (
"github.com/Joker666/cogman/util"
)

// SubTask holds necessary fields for subtract task
type SubTask struct {
Name string
}

// NewSubTask returns instance of SubTask
func NewSubTask() util.Handler {
return SubTask{
TaskSubtraction,
}
}

// Do handles task
func (t SubTask) Do(ctx context.Context, payload []byte) error {
var body TaskBody
if err := json.Unmarshal(payload, &body); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions example/tasks/sum_num.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ import (
"github.com/Joker666/cogman/util"
)

// SumTask holds necessary fields for multiply task
type SumTask struct {
Name string
}

// NewSumTask returns instance of SumTask
func NewSumTask() util.Handler {
return SumTask{
Name: TaskAddition,
}
}

// Do handles task
func (t SumTask) Do(ctx context.Context, payload []byte) error {
var body TaskBody
if err := json.Unmarshal(payload, &body); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions example/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
TaskErrorGenerator = "error_generator"
)

// TaskList returns list of task names
func TaskList() []string {
return []string{
TaskAddition,
Expand All @@ -24,11 +25,13 @@ func TaskList() []string {
}
}

// TaskBody holds fields for task body
type TaskBody struct {
Num1 int
Num2 int
}

// GetAdditionTask returns addition task
func GetAdditionTask(numA, numB int, p util.TaskPriority, retryCount int) (*util.Task, error) {
body := TaskBody{
Num1: numA,
Expand All @@ -51,6 +54,7 @@ func GetAdditionTask(numA, numB int, p util.TaskPriority, retryCount int) (*util
return task, nil
}

// GetMultiplicationTask returns multiplication task
func GetMultiplicationTask(numA, numB int, p util.TaskPriority, retryCount int) (*util.Task, error) {
body := TaskBody{
Num1: numA,
Expand All @@ -73,6 +77,7 @@ func GetMultiplicationTask(numA, numB int, p util.TaskPriority, retryCount int)
return task, nil
}

// GetSubtractionTask returns subtraction task
func GetSubtractionTask(numA, numB int, p util.TaskPriority, retryCount int) (*util.Task, error) {
body := TaskBody{
Num1: numA,
Expand Down
15 changes: 7 additions & 8 deletions repo/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ func (s *TaskRepository) CreateTask(task *util.Task) error {
return
}

byts, err := json.Marshal(task)
bytes, err := json.Marshal(task)
if err != nil {
errs = err
return
}

err = s.RedisConn.Create(task.TaskID, byts)
err = s.RedisConn.Create(task.TaskID, bytes)
if err != nil {
errs = err
return
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *TaskRepository) UpdateTaskStatus(ctx context.Context, id string, status
case util.StatusSuccess:
dur, ok := args[0].(float64)
if !ok {
s.lgr.Error("UpdateTaskStatus", ErrDurationRequired)
s.lgr.Error("bytes", ErrDurationRequired)
return
}
duration = &dur
Expand Down Expand Up @@ -342,14 +342,13 @@ func (s *TaskRepository) UpdateTaskStatus(ctx context.Context, id string, status
task.FailError = failError.Error()
}

byts, err := json.Marshal(task)
bytes, err := json.Marshal(task)
if err != nil {
errs = err
return
}

errs = s.RedisConn.Update(task.TaskID, byts)

errs = s.RedisConn.Update(task.TaskID, bytes)
}()

if errs != nil {
Expand Down Expand Up @@ -413,13 +412,13 @@ func (s *TaskRepository) UpdateRetryCount(id string, count int) {
task.UpdatedAt = time.Now()
task.Retry += count

byts, err := json.Marshal(task)
bytes, err := json.Marshal(task)
if err != nil {
errs = err
return
}

errs = s.RedisConn.Update(task.TaskID, byts)
errs = s.RedisConn.Update(task.TaskID, bytes)
}()

if errs != nil {
Expand Down
Loading

0 comments on commit 8884fea

Please sign in to comment.