Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rabbit #2

Merged
merged 33 commits into from
Feb 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
# Rebbit

[![Build Status](https://travis-ci.org/jd78/rabbit.svg?branch=master)](https://travis-ci.org/jd78/rabbit)

Rabbit is built on top of streadway/amqp RabbitMQ client (https://github.com/streadway/amqp) and brings the following functionalities:

- Can produce and consume messages in jSON and Protobuf.
- Can produce and consume concurrently, specifying the wanted number. The producer is concurrent round robin, the consumer is a concurrent multichannel.
- Can produce and consume concurrently, specifying the number of consumers/producers. The producer is concurrent round robin, the consumer is a concurrent multichannels.
- Partition consumer, can consume messages in sequences while concurrent. Assigning a partition id to a message, this will be consumed by the same channel, in order.
- Dynamic handlers.
- Message retries.
Expand Down Expand Up @@ -53,7 +51,7 @@ rabbit.TopologyConfiguration().
#### Producer

It's necessary to configure a producer for each output exchange you want to send messages to.
Each producer has at least one rabbit channel. You can specify how many concurrent and round robin producers you want, keeping in mind that each producer specified in the parameter "numberOfProducers" will create it's own rabbit channel. This has been done to ensure that the publish confirmation works correctly. Two messages cannot be sent at the same time for the same channel. Sent are queued in a round robin way.
Each producer has at least one rabbit channel. You can specify how many concurrent and round robin producers you want, keeping in mind that each producer specified in the parameter "numberOfProducers" will create it's own rabbit channel. This has been done to ensure that the publish confirmation works correctly. Two messages cannot be sent at the same time for the same channel. Sends are queued in a round robin way.

```go

Expand Down Expand Up @@ -99,7 +97,7 @@ func TestMessageHandler(test string, producer rabbit.IProducer) func(message int
```

A message handler can have parameters or not. You might want to inject your repository, a rabbit producer, etc.
A handler always return the function func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse, where message and handler are respectively the message and the handler that will be injected by the rabbit library; and rabbit.HandlerResponse is the return status of the handler that will be read by the rabbit library to perfom specific operations like ack, reqeueing, etc.
A handler always returns the function func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse, where message and handler are respectively the message and the handler that will be injected by the rabbit library; and rabbit.HandlerResponse is the return status of the handler that will be read by the rabbit library to perfom specific operations like ack, reqeueing, etc.

rabbit.HandlerResponse can be:
- rabbit.Completed, if all good, the message will be eventually acked
Expand All @@ -123,11 +121,38 @@ consumer := rabbit.ConfigureConsumer(100, 5*time.Second)

```go

h := TestMessageHandler("teststring", producer) //I'm passing a string and the producer dependency
h := TestMessageHandler("teststring", consumerProducer) //I'm passing a string and the producer dependency
consumer.AddHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h)

```

main.TestMessage is the type that will be send as envelope type, and the reflected type as second parameter is the is the contract representing the message. Finally, as third parameter, we pass the handler that will handle the message.


At this point, to start the consumer, without using the partitioner, it's enough to do

```go

//parameters: queue name, ack, use active passive, passive check for consumer interval, concurrent consumers, amqp consumer args
consumer.StartConsuming("test.inbound", true, true, 1000*time.Millisecond, 1, nil)

```

If you want to use the partitioner instead you need to configure the partition resolver.

```go

partitionerResolver := make(map[reflect.Type]func(message interface{}) int64)
partitionerResolver[reflect.TypeOf(TestMessage{})] = func(message interface{}) int64 { return int64(message.(TestMessage).Id) }

//parameters: queue name, ack, use active passive, passive check for consumer interval, max time waiting on partition handler error,
//number or partitions, resolver, amqp consumer args.
consumer.StartConsumingPartitions("test.inbound", true, true,
3000*time.Millisecond, 5*time.Second, 30, partitionerResolver, nil)

```

Be aware that if a partition is in error, no messages will be consumed for that partition but will be queued until the message in error gets correctly consumed.

### Demo

A simple demo is located in the example folder
Binary file added example/example
Binary file not shown.
34 changes: 34 additions & 0 deletions example/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"fmt"
"log"
"rabbit"
"time"
)

func TestMessageHandler(test string, producer rabbit.IProducer) func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse {
return func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse {
testMessage := message.(TestMessage)
log.Println("executing testMessage, test: " + test)
log.Println(testMessage)
log.Println(fmt.Sprintf("header: %v", header))
time.Sleep(500 * time.Millisecond)
err := producer.Send(message, "", "test", "correlation", "", nil, rabbit.Json)
if err != nil {
log.Println("Error sending the message")
return rabbit.Err
}
return rabbit.Completed
}
}

func ProtoMessageHandler() func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse {
return func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse {
testMessage := message.(Test)
log.Printf("Id: %d", testMessage.GetId())
log.Printf("Label: %s", testMessage.GetLabel())
time.Sleep(500 * time.Millisecond)
return rabbit.Completed
}
}
82 changes: 82 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"fmt"
"log"
"rabbit"
"reflect"
"runtime"
"time"

"github.com/golang/protobuf/proto"
)

type TestMessage struct {
Id int
Name string
}

var roundrobin int64 = 0

func main() {

runtime.GOMAXPROCS(4)

logger := rabbit.CreateLogger(
func(l string) { log.Println(l) },
func(l string) { log.Println(l) },
func(l string) { log.Println(l) },
func(l string) { log.Fatal(l) },
func(l string) { log.Println(l) },
rabbit.Debug)

rabbit.Initialize(logger, "amqp://guest:guest@localhost:5672/")
rabbit.TopologyConfiguration().
DeclareExchange("test.output", "topic", true, false, false, nil).
DeclareExchange("consumer.output", "fanout", true, false, false, nil).
DeclareQueue("test.inbound", true, false, false, nil).
BindQueue("test.inbound", "#", "test.output", nil).
DeclareQueue("consumer.inbound", true, false, false, map[string]interface{}{
"x-message-ttl": int32(600000),
}).
BindQueue("consumer.inbound", "", "consumer.output", nil).
Complete()

producer := rabbit.ConfigureProducer(3, "test.output", rabbit.Transient, true)
consumerProducer := rabbit.ConfigureProducer(3, "consumer.output", rabbit.Transient, true)

consumer := rabbit.ConfigureConsumer(100, 5*time.Second)
h := TestMessageHandler("teststring", consumerProducer)
consumer.AddHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h)

protoH := ProtoMessageHandler()
consumer.AddHandler("main.Test", reflect.TypeOf(Test{}), protoH)

//consumer.AddRetryHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h, 10)
partitionerResolver := make(map[reflect.Type]func(message interface{}) int64)
partitionerResolver[reflect.TypeOf(TestMessage{})] = func(message interface{}) int64 { return int64(message.(TestMessage).Id) }
//consumer.StartConsuming("test.inbound", true, true, 1000*time.Millisecond, 1, nil)
consumer.StartConsumingPartitions("test.inbound", true, true,
3000*time.Millisecond, 5*time.Second, 30, partitionerResolver, nil)

for i := 0; i < 1; i++ {
go func(i int) {
message := TestMessage{i, "test"}
err := producer.Send(message, "testmessage.1", fmt.Sprintf("%d", i), "correlation", "", nil, rabbit.Json)
if err != nil {
log.Fatal(err)
}

protoMessage := &Test{
Id: proto.Int32(int32(i)),
Label: proto.String("test protobuf"),
}
err2 := producer.Send(protoMessage, "protoMessage.1", fmt.Sprintf("%d", i), "correlation", "", nil, rabbit.Protobuf)
if err2 != nil {
log.Fatal(err2)
}
}(i)
}

fmt.Scanln()
}
69 changes: 69 additions & 0 deletions example/protoMessage.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions example/protoMessage.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
syntax = "proto2";
package main;

message Test {
required int32 Id = 1;
required string label = 2;
}