Skip to content

Commit

Permalink
Rabbit (#2)
Browse files Browse the repository at this point in the history
* starting

* topology

* topology

* producer

* producer

* producer

* messagerype

* confirms

* producer make it better

* bugfix + consumer

* temp consumer

* working

* deserializer

* error handling

* bugfix + correlation

* make log better

* default serializer

* echange bind

* active passive

* make it better

* return response

* partitioner

* bugfix

* error handling

* minor

* retry moved

* waiting error moved

* protobuf support

* inject header in channel

* readme

* readme

* examples
  • Loading branch information
jd78 authored Feb 12, 2018
1 parent e05ad99 commit ed6e3fb
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 7 deletions.
39 changes: 32 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
# Rebbit

[![Build Status](https://travis-ci.org/jd78/rabbit.svg?branch=master)](https://travis-ci.org/jd78/rabbit)

Rabbit is built on top of streadway/amqp RabbitMQ client (https://github.com/streadway/amqp) and brings the following functionalities:

- Can produce and consume messages in jSON and Protobuf.
- Can produce and consume concurrently, specifying the wanted number. The producer is concurrent round robin, the consumer is a concurrent multichannel.
- Can produce and consume concurrently, specifying the number of consumers/producers. The producer is concurrent round robin, the consumer is a concurrent multichannels.
- Partition consumer, can consume messages in sequences while concurrent. Assigning a partition id to a message, this will be consumed by the same channel, in order.
- Dynamic handlers.
- Message retries.
Expand Down Expand Up @@ -53,7 +51,7 @@ rabbit.TopologyConfiguration().
#### Producer

It's necessary to configure a producer for each output exchange you want to send messages to.
Each producer has at least one rabbit channel. You can specify how many concurrent and round robin producers you want, keeping in mind that each producer specified in the parameter "numberOfProducers" will create it's own rabbit channel. This has been done to ensure that the publish confirmation works correctly. Two messages cannot be sent at the same time for the same channel. Sent are queued in a round robin way.
Each producer has at least one rabbit channel. You can specify how many concurrent and round robin producers you want, keeping in mind that each producer specified in the parameter "numberOfProducers" will create it's own rabbit channel. This has been done to ensure that the publish confirmation works correctly. Two messages cannot be sent at the same time for the same channel. Sends are queued in a round robin way.

```go

Expand Down Expand Up @@ -99,7 +97,7 @@ func TestMessageHandler(test string, producer rabbit.IProducer) func(message int
```

A message handler can have parameters or not. You might want to inject your repository, a rabbit producer, etc.
A handler always return the function func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse, where message and handler are respectively the message and the handler that will be injected by the rabbit library; and rabbit.HandlerResponse is the return status of the handler that will be read by the rabbit library to perfom specific operations like ack, reqeueing, etc.
A handler always returns the function func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse, where message and handler are respectively the message and the handler that will be injected by the rabbit library; and rabbit.HandlerResponse is the return status of the handler that will be read by the rabbit library to perfom specific operations like ack, reqeueing, etc.

rabbit.HandlerResponse can be:
- rabbit.Completed, if all good, the message will be eventually acked
Expand All @@ -123,11 +121,38 @@ consumer := rabbit.ConfigureConsumer(100, 5*time.Second)

```go

h := TestMessageHandler("teststring", producer) //I'm passing a string and the producer dependency
h := TestMessageHandler("teststring", consumerProducer) //I'm passing a string and the producer dependency
consumer.AddHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h)

```

main.TestMessage is the type that will be send as envelope type, and the reflected type as second parameter is the is the contract representing the message. Finally, as third parameter, we pass the handler that will handle the message.


At this point, to start the consumer, without using the partitioner, it's enough to do

```go

//parameters: queue name, ack, use active passive, passive check for consumer interval, concurrent consumers, amqp consumer args
consumer.StartConsuming("test.inbound", true, true, 1000*time.Millisecond, 1, nil)

```

If you want to use the partitioner instead you need to configure the partition resolver.

```go

partitionerResolver := make(map[reflect.Type]func(message interface{}) int64)
partitionerResolver[reflect.TypeOf(TestMessage{})] = func(message interface{}) int64 { return int64(message.(TestMessage).Id) }

//parameters: queue name, ack, use active passive, passive check for consumer interval, max time waiting on partition handler error,
//number or partitions, resolver, amqp consumer args.
consumer.StartConsumingPartitions("test.inbound", true, true,
3000*time.Millisecond, 5*time.Second, 30, partitionerResolver, nil)

```

Be aware that if a partition is in error, no messages will be consumed for that partition but will be queued until the message in error gets correctly consumed.

### Demo

A simple demo is located in the example folder
Binary file added example/example
Binary file not shown.
34 changes: 34 additions & 0 deletions example/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"fmt"
"log"
"rabbit"
"time"
)

func TestMessageHandler(test string, producer rabbit.IProducer) func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse {
return func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse {
testMessage := message.(TestMessage)
log.Println("executing testMessage, test: " + test)
log.Println(testMessage)
log.Println(fmt.Sprintf("header: %v", header))
time.Sleep(500 * time.Millisecond)
err := producer.Send(message, "", "test", "correlation", "", nil, rabbit.Json)
if err != nil {
log.Println("Error sending the message")
return rabbit.Err
}
return rabbit.Completed
}
}

func ProtoMessageHandler() func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse {
return func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse {
testMessage := message.(Test)
log.Printf("Id: %d", testMessage.GetId())
log.Printf("Label: %s", testMessage.GetLabel())
time.Sleep(500 * time.Millisecond)
return rabbit.Completed
}
}
82 changes: 82 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"fmt"
"log"
"rabbit"
"reflect"
"runtime"
"time"

"github.com/golang/protobuf/proto"
)

type TestMessage struct {
Id int
Name string
}

var roundrobin int64 = 0

func main() {

runtime.GOMAXPROCS(4)

logger := rabbit.CreateLogger(
func(l string) { log.Println(l) },
func(l string) { log.Println(l) },
func(l string) { log.Println(l) },
func(l string) { log.Fatal(l) },
func(l string) { log.Println(l) },
rabbit.Debug)

rabbit.Initialize(logger, "amqp://guest:guest@localhost:5672/")
rabbit.TopologyConfiguration().
DeclareExchange("test.output", "topic", true, false, false, nil).
DeclareExchange("consumer.output", "fanout", true, false, false, nil).
DeclareQueue("test.inbound", true, false, false, nil).
BindQueue("test.inbound", "#", "test.output", nil).
DeclareQueue("consumer.inbound", true, false, false, map[string]interface{}{
"x-message-ttl": int32(600000),
}).
BindQueue("consumer.inbound", "", "consumer.output", nil).
Complete()

producer := rabbit.ConfigureProducer(3, "test.output", rabbit.Transient, true)
consumerProducer := rabbit.ConfigureProducer(3, "consumer.output", rabbit.Transient, true)

consumer := rabbit.ConfigureConsumer(100, 5*time.Second)
h := TestMessageHandler("teststring", consumerProducer)
consumer.AddHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h)

protoH := ProtoMessageHandler()
consumer.AddHandler("main.Test", reflect.TypeOf(Test{}), protoH)

//consumer.AddRetryHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h, 10)
partitionerResolver := make(map[reflect.Type]func(message interface{}) int64)
partitionerResolver[reflect.TypeOf(TestMessage{})] = func(message interface{}) int64 { return int64(message.(TestMessage).Id) }
//consumer.StartConsuming("test.inbound", true, true, 1000*time.Millisecond, 1, nil)
consumer.StartConsumingPartitions("test.inbound", true, true,
3000*time.Millisecond, 5*time.Second, 30, partitionerResolver, nil)

for i := 0; i < 1; i++ {
go func(i int) {
message := TestMessage{i, "test"}
err := producer.Send(message, "testmessage.1", fmt.Sprintf("%d", i), "correlation", "", nil, rabbit.Json)
if err != nil {
log.Fatal(err)
}

protoMessage := &Test{
Id: proto.Int32(int32(i)),
Label: proto.String("test protobuf"),
}
err2 := producer.Send(protoMessage, "protoMessage.1", fmt.Sprintf("%d", i), "correlation", "", nil, rabbit.Protobuf)
if err2 != nil {
log.Fatal(err2)
}
}(i)
}

fmt.Scanln()
}
69 changes: 69 additions & 0 deletions example/protoMessage.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions example/protoMessage.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
syntax = "proto2";
package main;

message Test {
required int32 Id = 1;
required string label = 2;
}

0 comments on commit ed6e3fb

Please sign in to comment.