From 01e4a60e8cd7e948679559da103a8f090c167ec1 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Wed, 10 Jan 2018 01:10:15 +0100 Subject: [PATCH 01/30] starting --- connectionManager.go | 15 +++++++++++++++ types.go | 13 +++++++++++++ 2 files changed, 28 insertions(+) create mode 100644 connectionManager.go create mode 100644 types.go diff --git a/connectionManager.go b/connectionManager.go new file mode 100644 index 0000000..d219e9f --- /dev/null +++ b/connectionManager.go @@ -0,0 +1,15 @@ +package rabbit + +import "github.com/streadway/amqp" + +var connection *amqp.Connection + +func Connect(endpoint string, logger *rabbitLogger) { + conn, err := amqp.Dial(endpoint) + if err != nil { + logger.fatal(err.Error()) + panic(err.Error()) + } + + connection = conn +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..c90e7cd --- /dev/null +++ b/types.go @@ -0,0 +1,13 @@ +package rabbit + +type logger func(string) + +type rabbitLogger struct { + fatal logger + err logger + info logger +} + +func CreateLogger(info logger, err logger, fatal logger) *rabbitLogger { + return &rabbitLogger{fatal, err, info} +} From f041b3f0e43db433102df01bcf980cea12312b70 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Fri, 12 Jan 2018 00:39:05 +0100 Subject: [PATCH 02/30] topology --- common.go | 13 +++++++++++ connectionManager.go | 52 ++++++++++++++++++++++++++++++++++++++------ init.go | 19 ++++++++++++++++ topology.go | 17 +++++++++++++++ 4 files changed, 94 insertions(+), 7 deletions(-) create mode 100644 common.go create mode 100644 init.go create mode 100644 topology.go diff --git a/common.go b/common.go new file mode 100644 index 0000000..8273942 --- /dev/null +++ b/common.go @@ -0,0 +1,13 @@ +package rabbit + +import ( + "fmt" +) + +func checkError(err error, additionalData string, lg *rabbitLogger) { + if err != nil { + l := fmt.Sprintf("%s: %s", additionalData, err.Error()) + lg.fatal(l) + panic(l) + } +} diff --git a/connectionManager.go b/connectionManager.go index d219e9f..1e35bb6 100644 --- a/connectionManager.go +++ b/connectionManager.go @@ -1,15 +1,53 @@ package rabbit import "github.com/streadway/amqp" +import "fmt" -var connection *amqp.Connection +type rabbit struct { + connection *amqp.Connection + log *rabbitLogger +} -func Connect(endpoint string, logger *rabbitLogger) { +func initialize(endpoint string, log *rabbitLogger) rabbit { conn, err := amqp.Dial(endpoint) - if err != nil { - logger.fatal(err.Error()) - panic(err.Error()) - } + checkError(err, "error during connection", log) + go func() { + ch := make(chan *amqp.Error) + conn.NotifyClose(ch) + err := <-ch + log.err(fmt.Sprintf("Connection lost - Error=%s", err.Error())) + }() + + return rabbit{conn, log} +} + +func (r *rabbit) close() { + r.connection.Close() +} + +func (r *rabbit) declareExchange(name, kind string, durable, autoDelete, internal bool, args map[string]interface{}) { + channel, err := r.connection.Channel() + checkError(err, "error creating topology channel", r.log) + defer channel.Close() + + exErr := channel.ExchangeDeclare(name, kind, durable, autoDelete, internal, false, args) + checkError(exErr, "error creating exchange", r.log) +} + +func (r *rabbit) declareQueue(name string, durable, autoDelete, exclusive bool, args map[string]interface{}) { + channel, err := r.connection.Channel() + checkError(err, "error creating topology channel", r.log) + defer channel.Close() + + _, qErr := channel.QueueDeclare(name, durable, autoDelete, exclusive, false, args) + checkError(qErr, "error creating queue", r.log) +} + +func (r *rabbit) bindQueue(name, routingKey, exchangeName string, args map[string]interface{}) { + channel, err := r.connection.Channel() + checkError(err, "error creating topology channel", r.log) + defer channel.Close() - connection = conn + bErr := channel.QueueBind(name, routingKey, exchangeName, false, args) + checkError(bErr, "error creating the queue bind", r.log) } diff --git a/init.go b/init.go new file mode 100644 index 0000000..3dd7c07 --- /dev/null +++ b/init.go @@ -0,0 +1,19 @@ +package rabbit + +var _r rabbit + +func Initialize(log *rabbitLogger, endpoint string) { + _r = initialize(endpoint, log) +} + +func DeclareExchange(name, kind string, durable, autoDelete, internal bool, args map[string]interface{}) { + _r.declareExchange(name, kind, durable, autoDelete, internal, args) +} + +func DeclareQueue(name string, durable, autoDelete, exclusive bool, args map[string]interface{}) { + _r.declareQueue(name, durable, autoDelete, exclusive, args) +} + +func BindQueue(name, routingKey, exchangeName string, args map[string]interface{}) { + _r.bindQueue(name, routingKey, exchangeName, args) +} diff --git a/topology.go b/topology.go new file mode 100644 index 0000000..942f323 --- /dev/null +++ b/topology.go @@ -0,0 +1,17 @@ +package rabbit + +import ( + "github.com/streadway/amqp" +) + +type topology struct { + channel *amqp.Channel +} + +func (r *rabbit) topologyConfiguration() topology { + channel, err := r.connection.Channel() + checkError(err, "error creating topology channel", r.log) + return topology{channel} +} + +//func (t *topology) From a4698176a73be065813a59a0994e4f5b8938819e Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Fri, 12 Jan 2018 00:50:53 +0100 Subject: [PATCH 03/30] topology --- connectionManager.go | 27 --------------------------- init.go | 12 ++---------- topology.go | 27 ++++++++++++++++++++++++--- 3 files changed, 26 insertions(+), 40 deletions(-) diff --git a/connectionManager.go b/connectionManager.go index 1e35bb6..b2ac6ba 100644 --- a/connectionManager.go +++ b/connectionManager.go @@ -24,30 +24,3 @@ func initialize(endpoint string, log *rabbitLogger) rabbit { func (r *rabbit) close() { r.connection.Close() } - -func (r *rabbit) declareExchange(name, kind string, durable, autoDelete, internal bool, args map[string]interface{}) { - channel, err := r.connection.Channel() - checkError(err, "error creating topology channel", r.log) - defer channel.Close() - - exErr := channel.ExchangeDeclare(name, kind, durable, autoDelete, internal, false, args) - checkError(exErr, "error creating exchange", r.log) -} - -func (r *rabbit) declareQueue(name string, durable, autoDelete, exclusive bool, args map[string]interface{}) { - channel, err := r.connection.Channel() - checkError(err, "error creating topology channel", r.log) - defer channel.Close() - - _, qErr := channel.QueueDeclare(name, durable, autoDelete, exclusive, false, args) - checkError(qErr, "error creating queue", r.log) -} - -func (r *rabbit) bindQueue(name, routingKey, exchangeName string, args map[string]interface{}) { - channel, err := r.connection.Channel() - checkError(err, "error creating topology channel", r.log) - defer channel.Close() - - bErr := channel.QueueBind(name, routingKey, exchangeName, false, args) - checkError(bErr, "error creating the queue bind", r.log) -} diff --git a/init.go b/init.go index 3dd7c07..31eb460 100644 --- a/init.go +++ b/init.go @@ -6,14 +6,6 @@ func Initialize(log *rabbitLogger, endpoint string) { _r = initialize(endpoint, log) } -func DeclareExchange(name, kind string, durable, autoDelete, internal bool, args map[string]interface{}) { - _r.declareExchange(name, kind, durable, autoDelete, internal, args) -} - -func DeclareQueue(name string, durable, autoDelete, exclusive bool, args map[string]interface{}) { - _r.declareQueue(name, durable, autoDelete, exclusive, args) -} - -func BindQueue(name, routingKey, exchangeName string, args map[string]interface{}) { - _r.bindQueue(name, routingKey, exchangeName, args) +func TopologyConfiguration() *topology { + return _r.topologyConfiguration() } diff --git a/topology.go b/topology.go index 942f323..914ae2b 100644 --- a/topology.go +++ b/topology.go @@ -6,12 +6,33 @@ import ( type topology struct { channel *amqp.Channel + log *rabbitLogger } -func (r *rabbit) topologyConfiguration() topology { +func (r *rabbit) topologyConfiguration() *topology { channel, err := r.connection.Channel() checkError(err, "error creating topology channel", r.log) - return topology{channel} + return &topology{channel, r.log} } -//func (t *topology) +func (t *topology) DeclareExchange(name, kind string, durable, autoDelete, internal bool, args map[string]interface{}) *topology { + err := t.channel.ExchangeDeclare(name, kind, durable, autoDelete, internal, false, args) + checkError(err, "error creating exchange", t.log) + return t +} + +func (t *topology) DeclareQueue(name string, durable, autoDelete, exclusive bool, args map[string]interface{}) *topology { + _, err := t.channel.QueueDeclare(name, durable, autoDelete, exclusive, false, args) + checkError(err, "error creating queue", t.log) + return t +} + +func (t *topology) BindQueue(name, routingKey, exchangeName string, args map[string]interface{}) *topology { + err := t.channel.QueueBind(name, routingKey, exchangeName, false, args) + checkError(err, "error creating the queue bind", t.log) + return t +} + +func (t *topology) Complete() { + t.channel.Close() +} From aa6a3ae092d63c017cae7666f6bd92709a8f2c91 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Fri, 12 Jan 2018 16:45:10 +0100 Subject: [PATCH 04/30] producer --- connectionManager.go | 10 ++++++ producer.go | 73 ++++++++++++++++++++++++++++++++++++++++++++ types.go | 20 ++++++++++-- 3 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 producer.go diff --git a/connectionManager.go b/connectionManager.go index b2ac6ba..41dd8bd 100644 --- a/connectionManager.go +++ b/connectionManager.go @@ -16,6 +16,16 @@ func initialize(endpoint string, log *rabbitLogger) rabbit { conn.NotifyClose(ch) err := <-ch log.err(fmt.Sprintf("Connection lost - Error=%s", err.Error())) + panic("connection lost") + }() + + go func() { + ch := make(chan amqp.Blocking) + conn.NotifyBlocked(ch) + for { + status := <-ch + log.warn(fmt.Sprintf("connection blocked detected - block enabled: %t, reason: %s", status.Active, status.Reason)) + } }() return rabbit{conn, log} diff --git a/producer.go b/producer.go new file mode 100644 index 0000000..0b731e9 --- /dev/null +++ b/producer.go @@ -0,0 +1,73 @@ +package rabbit + +import ( + "fmt" + "reflect" + "sync/atomic" + "time" + + "github.com/streadway/amqp" +) + +type producer struct { + numberOfProducers int + channels []*amqp.Channel + roundRobin int32 + exchangeName string + deliveryMode DeliveryMode +} + +var _producer producer + +func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode) { + if numberOfProducers < 1 { + msg := "numberOfProducers is less than 1" + r.log.err(msg) + panic(msg) + } + + channels := make([]*amqp.Channel, numberOfProducers, numberOfProducers) + for i := 0; i < numberOfProducers; i++ { + channel, err := r.connection.Channel() + + checkError(err, "Error creating the producing channel", r.log) + channels[i] = channel + + go func() { + ch := make(chan *amqp.Error) + channel.NotifyClose(ch) + err := <-ch + r.log.err(fmt.Sprintf("Connection lost - Error=%s", err.Error())) + panic("connection lost") + }() + + go func() { + ch := make(chan bool) + channel.NotifyFlow(ch) + for { + status := <-ch + r.log.warn(fmt.Sprintf("channel flow detected - flow enabled: %t", status)) + } + }() + } + + _producer = producer{numberOfProducers, channels, 0, exchangeName, deliveryMode} +} + +func (p *producer) getChannel() *amqp.Channel { + i := atomic.AddInt32(&p.roundRobin, 1) + return p.channels[int(i)%p.numberOfProducers] +} + +func (p *producer) send(message interface{}, routingKey, messageID string, header map[string]interface{}, contentType ContentType) error { + channel := p.getChannel() + err := channel.Publish(p.exchangeName, routingKey, false, false, amqp.Publishing{ + Headers: header, + ContentType: string(contentType), + DeliveryMode: uint8(p.deliveryMode), + MessageId: messageID, + Timestamp: time.Now().UTC(), + Type: reflect.TypeOf(message).String(), + }) + return err +} diff --git a/types.go b/types.go index c90e7cd..cd580c1 100644 --- a/types.go +++ b/types.go @@ -6,8 +6,24 @@ type rabbitLogger struct { fatal logger err logger info logger + warn logger } -func CreateLogger(info logger, err logger, fatal logger) *rabbitLogger { - return &rabbitLogger{fatal, err, info} +func CreateLogger(info logger, err logger, fatal logger, warn logger) *rabbitLogger { + return &rabbitLogger{fatal, err, info, warn} } + +type ContentType string + +const ( + text ContentType = "text" + json ContentType = "json" + protobuf ContentType = "protobuf" +) + +type DeliveryMode uint8 + +const ( + transient DeliveryMode = 0 + persistent DeliveryMode = 1 +) From 0f8f7b25616f9951c9e2de226f7118c3663f6cd1 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sat, 13 Jan 2018 00:33:13 +0100 Subject: [PATCH 05/30] producer --- init.go | 4 ++++ producer.go | 34 +++++++++++++++++++++++++++------- types.go | 9 ++++----- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/init.go b/init.go index 31eb460..058bd31 100644 --- a/init.go +++ b/init.go @@ -9,3 +9,7 @@ func Initialize(log *rabbitLogger, endpoint string) { func TopologyConfiguration() *topology { return _r.topologyConfiguration() } + +func ConfigureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode) IProducer { + return _r.configureProducer(numberOfProducers, exchangeName, deliveryMode) +} diff --git a/producer.go b/producer.go index 0b731e9..5521c80 100644 --- a/producer.go +++ b/producer.go @@ -1,6 +1,8 @@ package rabbit import ( + "encoding/json" + "errors" "fmt" "reflect" "sync/atomic" @@ -9,17 +11,21 @@ import ( "github.com/streadway/amqp" ) +type IProducer interface { + Send(message interface{}, routingKey, messageID string, header map[string]interface{}, + contentType ContentType) error +} + type producer struct { numberOfProducers int channels []*amqp.Channel roundRobin int32 exchangeName string deliveryMode DeliveryMode + log *rabbitLogger } -var _producer producer - -func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode) { +func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode) IProducer { if numberOfProducers < 1 { msg := "numberOfProducers is less than 1" r.log.err(msg) @@ -51,7 +57,7 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d }() } - _producer = producer{numberOfProducers, channels, 0, exchangeName, deliveryMode} + return &producer{numberOfProducers, channels, 0, exchangeName, deliveryMode, r.log} } func (p *producer) getChannel() *amqp.Channel { @@ -59,15 +65,29 @@ func (p *producer) getChannel() *amqp.Channel { return p.channels[int(i)%p.numberOfProducers] } -func (p *producer) send(message interface{}, routingKey, messageID string, header map[string]interface{}, contentType ContentType) error { +func (p *producer) Send(message interface{}, routingKey, messageID string, header map[string]interface{}, contentType ContentType) error { channel := p.getChannel() - err := channel.Publish(p.exchangeName, routingKey, false, false, amqp.Publishing{ + serialized, err := encode(message, contentType) + checkError(err, "json serializer error", p.log) + + pErr := channel.Publish(p.exchangeName, routingKey, false, false, amqp.Publishing{ Headers: header, ContentType: string(contentType), DeliveryMode: uint8(p.deliveryMode), MessageId: messageID, Timestamp: time.Now().UTC(), Type: reflect.TypeOf(message).String(), + Body: serialized, }) - return err + return pErr +} + +func encode(message interface{}, contentType ContentType) ([]byte, error) { + switch contentType { + case Json: + serialized, err := json.Marshal(message) + return serialized, err + default: + return nil, errors.New("unmapped content type") + } } diff --git a/types.go b/types.go index cd580c1..6cc006d 100644 --- a/types.go +++ b/types.go @@ -16,14 +16,13 @@ func CreateLogger(info logger, err logger, fatal logger, warn logger) *rabbitLog type ContentType string const ( - text ContentType = "text" - json ContentType = "json" - protobuf ContentType = "protobuf" + Json ContentType = "json" + Protobuf ContentType = "protobuf" ) type DeliveryMode uint8 const ( - transient DeliveryMode = 0 - persistent DeliveryMode = 1 + Transient DeliveryMode = 0 + Persistent DeliveryMode = 1 ) From b9aa5404294e29e9dfc7e8eedc0c02b1ac266222 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sat, 13 Jan 2018 00:59:45 +0100 Subject: [PATCH 06/30] producer --- connectionManager.go | 5 +++-- init.go | 4 ++-- producer.go | 13 +++++++++---- types.go | 15 +++++++++++++-- 4 files changed, 27 insertions(+), 10 deletions(-) diff --git a/connectionManager.go b/connectionManager.go index 41dd8bd..90dac2a 100644 --- a/connectionManager.go +++ b/connectionManager.go @@ -6,9 +6,10 @@ import "fmt" type rabbit struct { connection *amqp.Connection log *rabbitLogger + logLevel LogLevel } -func initialize(endpoint string, log *rabbitLogger) rabbit { +func initialize(endpoint string, log *rabbitLogger, logLevel LogLevel) rabbit { conn, err := amqp.Dial(endpoint) checkError(err, "error during connection", log) go func() { @@ -28,7 +29,7 @@ func initialize(endpoint string, log *rabbitLogger) rabbit { } }() - return rabbit{conn, log} + return rabbit{conn, log, logLevel} } func (r *rabbit) close() { diff --git a/init.go b/init.go index 058bd31..534c1b5 100644 --- a/init.go +++ b/init.go @@ -2,8 +2,8 @@ package rabbit var _r rabbit -func Initialize(log *rabbitLogger, endpoint string) { - _r = initialize(endpoint, log) +func Initialize(log *rabbitLogger, endpoint string, logLevel LogLevel) { + _r = initialize(endpoint, log, logLevel) } func TopologyConfiguration() *topology { diff --git a/producer.go b/producer.go index 5521c80..de90e2b 100644 --- a/producer.go +++ b/producer.go @@ -23,6 +23,7 @@ type producer struct { exchangeName string deliveryMode DeliveryMode log *rabbitLogger + logLevel LogLevel } func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode) IProducer { @@ -57,7 +58,7 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d }() } - return &producer{numberOfProducers, channels, 0, exchangeName, deliveryMode, r.log} + return &producer{numberOfProducers, channels, 0, exchangeName, deliveryMode, r.log, r.logLevel} } func (p *producer) getChannel() *amqp.Channel { @@ -67,8 +68,12 @@ func (p *producer) getChannel() *amqp.Channel { func (p *producer) Send(message interface{}, routingKey, messageID string, header map[string]interface{}, contentType ContentType) error { channel := p.getChannel() - serialized, err := encode(message, contentType) + serialized, err := serialize(message, contentType) checkError(err, "json serializer error", p.log) + messageType := reflect.TypeOf(message).String() + if p.logLevel >= Debug { + p.log.debug(fmt.Sprintf("Sending Message %s: %s", messageType, serialized)) + } pErr := channel.Publish(p.exchangeName, routingKey, false, false, amqp.Publishing{ Headers: header, @@ -76,13 +81,13 @@ func (p *producer) Send(message interface{}, routingKey, messageID string, heade DeliveryMode: uint8(p.deliveryMode), MessageId: messageID, Timestamp: time.Now().UTC(), - Type: reflect.TypeOf(message).String(), + Type: messageType, Body: serialized, }) return pErr } -func encode(message interface{}, contentType ContentType) ([]byte, error) { +func serialize(message interface{}, contentType ContentType) ([]byte, error) { switch contentType { case Json: serialized, err := json.Marshal(message) diff --git a/types.go b/types.go index 6cc006d..8dfedcc 100644 --- a/types.go +++ b/types.go @@ -7,10 +7,11 @@ type rabbitLogger struct { err logger info logger warn logger + debug logger } -func CreateLogger(info logger, err logger, fatal logger, warn logger) *rabbitLogger { - return &rabbitLogger{fatal, err, info, warn} +func CreateLogger(debug, info, warn, err, fatal logger) *rabbitLogger { + return &rabbitLogger{fatal, err, info, warn, debug} } type ContentType string @@ -26,3 +27,13 @@ const ( Transient DeliveryMode = 0 Persistent DeliveryMode = 1 ) + +type LogLevel int + +const ( + Fatal LogLevel = iota + Error + Warn + Info + Debug +) From 9039b0c392b7b0b250fac0dcbdad9d8e4d8f4433 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sat, 13 Jan 2018 01:52:46 +0100 Subject: [PATCH 07/30] messagerype --- producer.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/producer.go b/producer.go index de90e2b..b1dfb22 100644 --- a/producer.go +++ b/producer.go @@ -8,11 +8,13 @@ import ( "sync/atomic" "time" + pattern "github.com/jd78/gopatternmatching" + "github.com/streadway/amqp" ) type IProducer interface { - Send(message interface{}, routingKey, messageID string, header map[string]interface{}, + Send(message interface{}, routingKey, messageID, messageType string, header map[string]interface{}, contentType ContentType) error } @@ -66,13 +68,19 @@ func (p *producer) getChannel() *amqp.Channel { return p.channels[int(i)%p.numberOfProducers] } -func (p *producer) Send(message interface{}, routingKey, messageID string, header map[string]interface{}, contentType ContentType) error { +//Send a message. +//messageType: if empty the message type will be reflected from the message +func (p *producer) Send(message interface{}, routingKey, messageID string, messageType string, header map[string]interface{}, contentType ContentType) error { channel := p.getChannel() serialized, err := serialize(message, contentType) checkError(err, "json serializer error", p.log) - messageType := reflect.TypeOf(message).String() + + mt := pattern.ResultMatch(messageType). + WhenValue("", func() interface{} { return messageType }). + ResultOrDefault(reflect.TypeOf(message).String()).(string) + if p.logLevel >= Debug { - p.log.debug(fmt.Sprintf("Sending Message %s: %s", messageType, serialized)) + p.log.debug(fmt.Sprintf("Sending Message %s: %s", mt, serialized)) } pErr := channel.Publish(p.exchangeName, routingKey, false, false, amqp.Publishing{ @@ -81,7 +89,7 @@ func (p *producer) Send(message interface{}, routingKey, messageID string, heade DeliveryMode: uint8(p.deliveryMode), MessageId: messageID, Timestamp: time.Now().UTC(), - Type: messageType, + Type: mt, Body: serialized, }) return pErr From e025647c82dab3ade972c8689580ed7c895a7f44 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sat, 13 Jan 2018 15:00:00 +0100 Subject: [PATCH 08/30] confirms --- init.go | 4 ++-- producer.go | 39 ++++++++++++++++++++++++++++++++------- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/init.go b/init.go index 534c1b5..9731b74 100644 --- a/init.go +++ b/init.go @@ -10,6 +10,6 @@ func TopologyConfiguration() *topology { return _r.topologyConfiguration() } -func ConfigureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode) IProducer { - return _r.configureProducer(numberOfProducers, exchangeName, deliveryMode) +func ConfigureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode, confirmPublish bool) IProducer { + return _r.configureProducer(numberOfProducers, exchangeName, deliveryMode, confirmPublish) } diff --git a/producer.go b/producer.go index b1dfb22..c7b7874 100644 --- a/producer.go +++ b/producer.go @@ -26,9 +26,12 @@ type producer struct { deliveryMode DeliveryMode log *rabbitLogger logLevel LogLevel + confirmPublish bool + confirms []chan amqp.Confirmation } -func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode) IProducer { +func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode, + confirmPublish bool) IProducer { if numberOfProducers < 1 { msg := "numberOfProducers is less than 1" r.log.err(msg) @@ -36,12 +39,19 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d } channels := make([]*amqp.Channel, numberOfProducers, numberOfProducers) + confirms := make([]chan amqp.Confirmation, numberOfProducers, numberOfProducers) for i := 0; i < numberOfProducers; i++ { channel, err := r.connection.Channel() checkError(err, "Error creating the producing channel", r.log) channels[i] = channel + if confirmPublish { + confirms[i] = channel.NotifyPublish(make(chan amqp.Confirmation, 1)) + err := channel.Confirm(false) + checkError(err, "failed to create channel confirmation", r.log) + } + go func() { ch := make(chan *amqp.Error) channel.NotifyClose(ch) @@ -60,18 +70,20 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d }() } - return &producer{numberOfProducers, channels, 0, exchangeName, deliveryMode, r.log, r.logLevel} + return &producer{numberOfProducers, channels, 0, exchangeName, deliveryMode, r.log, r.logLevel, confirmPublish, + confirms} } -func (p *producer) getChannel() *amqp.Channel { - i := atomic.AddInt32(&p.roundRobin, 1) - return p.channels[int(i)%p.numberOfProducers] +func (p *producer) getNext() int { + return int(atomic.AddInt32(&p.roundRobin, 1)) } //Send a message. //messageType: if empty the message type will be reflected from the message func (p *producer) Send(message interface{}, routingKey, messageID string, messageType string, header map[string]interface{}, contentType ContentType) error { - channel := p.getChannel() + i := p.getNext() + channel := p.channels[i%p.numberOfProducers] + serialized, err := serialize(message, contentType) checkError(err, "json serializer error", p.log) @@ -92,7 +104,20 @@ func (p *producer) Send(message interface{}, routingKey, messageID string, messa Type: mt, Body: serialized, }) - return pErr + + if pErr != nil { + return pErr + } + + if p.confirmPublish { + if confirmed := <-p.confirms[i]; confirmed.Ack { + return nil + } + + return errors.New("unable to publish") + } + + return nil } func serialize(message interface{}, contentType ContentType) ([]byte, error) { From 6d4114e7a8e217648e9c79064e0a9ff925df276f Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sun, 14 Jan 2018 19:01:39 +0100 Subject: [PATCH 09/30] producer make it better --- producer.go | 102 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 73 insertions(+), 29 deletions(-) diff --git a/producer.go b/producer.go index c7b7874..98fb51b 100644 --- a/producer.go +++ b/producer.go @@ -18,6 +18,18 @@ type IProducer interface { contentType ContentType) error } +type sendMessage struct { + message interface{} + routingKey string + messageID string + messageType string + header map[string]interface{} + contentType ContentType + responseChannel *chan error + producer *producer + producerIndex int +} + type producer struct { numberOfProducers int channels []*amqp.Channel @@ -28,6 +40,7 @@ type producer struct { logLevel LogLevel confirmPublish bool confirms []chan amqp.Confirmation + producers []chan sendMessage } func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode, @@ -40,6 +53,7 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d channels := make([]*amqp.Channel, numberOfProducers, numberOfProducers) confirms := make([]chan amqp.Confirmation, numberOfProducers, numberOfProducers) + producers := make([]chan sendMessage, numberOfProducers, numberOfProducers) for i := 0; i < numberOfProducers; i++ { channel, err := r.connection.Channel() @@ -68,56 +82,86 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d r.log.warn(fmt.Sprintf("channel flow detected - flow enabled: %t", status)) } }() + + producers[i] = make(chan sendMessage, 1) + go func(i int) { + for { + s := <-producers[i] + send(&s) + } + }(i) } return &producer{numberOfProducers, channels, 0, exchangeName, deliveryMode, r.log, r.logLevel, confirmPublish, - confirms} + confirms, producers} } -func (p *producer) getNext() int { - return int(atomic.AddInt32(&p.roundRobin, 1)) -} - -//Send a message. -//messageType: if empty the message type will be reflected from the message -func (p *producer) Send(message interface{}, routingKey, messageID string, messageType string, header map[string]interface{}, contentType ContentType) error { - i := p.getNext() - channel := p.channels[i%p.numberOfProducers] - - serialized, err := serialize(message, contentType) - checkError(err, "json serializer error", p.log) +func send(s *sendMessage) { + serialized, err := serialize(s.message, s.contentType) + checkError(err, "json serializer error", s.producer.log) - mt := pattern.ResultMatch(messageType). - WhenValue("", func() interface{} { return messageType }). - ResultOrDefault(reflect.TypeOf(message).String()).(string) + mt := pattern.ResultMatch(s.messageType). + WhenValue("", func() interface{} { return s.messageType }). + ResultOrDefault(reflect.TypeOf(s.message).String()).(string) - if p.logLevel >= Debug { - p.log.debug(fmt.Sprintf("Sending Message %s: %s", mt, serialized)) + if s.producer.logLevel >= Debug { + s.producer.log.debug(fmt.Sprintf("Sending Message %s: %s", mt, serialized)) } - pErr := channel.Publish(p.exchangeName, routingKey, false, false, amqp.Publishing{ - Headers: header, - ContentType: string(contentType), - DeliveryMode: uint8(p.deliveryMode), - MessageId: messageID, + pErr := s.producer.channels[s.producerIndex].Publish(s.producer.exchangeName, s.routingKey, false, false, amqp.Publishing{ + Headers: s.header, + ContentType: string(s.contentType), + DeliveryMode: uint8(s.producer.deliveryMode), + MessageId: s.messageID, Timestamp: time.Now().UTC(), Type: mt, Body: serialized, }) if pErr != nil { - return pErr + *s.responseChannel <- pErr + return } - if p.confirmPublish { - if confirmed := <-p.confirms[i]; confirmed.Ack { - return nil + if s.producer.confirmPublish { + if confirmed := <-s.producer.confirms[s.producerIndex]; confirmed.Ack { + *s.responseChannel <- nil + return } - return errors.New("unable to publish") + *s.responseChannel <- errors.New("unable to publish") + return + } + + *s.responseChannel <- nil +} + +func (p *producer) getNext() int { + return int(atomic.AddInt32(&p.roundRobin, 1)) +} + +//Send a message. +//messageType: if empty the message type will be reflected from the message +func (p *producer) Send(message interface{}, routingKey, messageID, messageType string, header map[string]interface{}, contentType ContentType) error { + i := p.getNext() % p.numberOfProducers + response := make(chan error, 1) + + //p.log.info(fmt.Sprintf("message id %s, assigned to worker %d", messageID, i)) + + s := sendMessage{ + producerIndex: i, + contentType: contentType, + header: header, + message: message, + messageID: messageID, + messageType: messageType, + producer: p, + responseChannel: &response, + routingKey: routingKey, } - return nil + p.producers[i] <- s + return <-response } func serialize(message interface{}, contentType ContentType) ([]byte, error) { From bc6e844d5f0b1308470fe05aebdafa0e67fa1ecb Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sun, 14 Jan 2018 19:58:35 +0100 Subject: [PATCH 10/30] bugfix + consumer --- consumer.go | 30 ++++++++++++++++++++++++++++++ init.go | 4 ++++ producer.go | 4 ++-- 3 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 consumer.go diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..703cd76 --- /dev/null +++ b/consumer.go @@ -0,0 +1,30 @@ +package rabbit + +import ( + "fmt" +) + +type Handler func(message interface{}, params ...interface{}) + +type IConsumer interface { + AddHandler(messageType string, handler Handler) +} + +type consumer struct { + handlers map[string]Handler + log *rabbitLogger +} + +func (r *rabbit) configureConsumer() IConsumer { + h := make(map[string]Handler) + return &consumer{h, r.log} +} + +func (c *consumer) AddHandler(messageType string, handler Handler) { + if _, exists := c.handlers[messageType]; exists == true { + err := fmt.Sprintf("messageType %s already mapped", messageType) + c.log.err(err) + panic(err) + } + c.handlers[messageType] = handler +} diff --git a/init.go b/init.go index 9731b74..b38e97d 100644 --- a/init.go +++ b/init.go @@ -13,3 +13,7 @@ func TopologyConfiguration() *topology { func ConfigureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode, confirmPublish bool) IProducer { return _r.configureProducer(numberOfProducers, exchangeName, deliveryMode, confirmPublish) } + +func ConfigureConsumer() IConsumer { + return _r.configureConsumer() +} diff --git a/producer.go b/producer.go index 98fb51b..ec04177 100644 --- a/producer.go +++ b/producer.go @@ -101,8 +101,8 @@ func send(s *sendMessage) { checkError(err, "json serializer error", s.producer.log) mt := pattern.ResultMatch(s.messageType). - WhenValue("", func() interface{} { return s.messageType }). - ResultOrDefault(reflect.TypeOf(s.message).String()).(string) + WhenValue("", func() interface{} { return reflect.TypeOf(s.message).String() }). + ResultOrDefault(s.messageType).(string) if s.producer.logLevel >= Debug { s.producer.log.debug(fmt.Sprintf("Sending Message %s: %s", mt, serialized)) From 8d3ed74dc81731dcccd4cec87d9bfbc05f46274b Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Mon, 15 Jan 2018 11:12:27 +0100 Subject: [PATCH 11/30] temp consumer --- consumer.go | 87 +++++++++++++++++++++++++++++++++++++++++++++++---- init.go | 4 +-- producer.go | 15 ++------- serializer.go | 27 ++++++++++++++++ 4 files changed, 112 insertions(+), 21 deletions(-) create mode 100644 serializer.go diff --git a/consumer.go b/consumer.go index 703cd76..84b5011 100644 --- a/consumer.go +++ b/consumer.go @@ -1,30 +1,105 @@ package rabbit import ( + "crypto/rand" + "encoding/hex" + "errors" "fmt" + + "github.com/streadway/amqp" ) -type Handler func(message interface{}, params ...interface{}) +type Handler func(message interface{}) type IConsumer interface { AddHandler(messageType string, handler Handler) + StartConsuming(queue string, ack, activePassive bool, concurrentConsumers int, args map[string]interface{}) string } type consumer struct { - handlers map[string]Handler - log *rabbitLogger + handlers map[string]Handler + log *rabbitLogger + channel *amqp.Channel + consumerRunning bool } -func (r *rabbit) configureConsumer() IConsumer { +func (r *rabbit) configureConsumer(prefetch int) IConsumer { + channel, err := r.connection.Channel() + checkError(err, "Error creating the producing channel", r.log) + + qErr := channel.Qos(prefetch, 0, false) + checkError(qErr, "Error assigning prefetch on the channel", r.log) + + go func() { + ch := make(chan *amqp.Error) + channel.NotifyClose(ch) + err := <-ch + r.log.err(fmt.Sprintf("Channel closed - Error=%s", err.Error())) + panic("Channel closed") + }() + + go func() { + ch := make(chan bool) + channel.NotifyFlow(ch) + for { + status := <-ch + r.log.warn(fmt.Sprintf("channel flow detected - flow enabled: %t", status)) + } + }() + h := make(map[string]Handler) - return &consumer{h, r.log} + return &consumer{h, r.log, channel, false} +} + +func (c *consumer) handlerExists(messageType string) bool { + _, exists := c.handlers[messageType] + return exists } func (c *consumer) AddHandler(messageType string, handler Handler) { - if _, exists := c.handlers[messageType]; exists == true { + if c.handlerExists(messageType) { err := fmt.Sprintf("messageType %s already mapped", messageType) c.log.err(err) panic(err) } c.handlers[messageType] = handler } + +func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurrentConsumers int, args map[string]interface{}) string { + if c.consumerRunning { + err := errors.New("Consumer already running, please configure a new consumer for concurrent processing") + checkError(err, "Error starting the consumer", c.log) + } + + b := make([]byte, 4) //equals 8 charachters + rand.Read(b) + s := hex.EncodeToString(b) + + delivery, err := c.channel.Consume(queue, s, !ack, activePassive, false, false, args) + checkError(err, "Error starting the consumer", c.log) + + for i := 0; i < concurrentConsumers; i++ { + go func(work <-chan amqp.Delivery) { + for w := range work { + if !c.handlerExists(w.Type) { + if ack { + w.Ack(false) //TODO error + } + return + } + + handler := c.handlers[w.Type] + obj, err := deserialize(w.Body, ContentType(w.ContentType)) + if err != nil { + //TODO!! + } + handler(obj) //TODO check error + if ack { + w.Ack(false) //TODO error + } + } + }(delivery) + } + + return s +} diff --git a/init.go b/init.go index b38e97d..ec36409 100644 --- a/init.go +++ b/init.go @@ -14,6 +14,6 @@ func ConfigureProducer(numberOfProducers int, exchangeName string, deliveryMode return _r.configureProducer(numberOfProducers, exchangeName, deliveryMode, confirmPublish) } -func ConfigureConsumer() IConsumer { - return _r.configureConsumer() +func ConfigureConsumer(preferch int) IConsumer { + return _r.configureConsumer(preferch) } diff --git a/producer.go b/producer.go index ec04177..5e4cf88 100644 --- a/producer.go +++ b/producer.go @@ -1,7 +1,6 @@ package rabbit import ( - "encoding/json" "errors" "fmt" "reflect" @@ -70,8 +69,8 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d ch := make(chan *amqp.Error) channel.NotifyClose(ch) err := <-ch - r.log.err(fmt.Sprintf("Connection lost - Error=%s", err.Error())) - panic("connection lost") + r.log.err(fmt.Sprintf("Channel closed - Error=%s", err.Error())) + panic("Channel closed") }() go func() { @@ -163,13 +162,3 @@ func (p *producer) Send(message interface{}, routingKey, messageID, messageType p.producers[i] <- s return <-response } - -func serialize(message interface{}, contentType ContentType) ([]byte, error) { - switch contentType { - case Json: - serialized, err := json.Marshal(message) - return serialized, err - default: - return nil, errors.New("unmapped content type") - } -} diff --git a/serializer.go b/serializer.go new file mode 100644 index 0000000..898c958 --- /dev/null +++ b/serializer.go @@ -0,0 +1,27 @@ +package rabbit + +import ( + "encoding/json" + "errors" +) + +func serialize(message interface{}, contentType ContentType) ([]byte, error) { + switch contentType { + case Json: + serialized, err := json.Marshal(message) + return serialized, err + default: + return nil, errors.New("unmapped content type") + } +} + +func deserialize(message []byte, contentType ContentType) (interface{}, error) { + switch contentType { + case Json: + var obj interface{} + err := json.Unmarshal(message, &obj) + return obj, err + default: + return nil, errors.New("unmapped content type") + } +} From ea8c56d5673b9b882dcffc2ff69ecbe71e1afa20 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Mon, 15 Jan 2018 13:10:03 +0100 Subject: [PATCH 12/30] working --- consumer.go | 12 ++++++++---- serializer.go | 6 ++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/consumer.go b/consumer.go index 84b5011..ac19038 100644 --- a/consumer.go +++ b/consumer.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "errors" "fmt" + "reflect" "github.com/streadway/amqp" ) @@ -12,12 +13,13 @@ import ( type Handler func(message interface{}) type IConsumer interface { - AddHandler(messageType string, handler Handler) + AddHandler(messageType string, concreteType reflect.Type, handler Handler) StartConsuming(queue string, ack, activePassive bool, concurrentConsumers int, args map[string]interface{}) string } type consumer struct { handlers map[string]Handler + types map[string]reflect.Type log *rabbitLogger channel *amqp.Channel consumerRunning bool @@ -48,7 +50,8 @@ func (r *rabbit) configureConsumer(prefetch int) IConsumer { }() h := make(map[string]Handler) - return &consumer{h, r.log, channel, false} + t := make(map[string]reflect.Type) + return &consumer{h, t, r.log, channel, false} } func (c *consumer) handlerExists(messageType string) bool { @@ -56,13 +59,14 @@ func (c *consumer) handlerExists(messageType string) bool { return exists } -func (c *consumer) AddHandler(messageType string, handler Handler) { +func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, handler Handler) { if c.handlerExists(messageType) { err := fmt.Sprintf("messageType %s already mapped", messageType) c.log.err(err) panic(err) } c.handlers[messageType] = handler + c.types[messageType] = concreteType } func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurrentConsumers int, args map[string]interface{}) string { @@ -89,7 +93,7 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurr } handler := c.handlers[w.Type] - obj, err := deserialize(w.Body, ContentType(w.ContentType)) + obj, err := deserialize(w.Body, ContentType(w.ContentType), c.types[w.Type]) if err != nil { //TODO!! } diff --git a/serializer.go b/serializer.go index 898c958..ccbf68c 100644 --- a/serializer.go +++ b/serializer.go @@ -3,6 +3,7 @@ package rabbit import ( "encoding/json" "errors" + "reflect" ) func serialize(message interface{}, contentType ContentType) ([]byte, error) { @@ -15,10 +16,11 @@ func serialize(message interface{}, contentType ContentType) ([]byte, error) { } } -func deserialize(message []byte, contentType ContentType) (interface{}, error) { +func deserialize(message []byte, contentType ContentType, concreteType reflect.Type) (interface{}, error) { switch contentType { case Json: - var obj interface{} + //obj := reflect.Indirect(reflect.New(concreteType)).Interface() + obj := reflect.New(concreteType).Interface() err := json.Unmarshal(message, &obj) return obj, err default: From 2c5643c17aeff4e849ac2e906cf9e172a398340d Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Mon, 15 Jan 2018 13:17:56 +0100 Subject: [PATCH 13/30] deserializer --- serializer.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/serializer.go b/serializer.go index ccbf68c..be745ae 100644 --- a/serializer.go +++ b/serializer.go @@ -19,10 +19,13 @@ func serialize(message interface{}, contentType ContentType) ([]byte, error) { func deserialize(message []byte, contentType ContentType, concreteType reflect.Type) (interface{}, error) { switch contentType { case Json: - //obj := reflect.Indirect(reflect.New(concreteType)).Interface() - obj := reflect.New(concreteType).Interface() - err := json.Unmarshal(message, &obj) - return obj, err + pointer := reflect.New(concreteType).Interface() + err := json.Unmarshal(message, &pointer) + if err != nil { + return nil, err + } + noPointer := reflect.Indirect(reflect.ValueOf(pointer)).Interface() + return noPointer, nil default: return nil, errors.New("unmapped content type") } From a86d3ef0ae41e058f1a354003473eb5ad3384605 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Mon, 15 Jan 2018 13:35:37 +0100 Subject: [PATCH 14/30] error handling --- common.go | 6 ++++++ consumer.go | 27 +++++++++++++++++++-------- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/common.go b/common.go index 8273942..3bec9d1 100644 --- a/common.go +++ b/common.go @@ -11,3 +11,9 @@ func checkError(err error, additionalData string, lg *rabbitLogger) { panic(l) } } + +func checkErrorLight(err error, additionalData string, lg *rabbitLogger) { + if err != nil { + lg.warn(fmt.Sprintf("%s: %s", additionalData, err.Error())) + } +} diff --git a/consumer.go b/consumer.go index ac19038..f4acd9d 100644 --- a/consumer.go +++ b/consumer.go @@ -75,32 +75,43 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurr checkError(err, "Error starting the consumer", c.log) } - b := make([]byte, 4) //equals 8 charachters + b := make([]byte, 4) rand.Read(b) s := hex.EncodeToString(b) delivery, err := c.channel.Consume(queue, s, !ack, activePassive, false, false, args) checkError(err, "Error starting the consumer", c.log) + maybeAckMessage := func(m amqp.Delivery) { + if ack { + err := m.Ack(false) + checkErrorLight(err, "Could not ack the message, it will be eventually requeued", c.log) + } + } + + maybeNackMessage := func(m amqp.Delivery) { + if ack { + err := m.Nack(false, true) + checkErrorLight(err, "Could not nack the message, it will be eventually requeued", c.log) + } + } + for i := 0; i < concurrentConsumers; i++ { go func(work <-chan amqp.Delivery) { for w := range work { if !c.handlerExists(w.Type) { - if ack { - w.Ack(false) //TODO error - } + maybeAckMessage(w) return } handler := c.handlers[w.Type] obj, err := deserialize(w.Body, ContentType(w.ContentType), c.types[w.Type]) if err != nil { - //TODO!! + c.log.err(fmt.Sprintf("MessageID=%s, could not deserialize the message, requeueing...", w.MessageId)) + maybeNackMessage(w) } handler(obj) //TODO check error - if ack { - w.Ack(false) //TODO error - } + maybeAckMessage(w) } }(delivery) } From 1fbac13d994bc8f81708f71503f7f493bc004524 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Mon, 15 Jan 2018 14:54:13 +0100 Subject: [PATCH 15/30] bugfix + correlation --- consumer.go | 8 +++++--- producer.go | 21 ++++++++++++--------- types.go | 10 ++++++++++ 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/consumer.go b/consumer.go index f4acd9d..9c27963 100644 --- a/consumer.go +++ b/consumer.go @@ -10,7 +10,7 @@ import ( "github.com/streadway/amqp" ) -type Handler func(message interface{}) +type Handler func(message interface{}) HandlerResponse type IConsumer interface { AddHandler(messageType string, concreteType reflect.Type, handler Handler) @@ -69,6 +69,8 @@ func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, han c.types[messageType] = concreteType } +//StartConsuming will start a new consumer +//concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurrentConsumers int, args map[string]interface{}) string { if c.consumerRunning { err := errors.New("Consumer already running, please configure a new consumer for concurrent processing") @@ -101,7 +103,7 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurr for w := range work { if !c.handlerExists(w.Type) { maybeAckMessage(w) - return + continue } handler := c.handlers[w.Type] @@ -110,7 +112,7 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurr c.log.err(fmt.Sprintf("MessageID=%s, could not deserialize the message, requeueing...", w.MessageId)) maybeNackMessage(w) } - handler(obj) //TODO check error + handler(obj) //TODO implement response maybeAckMessage(w) } }(delivery) diff --git a/producer.go b/producer.go index 5e4cf88..c956236 100644 --- a/producer.go +++ b/producer.go @@ -13,7 +13,7 @@ import ( ) type IProducer interface { - Send(message interface{}, routingKey, messageID, messageType string, header map[string]interface{}, + Send(message interface{}, routingKey, messageID, correlationId, messageType string, header map[string]interface{}, contentType ContentType) error } @@ -27,6 +27,7 @@ type sendMessage struct { responseChannel *chan error producer *producer producerIndex int + correlationId string } type producer struct { @@ -108,13 +109,14 @@ func send(s *sendMessage) { } pErr := s.producer.channels[s.producerIndex].Publish(s.producer.exchangeName, s.routingKey, false, false, amqp.Publishing{ - Headers: s.header, - ContentType: string(s.contentType), - DeliveryMode: uint8(s.producer.deliveryMode), - MessageId: s.messageID, - Timestamp: time.Now().UTC(), - Type: mt, - Body: serialized, + Headers: s.header, + ContentType: string(s.contentType), + DeliveryMode: uint8(s.producer.deliveryMode), + MessageId: s.messageID, + Timestamp: time.Now().UTC(), + Type: mt, + Body: serialized, + CorrelationId: s.correlationId, }) if pErr != nil { @@ -141,7 +143,7 @@ func (p *producer) getNext() int { //Send a message. //messageType: if empty the message type will be reflected from the message -func (p *producer) Send(message interface{}, routingKey, messageID, messageType string, header map[string]interface{}, contentType ContentType) error { +func (p *producer) Send(message interface{}, routingKey, messageID, correlationId, messageType string, header map[string]interface{}, contentType ContentType) error { i := p.getNext() % p.numberOfProducers response := make(chan error, 1) @@ -157,6 +159,7 @@ func (p *producer) Send(message interface{}, routingKey, messageID, messageType producer: p, responseChannel: &response, routingKey: routingKey, + correlationId: correlationId, } p.producers[i] <- s diff --git a/types.go b/types.go index 8dfedcc..b75f77e 100644 --- a/types.go +++ b/types.go @@ -37,3 +37,13 @@ const ( Info Debug ) + +type HandlerResponse int + +const ( + Completed HandlerResponse = iota + RequeueInHead + RequeueInTail + RejectMessage + Retry +) From ef722252a67d84d26d209e21cbd25a18f2cf87a3 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Mon, 15 Jan 2018 14:56:38 +0100 Subject: [PATCH 16/30] make log better --- consumer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consumer.go b/consumer.go index 9c27963..914b8f1 100644 --- a/consumer.go +++ b/consumer.go @@ -87,14 +87,14 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurr maybeAckMessage := func(m amqp.Delivery) { if ack { err := m.Ack(false) - checkErrorLight(err, "Could not ack the message, it will be eventually requeued", c.log) + checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not ack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), c.log) } } maybeNackMessage := func(m amqp.Delivery) { if ack { err := m.Nack(false, true) - checkErrorLight(err, "Could not nack the message, it will be eventually requeued", c.log) + checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not nack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), c.log) } } @@ -109,7 +109,7 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurr handler := c.handlers[w.Type] obj, err := deserialize(w.Body, ContentType(w.ContentType), c.types[w.Type]) if err != nil { - c.log.err(fmt.Sprintf("MessageID=%s, could not deserialize the message, requeueing...", w.MessageId)) + c.log.err(fmt.Sprintf("MessageID=%s, CorrelationId=%s, could not deserialize the message, requeueing...", w.MessageId, w.CorrelationId)) maybeNackMessage(w) } handler(obj) //TODO implement response From a3f084345c0a19e14acbf2f2e1c827bae461ad67 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Mon, 15 Jan 2018 15:59:31 +0100 Subject: [PATCH 17/30] default serializer --- serializer.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/serializer.go b/serializer.go index be745ae..132bb78 100644 --- a/serializer.go +++ b/serializer.go @@ -2,23 +2,25 @@ package rabbit import ( "encoding/json" - "errors" "reflect" ) func serialize(message interface{}, contentType ContentType) ([]byte, error) { - switch contentType { - case Json: + serializeJson := func() ([]byte, error) { serialized, err := json.Marshal(message) return serialized, err - default: - return nil, errors.New("unmapped content type") } -} -func deserialize(message []byte, contentType ContentType, concreteType reflect.Type) (interface{}, error) { switch contentType { case Json: + return serializeJson() + default: //use json + return serializeJson() + } +} + +func deserialize(message []byte, contentType ContentType, concreteType reflect.Type) (interface{}, error) { + deserializeJson := func() (interface{}, error) { pointer := reflect.New(concreteType).Interface() err := json.Unmarshal(message, &pointer) if err != nil { @@ -26,7 +28,12 @@ func deserialize(message []byte, contentType ContentType, concreteType reflect.T } noPointer := reflect.Indirect(reflect.ValueOf(pointer)).Interface() return noPointer, nil + } + + switch contentType { + case Json: + return deserializeJson() default: - return nil, errors.New("unmapped content type") + return deserializeJson() } } From 1da3aada8f1d4baf5c5e1c4a00b0ce0be23d0936 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Mon, 15 Jan 2018 23:46:18 +0100 Subject: [PATCH 18/30] echange bind --- topology.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/topology.go b/topology.go index 914ae2b..78b7288 100644 --- a/topology.go +++ b/topology.go @@ -11,25 +11,31 @@ type topology struct { func (r *rabbit) topologyConfiguration() *topology { channel, err := r.connection.Channel() - checkError(err, "error creating topology channel", r.log) + checkError(err, "Error creating topology channel", r.log) return &topology{channel, r.log} } func (t *topology) DeclareExchange(name, kind string, durable, autoDelete, internal bool, args map[string]interface{}) *topology { err := t.channel.ExchangeDeclare(name, kind, durable, autoDelete, internal, false, args) - checkError(err, "error creating exchange", t.log) + checkError(err, "Error creating exchange", t.log) return t } func (t *topology) DeclareQueue(name string, durable, autoDelete, exclusive bool, args map[string]interface{}) *topology { _, err := t.channel.QueueDeclare(name, durable, autoDelete, exclusive, false, args) - checkError(err, "error creating queue", t.log) + checkError(err, "Error creating queue", t.log) return t } func (t *topology) BindQueue(name, routingKey, exchangeName string, args map[string]interface{}) *topology { err := t.channel.QueueBind(name, routingKey, exchangeName, false, args) - checkError(err, "error creating the queue bind", t.log) + checkError(err, "Error creating the queue bind", t.log) + return t +} + +func (t *topology) BindExchange(source, destination, routingKey string, args map[string]interface{}) *topology { + err := t.channel.ExchangeBind(destination, routingKey, source, false, args) + checkError(err, "Error creating the exchange bind", t.log) return t } From 9a0cf2ccebb4d10a6426b834b1d4cdf218358311 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Tue, 16 Jan 2018 00:54:59 +0100 Subject: [PATCH 19/30] active passive --- common.go | 18 ++++++++++++++++++ consumer.go | 38 +++++++++++++++++++++++++++++--------- executeOnce.go | 12 ++++++++++++ topology.go | 2 ++ 4 files changed, 61 insertions(+), 9 deletions(-) create mode 100644 executeOnce.go diff --git a/common.go b/common.go index 3bec9d1..da0f239 100644 --- a/common.go +++ b/common.go @@ -1,9 +1,21 @@ package rabbit import ( + "crypto/rand" + "encoding/hex" "fmt" ) +type Queue struct { + name string + durable bool + autoDelete bool + exclusive bool + args map[string]interface{} +} + +var Queues map[string]Queue + func checkError(err error, additionalData string, lg *rabbitLogger) { if err != nil { l := fmt.Sprintf("%s: %s", additionalData, err.Error()) @@ -17,3 +29,9 @@ func checkErrorLight(err error, additionalData string, lg *rabbitLogger) { lg.warn(fmt.Sprintf("%s: %s", additionalData, err.Error())) } } + +func getUniqueId() string { + b := make([]byte, 4) + rand.Read(b) + return hex.EncodeToString(b) +} diff --git a/consumer.go b/consumer.go index 914b8f1..8c913ad 100644 --- a/consumer.go +++ b/consumer.go @@ -1,11 +1,10 @@ package rabbit import ( - "crypto/rand" - "encoding/hex" "errors" "fmt" "reflect" + "time" "github.com/streadway/amqp" ) @@ -14,7 +13,8 @@ type Handler func(message interface{}) HandlerResponse type IConsumer interface { AddHandler(messageType string, concreteType reflect.Type, handler Handler) - StartConsuming(queue string, ack, activePassive bool, concurrentConsumers int, args map[string]interface{}) string + StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, + concurrentConsumers int, args map[string]interface{}) string } type consumer struct { @@ -71,19 +71,38 @@ func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, han //StartConsuming will start a new consumer //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel -func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurrentConsumers int, args map[string]interface{}) string { +func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, + concurrentConsumers int, args map[string]interface{}) string { if c.consumerRunning { err := errors.New("Consumer already running, please configure a new consumer for concurrent processing") checkError(err, "Error starting the consumer", c.log) } - b := make([]byte, 4) - rand.Read(b) - s := hex.EncodeToString(b) + if activePassive { + logOnce := executeOnce{} + for { + qInfo := Queues[queue] + q, err := c.channel.QueueDeclarePassive(qInfo.name, qInfo.durable, qInfo.autoDelete, qInfo.exclusive, + false, qInfo.args) + checkError(err, "Error declaring queue passive", c.log) + if q.Consumers == 0 { + break + } else { + logOnce.MaybeExecute(func() { c.log.info(fmt.Sprintf("Consumer passive on queue %s", queue)) }) + time.Sleep(activePassiveRetryInterval) + } + } + } - delivery, err := c.channel.Consume(queue, s, !ack, activePassive, false, false, args) + consumerId := getUniqueId() + + delivery, err := c.channel.Consume(queue, consumerId, !ack, activePassive, false, false, args) checkError(err, "Error starting the consumer", c.log) + if activePassive { + c.log.info(fmt.Sprintf("Consumer active on queue %s", queue)) + } + maybeAckMessage := func(m amqp.Delivery) { if ack { err := m.Ack(false) @@ -118,5 +137,6 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, concurr }(delivery) } - return s + c.consumerRunning = true + return consumerId } diff --git a/executeOnce.go b/executeOnce.go new file mode 100644 index 0000000..50c5501 --- /dev/null +++ b/executeOnce.go @@ -0,0 +1,12 @@ +package rabbit + +type executeOnce struct { + executed bool +} + +func (e *executeOnce) MaybeExecute(f func()) { + if !e.executed { + f() + e.executed = true + } +} diff --git a/topology.go b/topology.go index 78b7288..39d069f 100644 --- a/topology.go +++ b/topology.go @@ -12,6 +12,7 @@ type topology struct { func (r *rabbit) topologyConfiguration() *topology { channel, err := r.connection.Channel() checkError(err, "Error creating topology channel", r.log) + Queues = make(map[string]Queue) return &topology{channel, r.log} } @@ -24,6 +25,7 @@ func (t *topology) DeclareExchange(name, kind string, durable, autoDelete, inter func (t *topology) DeclareQueue(name string, durable, autoDelete, exclusive bool, args map[string]interface{}) *topology { _, err := t.channel.QueueDeclare(name, durable, autoDelete, exclusive, false, args) checkError(err, "Error creating queue", t.log) + Queues[name] = Queue{name, durable, autoDelete, exclusive, args} return t } From 506bbfa37d40ec6b71b9a0128238f07cd79cc6ff Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Tue, 16 Jan 2018 20:35:08 +0100 Subject: [PATCH 20/30] make it better --- consumer.go | 37 ++++++++++++++----------------------- deliveryEx.go | 25 +++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 23 deletions(-) create mode 100644 deliveryEx.go diff --git a/consumer.go b/consumer.go index 8c913ad..92c87c1 100644 --- a/consumer.go +++ b/consumer.go @@ -69,6 +69,18 @@ func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, han c.types[messageType] = concreteType } +func (c *consumer) handle(w amqp.Delivery, ack bool) { + handler := c.handlers[w.Type] + obj, err := deserialize(w.Body, ContentType(w.ContentType), c.types[w.Type]) + if err != nil { + c.log.err(fmt.Sprintf("MessageID=%s, CorrelationId=%s, could not deserialize the message, requeueing...", w.MessageId, w.CorrelationId)) + (&envelope{&w}).maybeNackMessage(ack, c.log) + } + + handler(obj) //TODO + (&envelope{&w}).maybeAckMessage(ack, c.log) +} + //StartConsuming will start a new consumer //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, @@ -103,36 +115,15 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP c.log.info(fmt.Sprintf("Consumer active on queue %s", queue)) } - maybeAckMessage := func(m amqp.Delivery) { - if ack { - err := m.Ack(false) - checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not ack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), c.log) - } - } - - maybeNackMessage := func(m amqp.Delivery) { - if ack { - err := m.Nack(false, true) - checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not nack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), c.log) - } - } - for i := 0; i < concurrentConsumers; i++ { go func(work <-chan amqp.Delivery) { for w := range work { if !c.handlerExists(w.Type) { - maybeAckMessage(w) + (&envelope{&w}).maybeAckMessage(ack, c.log) continue } - handler := c.handlers[w.Type] - obj, err := deserialize(w.Body, ContentType(w.ContentType), c.types[w.Type]) - if err != nil { - c.log.err(fmt.Sprintf("MessageID=%s, CorrelationId=%s, could not deserialize the message, requeueing...", w.MessageId, w.CorrelationId)) - maybeNackMessage(w) - } - handler(obj) //TODO implement response - maybeAckMessage(w) + c.handle(w, ack) } }(delivery) } diff --git a/deliveryEx.go b/deliveryEx.go new file mode 100644 index 0000000..f327ff3 --- /dev/null +++ b/deliveryEx.go @@ -0,0 +1,25 @@ +package rabbit + +import ( + "fmt" + + "github.com/streadway/amqp" +) + +type envelope struct { + *amqp.Delivery +} + +func (m *envelope) maybeAckMessage(ack bool, log *rabbitLogger) { + if ack { + err := m.Ack(false) + checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not ack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), log) + } +} + +func (m *envelope) maybeNackMessage(ack bool, log *rabbitLogger) { + if ack { + err := m.Nack(false, true) + checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not nack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), log) + } +} From 5546ddb6b9fe4fb9759695ec8836187d94cf1bde Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Tue, 16 Jan 2018 22:10:45 +0100 Subject: [PATCH 21/30] return response --- common.go | 4 ++- connectionManager.go | 12 +++---- consumer.go | 79 +++++++++++++++++++++++++++++++++++--------- deliveryEx.go | 9 ++++- init.go | 4 +-- producer.go | 21 ++++++------ types.go | 22 ++++++------ 7 files changed, 103 insertions(+), 48 deletions(-) diff --git a/common.go b/common.go index da0f239..fb292e6 100644 --- a/common.go +++ b/common.go @@ -26,7 +26,9 @@ func checkError(err error, additionalData string, lg *rabbitLogger) { func checkErrorLight(err error, additionalData string, lg *rabbitLogger) { if err != nil { - lg.warn(fmt.Sprintf("%s: %s", additionalData, err.Error())) + if lg.logLevel >= Warn { + lg.warn(fmt.Sprintf("%s: %s", additionalData, err.Error())) + } } } diff --git a/connectionManager.go b/connectionManager.go index 90dac2a..61c3cb6 100644 --- a/connectionManager.go +++ b/connectionManager.go @@ -6,18 +6,16 @@ import "fmt" type rabbit struct { connection *amqp.Connection log *rabbitLogger - logLevel LogLevel } -func initialize(endpoint string, log *rabbitLogger, logLevel LogLevel) rabbit { +func initialize(endpoint string, log *rabbitLogger) rabbit { conn, err := amqp.Dial(endpoint) checkError(err, "error during connection", log) go func() { ch := make(chan *amqp.Error) conn.NotifyClose(ch) err := <-ch - log.err(fmt.Sprintf("Connection lost - Error=%s", err.Error())) - panic("connection lost") + checkError(err, "Connection lost!", log) }() go func() { @@ -25,11 +23,13 @@ func initialize(endpoint string, log *rabbitLogger, logLevel LogLevel) rabbit { conn.NotifyBlocked(ch) for { status := <-ch - log.warn(fmt.Sprintf("connection blocked detected - block enabled: %t, reason: %s", status.Active, status.Reason)) + if log.logLevel >= Warn { + log.warn(fmt.Sprintf("connection blocked detected - block enabled: %t, reason: %s", status.Active, status.Reason)) + } } }() - return rabbit{conn, log, logLevel} + return rabbit{conn, log} } func (r *rabbit) close() { diff --git a/consumer.go b/consumer.go index 92c87c1..450cdd5 100644 --- a/consumer.go +++ b/consumer.go @@ -14,7 +14,7 @@ type Handler func(message interface{}) HandlerResponse type IConsumer interface { AddHandler(messageType string, concreteType reflect.Type, handler Handler) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, - concurrentConsumers int, args map[string]interface{}) string + concurrentConsumers, retryTimesOnError int, args map[string]interface{}) string } type consumer struct { @@ -36,8 +36,7 @@ func (r *rabbit) configureConsumer(prefetch int) IConsumer { ch := make(chan *amqp.Error) channel.NotifyClose(ch) err := <-ch - r.log.err(fmt.Sprintf("Channel closed - Error=%s", err.Error())) - panic("Channel closed") + checkError(err, "Channel closed", r.log) }() go func() { @@ -45,7 +44,9 @@ func (r *rabbit) configureConsumer(prefetch int) IConsumer { channel.NotifyFlow(ch) for { status := <-ch - r.log.warn(fmt.Sprintf("channel flow detected - flow enabled: %t", status)) + if r.log.logLevel >= Warn { + r.log.warn(fmt.Sprintf("channel flow detected - flow enabled: %t", status)) + } } }() @@ -61,30 +62,72 @@ func (c *consumer) handlerExists(messageType string) bool { func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, handler Handler) { if c.handlerExists(messageType) { - err := fmt.Sprintf("messageType %s already mapped", messageType) - c.log.err(err) - panic(err) + err := fmt.Errorf("messageType %s already mapped", messageType) + checkError(err, "", c.log) } c.handlers[messageType] = handler c.types[messageType] = concreteType } -func (c *consumer) handle(w amqp.Delivery, ack bool) { +func (c *consumer) handle(w amqp.Delivery, ack bool, retried, maxRetry int) { + if w.Redelivered { + if c.log.logLevel >= Info { + c.log.info(fmt.Sprintf("MessageID=%s, CorrelationId=%s, has been redelivered", + w.MessageId, w.CorrelationId)) + } + } handler := c.handlers[w.Type] obj, err := deserialize(w.Body, ContentType(w.ContentType), c.types[w.Type]) if err != nil { - c.log.err(fmt.Sprintf("MessageID=%s, CorrelationId=%s, could not deserialize the message, requeueing...", w.MessageId, w.CorrelationId)) - (&envelope{&w}).maybeNackMessage(ack, c.log) + if c.log.logLevel >= Error { + c.log.err(fmt.Sprintf("MessageID=%s, CorrelationId=%s, could not deserialize the message, requeueing...", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeRequeueMessage(ack, c.log) } - handler(obj) //TODO - (&envelope{&w}).maybeAckMessage(ack, c.log) + response := handler(obj) + switch response { + case Completed: + if c.log.logLevel >= Debug { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, completed.", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeAckMessage(ack, c.log) + case Requeue: + if c.log.logLevel >= Debug { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, requeueing message...", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeRequeueMessage(ack, c.log) + case Reject: + if c.log.logLevel >= Info { + c.log.info(fmt.Sprintf("MessageId=%s, CorrelationId=%s, rejecting message...", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeRejectMessage(ack, c.log) + case Err: + if retried > maxRetry-1 { + if c.log.logLevel >= Warn { + c.log.warn(fmt.Sprintf("MessageId=%s, CorrelationId=%s, max retry reached, rejecting message...", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeRejectMessage(ack, c.log) + } else { + time.Sleep(time.Duration(retried*200) * time.Microsecond) + if c.log.logLevel >= Debug { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, retry=%d times, retrying due to error...", + w.MessageId, w.CorrelationId, retried)) + } + c.handle(w, ack, retried+1, maxRetry) + } + } } //StartConsuming will start a new consumer //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, - concurrentConsumers int, args map[string]interface{}) string { + concurrentConsumers, retryTimesOnError int, args map[string]interface{}) string { if c.consumerRunning { err := errors.New("Consumer already running, please configure a new consumer for concurrent processing") checkError(err, "Error starting the consumer", c.log) @@ -100,7 +143,9 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP if q.Consumers == 0 { break } else { - logOnce.MaybeExecute(func() { c.log.info(fmt.Sprintf("Consumer passive on queue %s", queue)) }) + if c.log.logLevel >= Info { + logOnce.MaybeExecute(func() { c.log.info(fmt.Sprintf("Consumer passive on queue %s", queue)) }) + } time.Sleep(activePassiveRetryInterval) } } @@ -112,7 +157,9 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP checkError(err, "Error starting the consumer", c.log) if activePassive { - c.log.info(fmt.Sprintf("Consumer active on queue %s", queue)) + if c.log.logLevel >= Info { + c.log.info(fmt.Sprintf("Consumer active on queue %s", queue)) + } } for i := 0; i < concurrentConsumers; i++ { @@ -123,7 +170,7 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP continue } - c.handle(w, ack) + c.handle(w, ack, 0, retryTimesOnError) } }(delivery) } diff --git a/deliveryEx.go b/deliveryEx.go index f327ff3..c5a30b8 100644 --- a/deliveryEx.go +++ b/deliveryEx.go @@ -17,9 +17,16 @@ func (m *envelope) maybeAckMessage(ack bool, log *rabbitLogger) { } } -func (m *envelope) maybeNackMessage(ack bool, log *rabbitLogger) { +func (m *envelope) maybeRequeueMessage(ack bool, log *rabbitLogger) { if ack { err := m.Nack(false, true) checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not nack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), log) } } + +func (m *envelope) maybeRejectMessage(ack bool, log *rabbitLogger) { + if ack { + err := m.Nack(false, false) + checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not nack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), log) + } +} diff --git a/init.go b/init.go index ec36409..1338919 100644 --- a/init.go +++ b/init.go @@ -2,8 +2,8 @@ package rabbit var _r rabbit -func Initialize(log *rabbitLogger, endpoint string, logLevel LogLevel) { - _r = initialize(endpoint, log, logLevel) +func Initialize(log *rabbitLogger, endpoint string) { + _r = initialize(endpoint, log) } func TopologyConfiguration() *topology { diff --git a/producer.go b/producer.go index c956236..952c41f 100644 --- a/producer.go +++ b/producer.go @@ -37,7 +37,6 @@ type producer struct { exchangeName string deliveryMode DeliveryMode log *rabbitLogger - logLevel LogLevel confirmPublish bool confirms []chan amqp.Confirmation producers []chan sendMessage @@ -46,9 +45,8 @@ type producer struct { func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode, confirmPublish bool) IProducer { if numberOfProducers < 1 { - msg := "numberOfProducers is less than 1" - r.log.err(msg) - panic(msg) + err := errors.New("numberOfProducers is less than 1") + checkError(err, "", r.log) } channels := make([]*amqp.Channel, numberOfProducers, numberOfProducers) @@ -70,8 +68,7 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d ch := make(chan *amqp.Error) channel.NotifyClose(ch) err := <-ch - r.log.err(fmt.Sprintf("Channel closed - Error=%s", err.Error())) - panic("Channel closed") + checkError(err, "Channel closed!", r.log) }() go func() { @@ -79,7 +76,9 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d channel.NotifyFlow(ch) for { status := <-ch - r.log.warn(fmt.Sprintf("channel flow detected - flow enabled: %t", status)) + if r.log.logLevel >= Warn { + r.log.warn(fmt.Sprintf("channel flow detected - flow enabled: %t", status)) + } } }() @@ -87,16 +86,16 @@ func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, d go func(i int) { for { s := <-producers[i] - send(&s) + send(&s, r.log.logLevel) } }(i) } - return &producer{numberOfProducers, channels, 0, exchangeName, deliveryMode, r.log, r.logLevel, confirmPublish, + return &producer{numberOfProducers, channels, 0, exchangeName, deliveryMode, r.log, confirmPublish, confirms, producers} } -func send(s *sendMessage) { +func send(s *sendMessage, logLevel LogLevel) { serialized, err := serialize(s.message, s.contentType) checkError(err, "json serializer error", s.producer.log) @@ -104,7 +103,7 @@ func send(s *sendMessage) { WhenValue("", func() interface{} { return reflect.TypeOf(s.message).String() }). ResultOrDefault(s.messageType).(string) - if s.producer.logLevel >= Debug { + if logLevel >= Debug { s.producer.log.debug(fmt.Sprintf("Sending Message %s: %s", mt, serialized)) } diff --git a/types.go b/types.go index b75f77e..60c53fb 100644 --- a/types.go +++ b/types.go @@ -3,15 +3,16 @@ package rabbit type logger func(string) type rabbitLogger struct { - fatal logger - err logger - info logger - warn logger - debug logger + fatal logger + err logger + info logger + warn logger + debug logger + logLevel LogLevel } -func CreateLogger(debug, info, warn, err, fatal logger) *rabbitLogger { - return &rabbitLogger{fatal, err, info, warn, debug} +func CreateLogger(debug, info, warn, err, fatal logger, logLevel LogLevel) *rabbitLogger { + return &rabbitLogger{fatal, err, info, warn, debug, logLevel} } type ContentType string @@ -42,8 +43,7 @@ type HandlerResponse int const ( Completed HandlerResponse = iota - RequeueInHead - RequeueInTail - RejectMessage - Retry + Requeue + Reject + Err ) From 2077c09622e42a8c83ca5f3bcff4929551918820 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Thu, 18 Jan 2018 01:44:56 +0100 Subject: [PATCH 22/30] partitioner --- consumer.go | 119 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 107 insertions(+), 12 deletions(-) diff --git a/consumer.go b/consumer.go index 450cdd5..0036af7 100644 --- a/consumer.go +++ b/consumer.go @@ -4,8 +4,10 @@ import ( "errors" "fmt" "reflect" + "sync/atomic" "time" + "github.com/jd78/partitioner" "github.com/streadway/amqp" ) @@ -15,6 +17,9 @@ type IConsumer interface { AddHandler(messageType string, concreteType reflect.Type, handler Handler) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, concurrentConsumers, retryTimesOnError int, args map[string]interface{}) string + StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, + concurrentConsumers, retryTimesOnError, partitions, maxWaitingTimeRetryOnPartitionFailMilliseconds int, + partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string } type consumer struct { @@ -25,6 +30,22 @@ type consumer struct { consumerRunning bool } +var roundrobin int64 + +type partition struct { + message interface{} + partitionResolver map[reflect.Type]func(message interface{}) int64 +} + +func (p partition) GetPartition() int64 { + t := reflect.TypeOf(p.message) + if _, exists := p.partitionResolver[t]; exists { + return p.partitionResolver[t](p.message) + } + + return atomic.AddInt64(&roundrobin, 1) +} + func (r *rabbit) configureConsumer(prefetch int) IConsumer { channel, err := r.connection.Channel() checkError(err, "Error creating the producing channel", r.log) @@ -69,7 +90,7 @@ func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, han c.types[messageType] = concreteType } -func (c *consumer) handle(w amqp.Delivery, ack bool, retried, maxRetry int) { +func (c *consumer) handle(w amqp.Delivery, message interface{}, ack bool, retried, maxRetry int) { if w.Redelivered { if c.log.logLevel >= Info { c.log.info(fmt.Sprintf("MessageID=%s, CorrelationId=%s, has been redelivered", @@ -77,16 +98,8 @@ func (c *consumer) handle(w amqp.Delivery, ack bool, retried, maxRetry int) { } } handler := c.handlers[w.Type] - obj, err := deserialize(w.Body, ContentType(w.ContentType), c.types[w.Type]) - if err != nil { - if c.log.logLevel >= Error { - c.log.err(fmt.Sprintf("MessageID=%s, CorrelationId=%s, could not deserialize the message, requeueing...", - w.MessageId, w.CorrelationId)) - } - (&envelope{&w}).maybeRequeueMessage(ack, c.log) - } - response := handler(obj) + response := handler(message) switch response { case Completed: if c.log.logLevel >= Debug { @@ -119,11 +132,22 @@ func (c *consumer) handle(w amqp.Delivery, ack bool, retried, maxRetry int) { c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, retry=%d times, retrying due to error...", w.MessageId, w.CorrelationId, retried)) } - c.handle(w, ack, retried+1, maxRetry) + c.handle(w, message, ack, retried+1, maxRetry) } } } +func (c *consumer) deserializeMessage(w amqp.Delivery) (interface{}, error) { + obj, err := deserialize(w.Body, ContentType(w.ContentType), c.types[w.Type]) + if err != nil { + if c.log.logLevel >= Error { + c.log.err(fmt.Sprintf("MessageID=%s, CorrelationId=%s, could not deserialize the message, requeueing...", + w.MessageId, w.CorrelationId)) + } + } + return obj, err +} + //StartConsuming will start a new consumer //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, @@ -170,7 +194,13 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP continue } - c.handle(w, ack, 0, retryTimesOnError) + message, err := c.deserializeMessage(w) + if err != nil { + (&envelope{&w}).maybeRequeueMessage(ack, c.log) + continue + } + + c.handle(w, message, ack, 0, retryTimesOnError) } }(delivery) } @@ -178,3 +208,68 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP c.consumerRunning = true return consumerId } + +//StartConsuming will start a new consumer +//concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel +func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, + concurrentConsumers, retryTimesOnError, partitions, maxWaitingTimeRetryOnPartitionFailMilliseconds int, + partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string { + if c.consumerRunning { + err := errors.New("Consumer already running, please configure a new consumer for concurrent processing") + checkError(err, "Error starting the consumer", c.log) + } + + if activePassive { + logOnce := executeOnce{} + for { + qInfo := Queues[queue] + q, err := c.channel.QueueDeclarePassive(qInfo.name, qInfo.durable, qInfo.autoDelete, qInfo.exclusive, + false, qInfo.args) + checkError(err, "Error declaring queue passive", c.log) + if q.Consumers == 0 { + break + } else { + if c.log.logLevel >= Info { + logOnce.MaybeExecute(func() { c.log.info(fmt.Sprintf("Consumer passive on queue %s", queue)) }) + } + time.Sleep(activePassiveRetryInterval) + } + } + } + + consumerId := getUniqueId() + + delivery, err := c.channel.Consume(queue, consumerId, !ack, activePassive, false, false, args) + checkError(err, "Error starting the consumer", c.log) + + if activePassive { + if c.log.logLevel >= Info { + c.log.info(fmt.Sprintf("Consumer active on queue %s", queue)) + } + } + + part := partitioner.CreatePartitioner(partitions, maxWaitingTimeRetryOnPartitionFailMilliseconds) + + go func(work <-chan amqp.Delivery) { + for w := range work { + if !c.handlerExists(w.Type) { + (&envelope{&w}).maybeAckMessage(ack, c.log) + continue + } + + message, err := c.deserializeMessage(w) + if err != nil { + (&envelope{&w}).maybeRequeueMessage(ack, c.log) + continue + } + + part.HandleInSequence(func(done chan bool) { + c.handle(w, message, ack, 0, retryTimesOnError) + done <- true + }, partition{message, partitionResolver}) + } + }(delivery) + + c.consumerRunning = true + return consumerId +} From 0a47567b3633dd472556865ed888bd9eebfe321d Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Thu, 18 Jan 2018 02:23:18 +0100 Subject: [PATCH 23/30] bugfix --- consumer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index 0036af7..cdb4d60 100644 --- a/consumer.go +++ b/consumer.go @@ -263,8 +263,9 @@ func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive boo continue } + cw := w part.HandleInSequence(func(done chan bool) { - c.handle(w, message, ack, 0, retryTimesOnError) + c.handle(cw, message, ack, 0, retryTimesOnError) done <- true }, partition{message, partitionResolver}) } From b30493d660c8ad30fdd8070a8f6da840f2c2217e Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Fri, 19 Jan 2018 23:40:59 +0100 Subject: [PATCH 24/30] error handling --- consumer.go | 52 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/consumer.go b/consumer.go index cdb4d60..c3f23e6 100644 --- a/consumer.go +++ b/consumer.go @@ -15,10 +15,10 @@ type Handler func(message interface{}) HandlerResponse type IConsumer interface { AddHandler(messageType string, concreteType reflect.Type, handler Handler) - StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, + StartConsuming(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, requeueTimeIntervalOnError time.Duration, concurrentConsumers, retryTimesOnError int, args map[string]interface{}) string - StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, - concurrentConsumers, retryTimesOnError, partitions, maxWaitingTimeRetryOnPartitionFailMilliseconds int, + StartConsumingPartitions(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, requeueTimeIntervalOnError, + maxWaitingTimeRetryIntervalOnPartitionError time.Duration, retryTimesOnError, partitions int, partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string } @@ -90,7 +90,8 @@ func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, han c.types[messageType] = concreteType } -func (c *consumer) handle(w amqp.Delivery, message interface{}, ack bool, retried, maxRetry int) { +func (c *consumer) handle(w amqp.Delivery, message interface{}, ack, enableRetries bool, retried, maxRetry int, + requeueTimeMillisecondsOnError time.Duration) { if w.Redelivered { if c.log.logLevel >= Info { c.log.info(fmt.Sprintf("MessageID=%s, CorrelationId=%s, has been redelivered", @@ -120,19 +121,26 @@ func (c *consumer) handle(w amqp.Delivery, message interface{}, ack bool, retrie } (&envelope{&w}).maybeRejectMessage(ack, c.log) case Err: - if retried > maxRetry-1 { - if c.log.logLevel >= Warn { - c.log.warn(fmt.Sprintf("MessageId=%s, CorrelationId=%s, max retry reached, rejecting message...", - w.MessageId, w.CorrelationId)) + if enableRetries { + if retried > maxRetry-1 { + if c.log.logLevel >= Warn { + c.log.warn(fmt.Sprintf("MessageId=%s, CorrelationId=%s, max retry reached, rejecting message...", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeRejectMessage(ack, c.log) + } else { + time.Sleep(time.Duration(retried*200) * time.Microsecond) + if c.log.logLevel >= Debug { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, retry=%d times, retrying due to error...", + w.MessageId, w.CorrelationId, retried)) + } + c.handle(w, message, ack, enableRetries, retried+1, maxRetry, requeueTimeMillisecondsOnError) } - (&envelope{&w}).maybeRejectMessage(ack, c.log) } else { - time.Sleep(time.Duration(retried*200) * time.Microsecond) - if c.log.logLevel >= Debug { - c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, retry=%d times, retrying due to error...", - w.MessageId, w.CorrelationId, retried)) - } - c.handle(w, message, ack, retried+1, maxRetry) + go func() { + time.Sleep(requeueTimeMillisecondsOnError) + (&envelope{&w}).maybeRequeueMessage(ack, c.log) + }() } } } @@ -150,7 +158,7 @@ func (c *consumer) deserializeMessage(w amqp.Delivery) (interface{}, error) { //StartConsuming will start a new consumer //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel -func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, +func (c *consumer) StartConsuming(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, requeueTimeIntervalOnError time.Duration, concurrentConsumers, retryTimesOnError int, args map[string]interface{}) string { if c.consumerRunning { err := errors.New("Consumer already running, please configure a new consumer for concurrent processing") @@ -200,7 +208,7 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP continue } - c.handle(w, message, ack, 0, retryTimesOnError) + c.handle(w, message, ack, enableRetries, 0, retryTimesOnError, requeueTimeIntervalOnError) } }(delivery) } @@ -211,9 +219,9 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP //StartConsuming will start a new consumer //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel -func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, - concurrentConsumers, retryTimesOnError, partitions, maxWaitingTimeRetryOnPartitionFailMilliseconds int, - partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string { +func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, + requeueTimeIntervalOnError, maxWaitingTimeRetryIntervalOnPartitionError time.Duration, + retryTimesOnError, partitions int, partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string { if c.consumerRunning { err := errors.New("Consumer already running, please configure a new consumer for concurrent processing") checkError(err, "Error starting the consumer", c.log) @@ -248,7 +256,7 @@ func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive boo } } - part := partitioner.CreatePartitioner(partitions, maxWaitingTimeRetryOnPartitionFailMilliseconds) + part := partitioner.CreatePartitioner(partitions, maxWaitingTimeRetryIntervalOnPartitionError) go func(work <-chan amqp.Delivery) { for w := range work { @@ -265,7 +273,7 @@ func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive boo cw := w part.HandleInSequence(func(done chan bool) { - c.handle(cw, message, ack, 0, retryTimesOnError) + c.handle(cw, message, ack, enableRetries, 0, retryTimesOnError, requeueTimeIntervalOnError) done <- true }, partition{message, partitionResolver}) } From fcd76838c429f5833127e5a096cd4e815232223a Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Fri, 19 Jan 2018 23:57:51 +0100 Subject: [PATCH 25/30] minor --- consumer.go | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/consumer.go b/consumer.go index c3f23e6..e2fb895 100644 --- a/consumer.go +++ b/consumer.go @@ -27,7 +27,7 @@ type consumer struct { types map[string]reflect.Type log *rabbitLogger channel *amqp.Channel - consumerRunning bool + consumerRunning map[string]bool } var roundrobin int64 @@ -73,7 +73,7 @@ func (r *rabbit) configureConsumer(prefetch int) IConsumer { h := make(map[string]Handler) t := make(map[string]reflect.Type) - return &consumer{h, t, r.log, channel, false} + return &consumer{h, t, r.log, channel, make(map[string]bool)} } func (c *consumer) handlerExists(messageType string) bool { @@ -158,10 +158,19 @@ func (c *consumer) deserializeMessage(w amqp.Delivery) (interface{}, error) { //StartConsuming will start a new consumer //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel +//queue: queue name +//ack: true enables ack +//enableReties: enable message retries in case of handler error (will preserve the order) +//activePassive: enables acrive and sleepy passive consumers +//activePassiveRetryInterval: time interval checking if the queue has a consumer +//requeueTimeIntervalOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false +//concurrentConsumers: number of consumers +//retryTimesOnError: number of retries before discarding a message. Takes effect if enableRetries is true +//args: consumer args func (c *consumer) StartConsuming(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, requeueTimeIntervalOnError time.Duration, concurrentConsumers, retryTimesOnError int, args map[string]interface{}) string { - if c.consumerRunning { - err := errors.New("Consumer already running, please configure a new consumer for concurrent processing") + if _, exists := c.consumerRunning[queue]; exists { + err := errors.New("Consumer already running, please configure a new consumer for concurrent processing or set the concurrentConsumers") checkError(err, "Error starting the consumer", c.log) } @@ -213,17 +222,29 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive, enableRetrie }(delivery) } - c.consumerRunning = true + c.consumerRunning[queue] = true return consumerId } //StartConsuming will start a new consumer //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel +//queue: queue name +//ack: true enables ack +//enableReties: enable message retries in case of handler error (will preserve the order) +//activePassive: enables acrive and sleepy passive consumers +//activePassiveRetryInterval: time interval checking if the queue has a consumer +//requeueTimeIntervalOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false +//maxWaitingTimeRetryIntervalOnPartitionError: Sleep time between retries in case of handler error +//concurrentConsumers: number of consumers +//retryTimesOnError: number of retries before discarding a message. Takes effect if enableRetries is true +//partitions: number of consurrent/consistent partitions +//partitionResolver: map[reflect.Type]func(message interface{}) int64, for each message type specify a function that will return the key used to partition +//args: consumer args func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, requeueTimeIntervalOnError, maxWaitingTimeRetryIntervalOnPartitionError time.Duration, retryTimesOnError, partitions int, partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string { - if c.consumerRunning { - err := errors.New("Consumer already running, please configure a new consumer for concurrent processing") + if _, exists := c.consumerRunning[queue]; exists { + err := errors.New("Consumer already running, please configure a new consumer for concurrent processing or set partitions") checkError(err, "Error starting the consumer", c.log) } @@ -279,6 +300,6 @@ func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive, en } }(delivery) - c.consumerRunning = true + c.consumerRunning[queue] = true return consumerId } From 7049b108ea53dd3cc71d0be9c72c7790ca5dd584 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sat, 20 Jan 2018 19:40:46 +0100 Subject: [PATCH 26/30] retry moved --- consumer.go | 56 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/consumer.go b/consumer.go index e2fb895..12fdb85 100644 --- a/consumer.go +++ b/consumer.go @@ -15,16 +15,18 @@ type Handler func(message interface{}) HandlerResponse type IConsumer interface { AddHandler(messageType string, concreteType reflect.Type, handler Handler) - StartConsuming(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, requeueTimeIntervalOnError time.Duration, - concurrentConsumers, retryTimesOnError int, args map[string]interface{}) string - StartConsumingPartitions(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, requeueTimeIntervalOnError, - maxWaitingTimeRetryIntervalOnPartitionError time.Duration, retryTimesOnError, partitions int, + AddRetryHandler(messageType string, concreteType reflect.Type, handler Handler, maxRetries int) + StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval, requeueTimeIntervalOnError time.Duration, + concurrentConsumers int, args map[string]interface{}) string + StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval, requeueTimeIntervalOnError, + maxWaitingTimeRetryIntervalOnPartitionError time.Duration, partitions int, partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string } type consumer struct { handlers map[string]Handler types map[string]reflect.Type + maxRetries map[string]int log *rabbitLogger channel *amqp.Channel consumerRunning map[string]bool @@ -73,7 +75,8 @@ func (r *rabbit) configureConsumer(prefetch int) IConsumer { h := make(map[string]Handler) t := make(map[string]reflect.Type) - return &consumer{h, t, r.log, channel, make(map[string]bool)} + retries := make(map[string]int) + return &consumer{h, t, retries, r.log, channel, make(map[string]bool)} } func (c *consumer) handlerExists(messageType string) bool { @@ -81,6 +84,11 @@ func (c *consumer) handlerExists(messageType string) bool { return exists } +func (c *consumer) getMaxRetries(messageType string) (bool, int) { + maxRetries, exists := c.maxRetries[messageType] + return exists, maxRetries +} + func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, handler Handler) { if c.handlerExists(messageType) { err := fmt.Errorf("messageType %s already mapped", messageType) @@ -90,7 +98,18 @@ func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, han c.types[messageType] = concreteType } -func (c *consumer) handle(w amqp.Delivery, message interface{}, ack, enableRetries bool, retried, maxRetry int, +//maxRetries: number of retries before discarding a message. Takes effect if enableRetries is true +func (c *consumer) AddRetryHandler(messageType string, concreteType reflect.Type, handler Handler, maxRetries int) { + if c.handlerExists(messageType) { + err := fmt.Errorf("messageType %s already mapped", messageType) + checkError(err, "", c.log) + } + c.handlers[messageType] = handler + c.types[messageType] = concreteType + c.maxRetries[messageType] = maxRetries +} + +func (c *consumer) handle(w amqp.Delivery, message interface{}, ack bool, retried int, requeueTimeMillisecondsOnError time.Duration) { if w.Redelivered { if c.log.logLevel >= Info { @@ -121,20 +140,21 @@ func (c *consumer) handle(w amqp.Delivery, message interface{}, ack, enableRetri } (&envelope{&w}).maybeRejectMessage(ack, c.log) case Err: - if enableRetries { - if retried > maxRetry-1 { + retryEnabled, maxRetries := c.getMaxRetries(w.Type) + if retryEnabled { + if retried > maxRetries-1 { if c.log.logLevel >= Warn { c.log.warn(fmt.Sprintf("MessageId=%s, CorrelationId=%s, max retry reached, rejecting message...", w.MessageId, w.CorrelationId)) } (&envelope{&w}).maybeRejectMessage(ack, c.log) } else { - time.Sleep(time.Duration(retried*200) * time.Microsecond) + time.Sleep(time.Duration(retried*100) * time.Millisecond) if c.log.logLevel >= Debug { c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, retry=%d times, retrying due to error...", w.MessageId, w.CorrelationId, retried)) } - c.handle(w, message, ack, enableRetries, retried+1, maxRetry, requeueTimeMillisecondsOnError) + c.handle(w, message, ack, retried+1, requeueTimeMillisecondsOnError) } } else { go func() { @@ -160,15 +180,13 @@ func (c *consumer) deserializeMessage(w amqp.Delivery) (interface{}, error) { //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel //queue: queue name //ack: true enables ack -//enableReties: enable message retries in case of handler error (will preserve the order) //activePassive: enables acrive and sleepy passive consumers //activePassiveRetryInterval: time interval checking if the queue has a consumer //requeueTimeIntervalOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false //concurrentConsumers: number of consumers -//retryTimesOnError: number of retries before discarding a message. Takes effect if enableRetries is true //args: consumer args -func (c *consumer) StartConsuming(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, requeueTimeIntervalOnError time.Duration, - concurrentConsumers, retryTimesOnError int, args map[string]interface{}) string { +func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval, requeueTimeIntervalOnError time.Duration, + concurrentConsumers int, args map[string]interface{}) string { if _, exists := c.consumerRunning[queue]; exists { err := errors.New("Consumer already running, please configure a new consumer for concurrent processing or set the concurrentConsumers") checkError(err, "Error starting the consumer", c.log) @@ -217,7 +235,7 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive, enableRetrie continue } - c.handle(w, message, ack, enableRetries, 0, retryTimesOnError, requeueTimeIntervalOnError) + c.handle(w, message, ack, 0, requeueTimeIntervalOnError) } }(delivery) } @@ -230,19 +248,17 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive, enableRetrie //concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel //queue: queue name //ack: true enables ack -//enableReties: enable message retries in case of handler error (will preserve the order) //activePassive: enables acrive and sleepy passive consumers //activePassiveRetryInterval: time interval checking if the queue has a consumer //requeueTimeIntervalOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false //maxWaitingTimeRetryIntervalOnPartitionError: Sleep time between retries in case of handler error //concurrentConsumers: number of consumers -//retryTimesOnError: number of retries before discarding a message. Takes effect if enableRetries is true //partitions: number of consurrent/consistent partitions //partitionResolver: map[reflect.Type]func(message interface{}) int64, for each message type specify a function that will return the key used to partition //args: consumer args -func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive, enableRetries bool, activePassiveRetryInterval, +func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval, requeueTimeIntervalOnError, maxWaitingTimeRetryIntervalOnPartitionError time.Duration, - retryTimesOnError, partitions int, partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string { + partitions int, partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string { if _, exists := c.consumerRunning[queue]; exists { err := errors.New("Consumer already running, please configure a new consumer for concurrent processing or set partitions") checkError(err, "Error starting the consumer", c.log) @@ -294,7 +310,7 @@ func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive, en cw := w part.HandleInSequence(func(done chan bool) { - c.handle(cw, message, ack, enableRetries, 0, retryTimesOnError, requeueTimeIntervalOnError) + c.handle(cw, message, ack, 0, requeueTimeIntervalOnError) done <- true }, partition{message, partitionResolver}) } From 432e0ff3f236b063ee0531b413b2d3c9917950fd Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sat, 20 Jan 2018 19:53:07 +0100 Subject: [PATCH 27/30] waiting error moved --- consumer.go | 39 +++++++++++++++++++-------------------- init.go | 9 +++++++-- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/consumer.go b/consumer.go index 12fdb85..db8e992 100644 --- a/consumer.go +++ b/consumer.go @@ -16,20 +16,21 @@ type Handler func(message interface{}) HandlerResponse type IConsumer interface { AddHandler(messageType string, concreteType reflect.Type, handler Handler) AddRetryHandler(messageType string, concreteType reflect.Type, handler Handler, maxRetries int) - StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval, requeueTimeIntervalOnError time.Duration, + StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, concurrentConsumers int, args map[string]interface{}) string - StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval, requeueTimeIntervalOnError, + StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval, maxWaitingTimeRetryIntervalOnPartitionError time.Duration, partitions int, partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string } type consumer struct { - handlers map[string]Handler - types map[string]reflect.Type - maxRetries map[string]int - log *rabbitLogger - channel *amqp.Channel - consumerRunning map[string]bool + handlers map[string]Handler + types map[string]reflect.Type + maxRetries map[string]int + log *rabbitLogger + channel *amqp.Channel + consumerRunning map[string]bool + requeueWaitingTimeOnError time.Duration } var roundrobin int64 @@ -48,7 +49,8 @@ func (p partition) GetPartition() int64 { return atomic.AddInt64(&roundrobin, 1) } -func (r *rabbit) configureConsumer(prefetch int) IConsumer { +//requeueWaitingTimeOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false +func (r *rabbit) configureConsumer(prefetch int, requeueWaitingTimeOnError time.Duration) IConsumer { channel, err := r.connection.Channel() checkError(err, "Error creating the producing channel", r.log) @@ -76,7 +78,7 @@ func (r *rabbit) configureConsumer(prefetch int) IConsumer { h := make(map[string]Handler) t := make(map[string]reflect.Type) retries := make(map[string]int) - return &consumer{h, t, retries, r.log, channel, make(map[string]bool)} + return &consumer{h, t, retries, r.log, channel, make(map[string]bool), requeueWaitingTimeOnError} } func (c *consumer) handlerExists(messageType string) bool { @@ -109,8 +111,7 @@ func (c *consumer) AddRetryHandler(messageType string, concreteType reflect.Type c.maxRetries[messageType] = maxRetries } -func (c *consumer) handle(w amqp.Delivery, message interface{}, ack bool, retried int, - requeueTimeMillisecondsOnError time.Duration) { +func (c *consumer) handle(w amqp.Delivery, message interface{}, ack bool, retried int) { if w.Redelivered { if c.log.logLevel >= Info { c.log.info(fmt.Sprintf("MessageID=%s, CorrelationId=%s, has been redelivered", @@ -154,11 +155,11 @@ func (c *consumer) handle(w amqp.Delivery, message interface{}, ack bool, retrie c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, retry=%d times, retrying due to error...", w.MessageId, w.CorrelationId, retried)) } - c.handle(w, message, ack, retried+1, requeueTimeMillisecondsOnError) + c.handle(w, message, ack, retried+1) } } else { go func() { - time.Sleep(requeueTimeMillisecondsOnError) + time.Sleep(c.requeueWaitingTimeOnError) (&envelope{&w}).maybeRequeueMessage(ack, c.log) }() } @@ -182,10 +183,9 @@ func (c *consumer) deserializeMessage(w amqp.Delivery) (interface{}, error) { //ack: true enables ack //activePassive: enables acrive and sleepy passive consumers //activePassiveRetryInterval: time interval checking if the queue has a consumer -//requeueTimeIntervalOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false //concurrentConsumers: number of consumers //args: consumer args -func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval, requeueTimeIntervalOnError time.Duration, +func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, concurrentConsumers int, args map[string]interface{}) string { if _, exists := c.consumerRunning[queue]; exists { err := errors.New("Consumer already running, please configure a new consumer for concurrent processing or set the concurrentConsumers") @@ -235,7 +235,7 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP continue } - c.handle(w, message, ack, 0, requeueTimeIntervalOnError) + c.handle(w, message, ack, 0) } }(delivery) } @@ -250,14 +250,13 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP //ack: true enables ack //activePassive: enables acrive and sleepy passive consumers //activePassiveRetryInterval: time interval checking if the queue has a consumer -//requeueTimeIntervalOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false //maxWaitingTimeRetryIntervalOnPartitionError: Sleep time between retries in case of handler error //concurrentConsumers: number of consumers //partitions: number of consurrent/consistent partitions //partitionResolver: map[reflect.Type]func(message interface{}) int64, for each message type specify a function that will return the key used to partition //args: consumer args func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval, - requeueTimeIntervalOnError, maxWaitingTimeRetryIntervalOnPartitionError time.Duration, + maxWaitingTimeRetryIntervalOnPartitionError time.Duration, partitions int, partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string { if _, exists := c.consumerRunning[queue]; exists { err := errors.New("Consumer already running, please configure a new consumer for concurrent processing or set partitions") @@ -310,7 +309,7 @@ func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive boo cw := w part.HandleInSequence(func(done chan bool) { - c.handle(cw, message, ack, 0, requeueTimeIntervalOnError) + c.handle(cw, message, ack, 0) done <- true }, partition{message, partitionResolver}) } diff --git a/init.go b/init.go index 1338919..1a83576 100644 --- a/init.go +++ b/init.go @@ -1,5 +1,9 @@ package rabbit +import ( + "time" +) + var _r rabbit func Initialize(log *rabbitLogger, endpoint string) { @@ -14,6 +18,7 @@ func ConfigureProducer(numberOfProducers int, exchangeName string, deliveryMode return _r.configureProducer(numberOfProducers, exchangeName, deliveryMode, confirmPublish) } -func ConfigureConsumer(preferch int) IConsumer { - return _r.configureConsumer(preferch) +//requeueWaitingTimeOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false +func ConfigureConsumer(preferch int, requeueWaitingTimeOnError time.Duration) IConsumer { + return _r.configureConsumer(preferch, requeueWaitingTimeOnError) } From 949fe272588b08e3b9fce1eb8d5641ba0f237d87 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sun, 21 Jan 2018 01:57:25 +0100 Subject: [PATCH 28/30] protobuf support --- consumer.go | 12 ++++++++++++ producer.go | 3 ++- serializer.go | 16 ++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index db8e992..cfdc7f5 100644 --- a/consumer.go +++ b/consumer.go @@ -86,6 +86,14 @@ func (c *consumer) handlerExists(messageType string) bool { return exists } +func (c *consumer) GetHandlerKeys() []string { + keys := make([]string, 0, len(c.handlers)) + for k := range c.handlers { + keys = append(keys, k) + } + return keys +} + func (c *consumer) getMaxRetries(messageType string) (bool, int) { maxRetries, exists := c.maxRetries[messageType] return exists, maxRetries @@ -225,6 +233,8 @@ func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activeP go func(work <-chan amqp.Delivery) { for w := range work { if !c.handlerExists(w.Type) { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, no handler registered for %s, registeredTypes=%v", + w.MessageId, w.CorrelationId, w.Type, c.GetHandlerKeys())) (&envelope{&w}).maybeAckMessage(ack, c.log) continue } @@ -297,6 +307,8 @@ func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive boo go func(work <-chan amqp.Delivery) { for w := range work { if !c.handlerExists(w.Type) { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, no handler registered for %s, registeredTypes=%v", + w.MessageId, w.CorrelationId, w.Type, c.GetHandlerKeys())) (&envelope{&w}).maybeAckMessage(ack, c.log) continue } diff --git a/producer.go b/producer.go index 952c41f..b6c92ae 100644 --- a/producer.go +++ b/producer.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "reflect" + "strings" "sync/atomic" "time" @@ -100,7 +101,7 @@ func send(s *sendMessage, logLevel LogLevel) { checkError(err, "json serializer error", s.producer.log) mt := pattern.ResultMatch(s.messageType). - WhenValue("", func() interface{} { return reflect.TypeOf(s.message).String() }). + WhenValue("", func() interface{} { return strings.Replace(reflect.TypeOf(s.message).String(), "*", "", 1) }). ResultOrDefault(s.messageType).(string) if logLevel >= Debug { diff --git a/serializer.go b/serializer.go index 132bb78..8cc285c 100644 --- a/serializer.go +++ b/serializer.go @@ -3,6 +3,8 @@ package rabbit import ( "encoding/json" "reflect" + + "github.com/golang/protobuf/proto" ) func serialize(message interface{}, contentType ContentType) ([]byte, error) { @@ -14,6 +16,8 @@ func serialize(message interface{}, contentType ContentType) ([]byte, error) { switch contentType { case Json: return serializeJson() + case Protobuf: + return proto.Marshal(message.(proto.Message)) default: //use json return serializeJson() } @@ -30,9 +34,21 @@ func deserialize(message []byte, contentType ContentType, concreteType reflect.T return noPointer, nil } + deserializeProtobuf := func() (interface{}, error) { + pointer := reflect.New(concreteType).Interface().(proto.Message) + err := proto.Unmarshal(message, pointer) + if err != nil { + return nil, err + } + noPointer := reflect.Indirect(reflect.ValueOf(pointer)).Interface() + return noPointer, nil + } + switch contentType { case Json: return deserializeJson() + case Protobuf: + return deserializeProtobuf() default: return deserializeJson() } From 8890afdc356d867cd966329f766c4dbfb2de70d2 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Sun, 21 Jan 2018 21:49:46 +0100 Subject: [PATCH 29/30] inject header in channel --- consumer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index cfdc7f5..606c696 100644 --- a/consumer.go +++ b/consumer.go @@ -11,7 +11,7 @@ import ( "github.com/streadway/amqp" ) -type Handler func(message interface{}) HandlerResponse +type Handler func(message interface{}, header map[string]interface{}) HandlerResponse type IConsumer interface { AddHandler(messageType string, concreteType reflect.Type, handler Handler) @@ -128,7 +128,7 @@ func (c *consumer) handle(w amqp.Delivery, message interface{}, ack bool, retrie } handler := c.handlers[w.Type] - response := handler(message) + response := handler(message, w.Headers) switch response { case Completed: if c.log.logLevel >= Debug { From 7c7f5226eee813fc3adee0691a2768913af055db Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Fri, 2 Feb 2018 22:16:37 +0100 Subject: [PATCH 30/30] readme --- README.md | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..fe29b63 --- /dev/null +++ b/README.md @@ -0,0 +1,133 @@ +# Rebbit + +[![Build Status](https://travis-ci.org/jd78/rabbit.svg?branch=master)](https://travis-ci.org/jd78/rabbit) + +Rabbit is built on top of streadway/amqp RabbitMQ client (https://github.com/streadway/amqp) and brings the following functionalities: + +- Can produce and consume messages in jSON and Protobuf. +- Can produce and consume concurrently, specifying the wanted number. The producer is concurrent round robin, the consumer is a concurrent multichannel. +- Partition consumer, can consume messages in sequences while concurrent. Assigning a partition id to a message, this will be consumed by the same channel, in order. +- Dynamic handlers. +- Message retries. +- Can discard or requeue a message just returning the wanted behaviour from the handler. +- Active and sleepy passive consumer. +- Topology configurator. +- Injection logger. + +#### Logger injection and rabbit initializer + +```go + +import "github.com/jd78/rabbit" + +logger := rabbit.CreateLogger( + func(l string) { log.Println(l) }, //debug + func(l string) { log.Println(l) }, //info + func(l string) { log.Println(l) }, //warn + func(l string) { log.Fatal(l) }, //error + func(l string) { log.Println(l) }, //fata; + rabbit.Debug //logLevel + ) + +rabbit.Initialize(logger, "amqp://username:password@localhost:5672/") +``` + +#### Topology configuration + +It's possible to configure the topology, creating exchnges, queues and bindings. + +```go + +rabbit.TopologyConfiguration(). + DeclareExchange("test.output", "topic", true, false, false, nil). + DeclareExchange("consumer.output", "fanout", true, false, false, nil). + DeclareQueue("test.inbound", true, false, false, nil). + BindQueue("test.inbound", "#", "test.output", nil). + DeclareQueue("consumer.inbound", true, false, false, map[string]interface{}{ + "x-message-ttl": int32(600000), + }). + BindQueue("consumer.inbound", "", "consumer.output", nil). + Complete() +``` + +#### Producer + +It's necessary to configure a producer for each output exchange you want to send messages to. +Each producer has at least one rabbit channel. You can specify how many concurrent and round robin producers you want, keeping in mind that each producer specified in the parameter "numberOfProducers" will create it's own rabbit channel. This has been done to ensure that the publish confirmation works correctly. Two messages cannot be sent at the same time for the same channel. Sent are queued in a round robin way. + +```go + +//params: number of producers, output exchange, publish type (Transient or Persistent), confirm publish +testOutputProducer := rabbit.ConfigureProducer(1, "test.output", rabbit.Transient, true) +consumerProducer := rabbit.ConfigureProducer(3, "consumer.output", rabbit.Transient, true) + +``` + +### Message types and message handlers + +Supposing we have the following message type we want to publish and consume + +```go + +type TestMessage struct { + Id int + Name string +} + +``` + +Firstly we want to create a handler for this message + +```go + +func TestMessageHandler(test string, producer rabbit.IProducer) func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { + return func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { + testMessage := message.(TestMessage) + log.Println("executing testMessage, test: " + test) + log.Println(testMessage) + log.Println(fmt.Sprintf("header: %v", header)) + time.Sleep(500 * time.Millisecond) + err := producer.Send(message, "", "test", "correlation", "", nil, rabbit.Json) + if err != nil { + log.Println("Error sending the message") + return rabbit.Err + } + return rabbit.Completed + } +} + +``` + +A message handler can have parameters or not. You might want to inject your repository, a rabbit producer, etc. +A handler always return the function func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse, where message and handler are respectively the message and the handler that will be injected by the rabbit library; and rabbit.HandlerResponse is the return status of the handler that will be read by the rabbit library to perfom specific operations like ack, reqeueing, etc. + +rabbit.HandlerResponse can be: + - rabbit.Completed, if all good, the message will be eventually acked + - rabbit.Reject, if you want to reject the message + - rabbit.Requeue, if you with to requeue the message and re-consume it again + - rabbit.Err, an error occurred. The message will be requeued or retried N time based on the client configuration that we are going to discuss later. + + In the previous example, we are just handling the TestMessage writing some log and publishing the same message again. + + ### Consumer + + ```go +//params: prefetch, requeue waiting time on error +consumer := rabbit.ConfigureConsumer(100, 5*time.Second) + + ``` + + We can specify a prefetch and the requeue waiting time on error. The second parameter is the waiting time before the message gets requeued when you return rabbit.Err from a handler. + + Below is how you have to register your handlers + + ```go + +h := TestMessageHandler("teststring", producer) //I'm passing a string and the producer dependency +consumer.AddHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h) + + ``` + + main.TestMessage is the type that will be send as envelope type, and the reflected type as second parameter is the is the contract representing the message. Finally, as third parameter, we pass the handler that will handle the message. + + \ No newline at end of file