-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* starting * topology * topology * producer * producer * producer * messagerype * confirms * producer make it better * bugfix + consumer * temp consumer * working * deserializer * error handling * bugfix + correlation * make log better * default serializer * echange bind * active passive * make it better * return response * partitioner * bugfix * error handling * minor * retry moved * waiting error moved * protobuf support * inject header in channel * readme
- Loading branch information
Showing
11 changed files
with
926 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
# Rebbit | ||
|
||
[![Build Status](https://travis-ci.org/jd78/rabbit.svg?branch=master)](https://travis-ci.org/jd78/rabbit) | ||
|
||
Rabbit is built on top of streadway/amqp RabbitMQ client (https://github.com/streadway/amqp) and brings the following functionalities: | ||
|
||
- Can produce and consume messages in jSON and Protobuf. | ||
- Can produce and consume concurrently, specifying the wanted number. The producer is concurrent round robin, the consumer is a concurrent multichannel. | ||
- Partition consumer, can consume messages in sequences while concurrent. Assigning a partition id to a message, this will be consumed by the same channel, in order. | ||
- Dynamic handlers. | ||
- Message retries. | ||
- Can discard or requeue a message just returning the wanted behaviour from the handler. | ||
- Active and sleepy passive consumer. | ||
- Topology configurator. | ||
- Injection logger. | ||
|
||
#### Logger injection and rabbit initializer | ||
|
||
```go | ||
|
||
import "github.com/jd78/rabbit" | ||
|
||
logger := rabbit.CreateLogger( | ||
func(l string) { log.Println(l) }, //debug | ||
func(l string) { log.Println(l) }, //info | ||
func(l string) { log.Println(l) }, //warn | ||
func(l string) { log.Fatal(l) }, //error | ||
func(l string) { log.Println(l) }, //fata; | ||
rabbit.Debug //logLevel | ||
) | ||
|
||
rabbit.Initialize(logger, "amqp://username:password@localhost:5672/") | ||
``` | ||
|
||
#### Topology configuration | ||
|
||
It's possible to configure the topology, creating exchnges, queues and bindings. | ||
|
||
```go | ||
|
||
rabbit.TopologyConfiguration(). | ||
DeclareExchange("test.output", "topic", true, false, false, nil). | ||
DeclareExchange("consumer.output", "fanout", true, false, false, nil). | ||
DeclareQueue("test.inbound", true, false, false, nil). | ||
BindQueue("test.inbound", "#", "test.output", nil). | ||
DeclareQueue("consumer.inbound", true, false, false, map[string]interface{}{ | ||
"x-message-ttl": int32(600000), | ||
}). | ||
BindQueue("consumer.inbound", "", "consumer.output", nil). | ||
Complete() | ||
``` | ||
|
||
#### Producer | ||
|
||
It's necessary to configure a producer for each output exchange you want to send messages to. | ||
Each producer has at least one rabbit channel. You can specify how many concurrent and round robin producers you want, keeping in mind that each producer specified in the parameter "numberOfProducers" will create it's own rabbit channel. This has been done to ensure that the publish confirmation works correctly. Two messages cannot be sent at the same time for the same channel. Sent are queued in a round robin way. | ||
|
||
```go | ||
|
||
//params: number of producers, output exchange, publish type (Transient or Persistent), confirm publish | ||
testOutputProducer := rabbit.ConfigureProducer(1, "test.output", rabbit.Transient, true) | ||
consumerProducer := rabbit.ConfigureProducer(3, "consumer.output", rabbit.Transient, true) | ||
|
||
``` | ||
|
||
### Message types and message handlers | ||
|
||
Supposing we have the following message type we want to publish and consume | ||
|
||
```go | ||
|
||
type TestMessage struct { | ||
Id int | ||
Name string | ||
} | ||
|
||
``` | ||
|
||
Firstly we want to create a handler for this message | ||
|
||
```go | ||
|
||
func TestMessageHandler(test string, producer rabbit.IProducer) func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { | ||
return func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { | ||
testMessage := message.(TestMessage) | ||
log.Println("executing testMessage, test: " + test) | ||
log.Println(testMessage) | ||
log.Println(fmt.Sprintf("header: %v", header)) | ||
time.Sleep(500 * time.Millisecond) | ||
err := producer.Send(message, "", "test", "correlation", "", nil, rabbit.Json) | ||
if err != nil { | ||
log.Println("Error sending the message") | ||
return rabbit.Err | ||
} | ||
return rabbit.Completed | ||
} | ||
} | ||
|
||
``` | ||
|
||
A message handler can have parameters or not. You might want to inject your repository, a rabbit producer, etc. | ||
A handler always return the function func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse, where message and handler are respectively the message and the handler that will be injected by the rabbit library; and rabbit.HandlerResponse is the return status of the handler that will be read by the rabbit library to perfom specific operations like ack, reqeueing, etc. | ||
|
||
rabbit.HandlerResponse can be: | ||
- rabbit.Completed, if all good, the message will be eventually acked | ||
- rabbit.Reject, if you want to reject the message | ||
- rabbit.Requeue, if you with to requeue the message and re-consume it again | ||
- rabbit.Err, an error occurred. The message will be requeued or retried N time based on the client configuration that we are going to discuss later. | ||
|
||
In the previous example, we are just handling the TestMessage writing some log and publishing the same message again. | ||
|
||
### Consumer | ||
|
||
```go | ||
//params: prefetch, requeue waiting time on error | ||
consumer := rabbit.ConfigureConsumer(100, 5*time.Second) | ||
|
||
``` | ||
|
||
We can specify a prefetch and the requeue waiting time on error. The second parameter is the waiting time before the message gets requeued when you return rabbit.Err from a handler. | ||
|
||
Below is how you have to register your handlers | ||
|
||
```go | ||
|
||
h := TestMessageHandler("teststring", producer) //I'm passing a string and the producer dependency | ||
consumer.AddHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h) | ||
|
||
``` | ||
|
||
main.TestMessage is the type that will be send as envelope type, and the reflected type as second parameter is the is the contract representing the message. Finally, as third parameter, we pass the handler that will handle the message. | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
package rabbit | ||
|
||
import ( | ||
"crypto/rand" | ||
"encoding/hex" | ||
"fmt" | ||
) | ||
|
||
type Queue struct { | ||
name string | ||
durable bool | ||
autoDelete bool | ||
exclusive bool | ||
args map[string]interface{} | ||
} | ||
|
||
var Queues map[string]Queue | ||
|
||
func checkError(err error, additionalData string, lg *rabbitLogger) { | ||
if err != nil { | ||
l := fmt.Sprintf("%s: %s", additionalData, err.Error()) | ||
lg.fatal(l) | ||
panic(l) | ||
} | ||
} | ||
|
||
func checkErrorLight(err error, additionalData string, lg *rabbitLogger) { | ||
if err != nil { | ||
if lg.logLevel >= Warn { | ||
lg.warn(fmt.Sprintf("%s: %s", additionalData, err.Error())) | ||
} | ||
} | ||
} | ||
|
||
func getUniqueId() string { | ||
b := make([]byte, 4) | ||
rand.Read(b) | ||
return hex.EncodeToString(b) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package rabbit | ||
|
||
import "github.com/streadway/amqp" | ||
import "fmt" | ||
|
||
type rabbit struct { | ||
connection *amqp.Connection | ||
log *rabbitLogger | ||
} | ||
|
||
func initialize(endpoint string, log *rabbitLogger) rabbit { | ||
conn, err := amqp.Dial(endpoint) | ||
checkError(err, "error during connection", log) | ||
go func() { | ||
ch := make(chan *amqp.Error) | ||
conn.NotifyClose(ch) | ||
err := <-ch | ||
checkError(err, "Connection lost!", log) | ||
}() | ||
|
||
go func() { | ||
ch := make(chan amqp.Blocking) | ||
conn.NotifyBlocked(ch) | ||
for { | ||
status := <-ch | ||
if log.logLevel >= Warn { | ||
log.warn(fmt.Sprintf("connection blocked detected - block enabled: %t, reason: %s", status.Active, status.Reason)) | ||
} | ||
} | ||
}() | ||
|
||
return rabbit{conn, log} | ||
} | ||
|
||
func (r *rabbit) close() { | ||
r.connection.Close() | ||
} |
Oops, something went wrong.