From e05ad99045ba93171bd947a2e7b829ef724b1040 Mon Sep 17 00:00:00 2001 From: Gennaro Del Sorbo Date: Mon, 12 Feb 2018 22:23:43 +0100 Subject: [PATCH] Rabbit (#1) * starting * topology * topology * producer * producer * producer * messagerype * confirms * producer make it better * bugfix + consumer * temp consumer * working * deserializer * error handling * bugfix + correlation * make log better * default serializer * echange bind * active passive * make it better * return response * partitioner * bugfix * error handling * minor * retry moved * waiting error moved * protobuf support * inject header in channel * readme --- README.md | 133 +++++++++++++++++ common.go | 39 +++++ connectionManager.go | 37 +++++ consumer.go | 332 +++++++++++++++++++++++++++++++++++++++++++ deliveryEx.go | 32 +++++ executeOnce.go | 12 ++ init.go | 24 ++++ producer.go | 167 ++++++++++++++++++++++ serializer.go | 55 +++++++ topology.go | 46 ++++++ types.go | 49 +++++++ 11 files changed, 926 insertions(+) create mode 100644 README.md create mode 100644 common.go create mode 100644 connectionManager.go create mode 100644 consumer.go create mode 100644 deliveryEx.go create mode 100644 executeOnce.go create mode 100644 init.go create mode 100644 producer.go create mode 100644 serializer.go create mode 100644 topology.go create mode 100644 types.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..fe29b63 --- /dev/null +++ b/README.md @@ -0,0 +1,133 @@ +# Rebbit + +[![Build Status](https://travis-ci.org/jd78/rabbit.svg?branch=master)](https://travis-ci.org/jd78/rabbit) + +Rabbit is built on top of streadway/amqp RabbitMQ client (https://github.com/streadway/amqp) and brings the following functionalities: + +- Can produce and consume messages in jSON and Protobuf. +- Can produce and consume concurrently, specifying the wanted number. The producer is concurrent round robin, the consumer is a concurrent multichannel. +- Partition consumer, can consume messages in sequences while concurrent. Assigning a partition id to a message, this will be consumed by the same channel, in order. +- Dynamic handlers. +- Message retries. +- Can discard or requeue a message just returning the wanted behaviour from the handler. +- Active and sleepy passive consumer. +- Topology configurator. +- Injection logger. + +#### Logger injection and rabbit initializer + +```go + +import "github.com/jd78/rabbit" + +logger := rabbit.CreateLogger( + func(l string) { log.Println(l) }, //debug + func(l string) { log.Println(l) }, //info + func(l string) { log.Println(l) }, //warn + func(l string) { log.Fatal(l) }, //error + func(l string) { log.Println(l) }, //fata; + rabbit.Debug //logLevel + ) + +rabbit.Initialize(logger, "amqp://username:password@localhost:5672/") +``` + +#### Topology configuration + +It's possible to configure the topology, creating exchnges, queues and bindings. + +```go + +rabbit.TopologyConfiguration(). + DeclareExchange("test.output", "topic", true, false, false, nil). + DeclareExchange("consumer.output", "fanout", true, false, false, nil). + DeclareQueue("test.inbound", true, false, false, nil). + BindQueue("test.inbound", "#", "test.output", nil). + DeclareQueue("consumer.inbound", true, false, false, map[string]interface{}{ + "x-message-ttl": int32(600000), + }). + BindQueue("consumer.inbound", "", "consumer.output", nil). + Complete() +``` + +#### Producer + +It's necessary to configure a producer for each output exchange you want to send messages to. +Each producer has at least one rabbit channel. You can specify how many concurrent and round robin producers you want, keeping in mind that each producer specified in the parameter "numberOfProducers" will create it's own rabbit channel. This has been done to ensure that the publish confirmation works correctly. Two messages cannot be sent at the same time for the same channel. Sent are queued in a round robin way. + +```go + +//params: number of producers, output exchange, publish type (Transient or Persistent), confirm publish +testOutputProducer := rabbit.ConfigureProducer(1, "test.output", rabbit.Transient, true) +consumerProducer := rabbit.ConfigureProducer(3, "consumer.output", rabbit.Transient, true) + +``` + +### Message types and message handlers + +Supposing we have the following message type we want to publish and consume + +```go + +type TestMessage struct { + Id int + Name string +} + +``` + +Firstly we want to create a handler for this message + +```go + +func TestMessageHandler(test string, producer rabbit.IProducer) func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { + return func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { + testMessage := message.(TestMessage) + log.Println("executing testMessage, test: " + test) + log.Println(testMessage) + log.Println(fmt.Sprintf("header: %v", header)) + time.Sleep(500 * time.Millisecond) + err := producer.Send(message, "", "test", "correlation", "", nil, rabbit.Json) + if err != nil { + log.Println("Error sending the message") + return rabbit.Err + } + return rabbit.Completed + } +} + +``` + +A message handler can have parameters or not. You might want to inject your repository, a rabbit producer, etc. +A handler always return the function func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse, where message and handler are respectively the message and the handler that will be injected by the rabbit library; and rabbit.HandlerResponse is the return status of the handler that will be read by the rabbit library to perfom specific operations like ack, reqeueing, etc. + +rabbit.HandlerResponse can be: + - rabbit.Completed, if all good, the message will be eventually acked + - rabbit.Reject, if you want to reject the message + - rabbit.Requeue, if you with to requeue the message and re-consume it again + - rabbit.Err, an error occurred. The message will be requeued or retried N time based on the client configuration that we are going to discuss later. + + In the previous example, we are just handling the TestMessage writing some log and publishing the same message again. + + ### Consumer + + ```go +//params: prefetch, requeue waiting time on error +consumer := rabbit.ConfigureConsumer(100, 5*time.Second) + + ``` + + We can specify a prefetch and the requeue waiting time on error. The second parameter is the waiting time before the message gets requeued when you return rabbit.Err from a handler. + + Below is how you have to register your handlers + + ```go + +h := TestMessageHandler("teststring", producer) //I'm passing a string and the producer dependency +consumer.AddHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h) + + ``` + + main.TestMessage is the type that will be send as envelope type, and the reflected type as second parameter is the is the contract representing the message. Finally, as third parameter, we pass the handler that will handle the message. + + \ No newline at end of file diff --git a/common.go b/common.go new file mode 100644 index 0000000..fb292e6 --- /dev/null +++ b/common.go @@ -0,0 +1,39 @@ +package rabbit + +import ( + "crypto/rand" + "encoding/hex" + "fmt" +) + +type Queue struct { + name string + durable bool + autoDelete bool + exclusive bool + args map[string]interface{} +} + +var Queues map[string]Queue + +func checkError(err error, additionalData string, lg *rabbitLogger) { + if err != nil { + l := fmt.Sprintf("%s: %s", additionalData, err.Error()) + lg.fatal(l) + panic(l) + } +} + +func checkErrorLight(err error, additionalData string, lg *rabbitLogger) { + if err != nil { + if lg.logLevel >= Warn { + lg.warn(fmt.Sprintf("%s: %s", additionalData, err.Error())) + } + } +} + +func getUniqueId() string { + b := make([]byte, 4) + rand.Read(b) + return hex.EncodeToString(b) +} diff --git a/connectionManager.go b/connectionManager.go new file mode 100644 index 0000000..61c3cb6 --- /dev/null +++ b/connectionManager.go @@ -0,0 +1,37 @@ +package rabbit + +import "github.com/streadway/amqp" +import "fmt" + +type rabbit struct { + connection *amqp.Connection + log *rabbitLogger +} + +func initialize(endpoint string, log *rabbitLogger) rabbit { + conn, err := amqp.Dial(endpoint) + checkError(err, "error during connection", log) + go func() { + ch := make(chan *amqp.Error) + conn.NotifyClose(ch) + err := <-ch + checkError(err, "Connection lost!", log) + }() + + go func() { + ch := make(chan amqp.Blocking) + conn.NotifyBlocked(ch) + for { + status := <-ch + if log.logLevel >= Warn { + log.warn(fmt.Sprintf("connection blocked detected - block enabled: %t, reason: %s", status.Active, status.Reason)) + } + } + }() + + return rabbit{conn, log} +} + +func (r *rabbit) close() { + r.connection.Close() +} diff --git a/consumer.go b/consumer.go new file mode 100644 index 0000000..606c696 --- /dev/null +++ b/consumer.go @@ -0,0 +1,332 @@ +package rabbit + +import ( + "errors" + "fmt" + "reflect" + "sync/atomic" + "time" + + "github.com/jd78/partitioner" + "github.com/streadway/amqp" +) + +type Handler func(message interface{}, header map[string]interface{}) HandlerResponse + +type IConsumer interface { + AddHandler(messageType string, concreteType reflect.Type, handler Handler) + AddRetryHandler(messageType string, concreteType reflect.Type, handler Handler, maxRetries int) + StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, + concurrentConsumers int, args map[string]interface{}) string + StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval, + maxWaitingTimeRetryIntervalOnPartitionError time.Duration, partitions int, + partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string +} + +type consumer struct { + handlers map[string]Handler + types map[string]reflect.Type + maxRetries map[string]int + log *rabbitLogger + channel *amqp.Channel + consumerRunning map[string]bool + requeueWaitingTimeOnError time.Duration +} + +var roundrobin int64 + +type partition struct { + message interface{} + partitionResolver map[reflect.Type]func(message interface{}) int64 +} + +func (p partition) GetPartition() int64 { + t := reflect.TypeOf(p.message) + if _, exists := p.partitionResolver[t]; exists { + return p.partitionResolver[t](p.message) + } + + return atomic.AddInt64(&roundrobin, 1) +} + +//requeueWaitingTimeOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false +func (r *rabbit) configureConsumer(prefetch int, requeueWaitingTimeOnError time.Duration) IConsumer { + channel, err := r.connection.Channel() + checkError(err, "Error creating the producing channel", r.log) + + qErr := channel.Qos(prefetch, 0, false) + checkError(qErr, "Error assigning prefetch on the channel", r.log) + + go func() { + ch := make(chan *amqp.Error) + channel.NotifyClose(ch) + err := <-ch + checkError(err, "Channel closed", r.log) + }() + + go func() { + ch := make(chan bool) + channel.NotifyFlow(ch) + for { + status := <-ch + if r.log.logLevel >= Warn { + r.log.warn(fmt.Sprintf("channel flow detected - flow enabled: %t", status)) + } + } + }() + + h := make(map[string]Handler) + t := make(map[string]reflect.Type) + retries := make(map[string]int) + return &consumer{h, t, retries, r.log, channel, make(map[string]bool), requeueWaitingTimeOnError} +} + +func (c *consumer) handlerExists(messageType string) bool { + _, exists := c.handlers[messageType] + return exists +} + +func (c *consumer) GetHandlerKeys() []string { + keys := make([]string, 0, len(c.handlers)) + for k := range c.handlers { + keys = append(keys, k) + } + return keys +} + +func (c *consumer) getMaxRetries(messageType string) (bool, int) { + maxRetries, exists := c.maxRetries[messageType] + return exists, maxRetries +} + +func (c *consumer) AddHandler(messageType string, concreteType reflect.Type, handler Handler) { + if c.handlerExists(messageType) { + err := fmt.Errorf("messageType %s already mapped", messageType) + checkError(err, "", c.log) + } + c.handlers[messageType] = handler + c.types[messageType] = concreteType +} + +//maxRetries: number of retries before discarding a message. Takes effect if enableRetries is true +func (c *consumer) AddRetryHandler(messageType string, concreteType reflect.Type, handler Handler, maxRetries int) { + if c.handlerExists(messageType) { + err := fmt.Errorf("messageType %s already mapped", messageType) + checkError(err, "", c.log) + } + c.handlers[messageType] = handler + c.types[messageType] = concreteType + c.maxRetries[messageType] = maxRetries +} + +func (c *consumer) handle(w amqp.Delivery, message interface{}, ack bool, retried int) { + if w.Redelivered { + if c.log.logLevel >= Info { + c.log.info(fmt.Sprintf("MessageID=%s, CorrelationId=%s, has been redelivered", + w.MessageId, w.CorrelationId)) + } + } + handler := c.handlers[w.Type] + + response := handler(message, w.Headers) + switch response { + case Completed: + if c.log.logLevel >= Debug { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, completed.", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeAckMessage(ack, c.log) + case Requeue: + if c.log.logLevel >= Debug { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, requeueing message...", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeRequeueMessage(ack, c.log) + case Reject: + if c.log.logLevel >= Info { + c.log.info(fmt.Sprintf("MessageId=%s, CorrelationId=%s, rejecting message...", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeRejectMessage(ack, c.log) + case Err: + retryEnabled, maxRetries := c.getMaxRetries(w.Type) + if retryEnabled { + if retried > maxRetries-1 { + if c.log.logLevel >= Warn { + c.log.warn(fmt.Sprintf("MessageId=%s, CorrelationId=%s, max retry reached, rejecting message...", + w.MessageId, w.CorrelationId)) + } + (&envelope{&w}).maybeRejectMessage(ack, c.log) + } else { + time.Sleep(time.Duration(retried*100) * time.Millisecond) + if c.log.logLevel >= Debug { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, retry=%d times, retrying due to error...", + w.MessageId, w.CorrelationId, retried)) + } + c.handle(w, message, ack, retried+1) + } + } else { + go func() { + time.Sleep(c.requeueWaitingTimeOnError) + (&envelope{&w}).maybeRequeueMessage(ack, c.log) + }() + } + } +} + +func (c *consumer) deserializeMessage(w amqp.Delivery) (interface{}, error) { + obj, err := deserialize(w.Body, ContentType(w.ContentType), c.types[w.Type]) + if err != nil { + if c.log.logLevel >= Error { + c.log.err(fmt.Sprintf("MessageID=%s, CorrelationId=%s, could not deserialize the message, requeueing...", + w.MessageId, w.CorrelationId)) + } + } + return obj, err +} + +//StartConsuming will start a new consumer +//concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel +//queue: queue name +//ack: true enables ack +//activePassive: enables acrive and sleepy passive consumers +//activePassiveRetryInterval: time interval checking if the queue has a consumer +//concurrentConsumers: number of consumers +//args: consumer args +func (c *consumer) StartConsuming(queue string, ack, activePassive bool, activePassiveRetryInterval time.Duration, + concurrentConsumers int, args map[string]interface{}) string { + if _, exists := c.consumerRunning[queue]; exists { + err := errors.New("Consumer already running, please configure a new consumer for concurrent processing or set the concurrentConsumers") + checkError(err, "Error starting the consumer", c.log) + } + + if activePassive { + logOnce := executeOnce{} + for { + qInfo := Queues[queue] + q, err := c.channel.QueueDeclarePassive(qInfo.name, qInfo.durable, qInfo.autoDelete, qInfo.exclusive, + false, qInfo.args) + checkError(err, "Error declaring queue passive", c.log) + if q.Consumers == 0 { + break + } else { + if c.log.logLevel >= Info { + logOnce.MaybeExecute(func() { c.log.info(fmt.Sprintf("Consumer passive on queue %s", queue)) }) + } + time.Sleep(activePassiveRetryInterval) + } + } + } + + consumerId := getUniqueId() + + delivery, err := c.channel.Consume(queue, consumerId, !ack, activePassive, false, false, args) + checkError(err, "Error starting the consumer", c.log) + + if activePassive { + if c.log.logLevel >= Info { + c.log.info(fmt.Sprintf("Consumer active on queue %s", queue)) + } + } + + for i := 0; i < concurrentConsumers; i++ { + go func(work <-chan amqp.Delivery) { + for w := range work { + if !c.handlerExists(w.Type) { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, no handler registered for %s, registeredTypes=%v", + w.MessageId, w.CorrelationId, w.Type, c.GetHandlerKeys())) + (&envelope{&w}).maybeAckMessage(ack, c.log) + continue + } + + message, err := c.deserializeMessage(w) + if err != nil { + (&envelope{&w}).maybeRequeueMessage(ack, c.log) + continue + } + + c.handle(w, message, ack, 0) + } + }(delivery) + } + + c.consumerRunning[queue] = true + return consumerId +} + +//StartConsuming will start a new consumer +//concurrentConsumers will create concurrent go routines that will read from the delivery rabbit channel +//queue: queue name +//ack: true enables ack +//activePassive: enables acrive and sleepy passive consumers +//activePassiveRetryInterval: time interval checking if the queue has a consumer +//maxWaitingTimeRetryIntervalOnPartitionError: Sleep time between retries in case of handler error +//concurrentConsumers: number of consumers +//partitions: number of consurrent/consistent partitions +//partitionResolver: map[reflect.Type]func(message interface{}) int64, for each message type specify a function that will return the key used to partition +//args: consumer args +func (c *consumer) StartConsumingPartitions(queue string, ack, activePassive bool, activePassiveRetryInterval, + maxWaitingTimeRetryIntervalOnPartitionError time.Duration, + partitions int, partitionResolver map[reflect.Type]func(message interface{}) int64, args map[string]interface{}) string { + if _, exists := c.consumerRunning[queue]; exists { + err := errors.New("Consumer already running, please configure a new consumer for concurrent processing or set partitions") + checkError(err, "Error starting the consumer", c.log) + } + + if activePassive { + logOnce := executeOnce{} + for { + qInfo := Queues[queue] + q, err := c.channel.QueueDeclarePassive(qInfo.name, qInfo.durable, qInfo.autoDelete, qInfo.exclusive, + false, qInfo.args) + checkError(err, "Error declaring queue passive", c.log) + if q.Consumers == 0 { + break + } else { + if c.log.logLevel >= Info { + logOnce.MaybeExecute(func() { c.log.info(fmt.Sprintf("Consumer passive on queue %s", queue)) }) + } + time.Sleep(activePassiveRetryInterval) + } + } + } + + consumerId := getUniqueId() + + delivery, err := c.channel.Consume(queue, consumerId, !ack, activePassive, false, false, args) + checkError(err, "Error starting the consumer", c.log) + + if activePassive { + if c.log.logLevel >= Info { + c.log.info(fmt.Sprintf("Consumer active on queue %s", queue)) + } + } + + part := partitioner.CreatePartitioner(partitions, maxWaitingTimeRetryIntervalOnPartitionError) + + go func(work <-chan amqp.Delivery) { + for w := range work { + if !c.handlerExists(w.Type) { + c.log.debug(fmt.Sprintf("MessageId=%s, CorrelationId=%s, no handler registered for %s, registeredTypes=%v", + w.MessageId, w.CorrelationId, w.Type, c.GetHandlerKeys())) + (&envelope{&w}).maybeAckMessage(ack, c.log) + continue + } + + message, err := c.deserializeMessage(w) + if err != nil { + (&envelope{&w}).maybeRequeueMessage(ack, c.log) + continue + } + + cw := w + part.HandleInSequence(func(done chan bool) { + c.handle(cw, message, ack, 0) + done <- true + }, partition{message, partitionResolver}) + } + }(delivery) + + c.consumerRunning[queue] = true + return consumerId +} diff --git a/deliveryEx.go b/deliveryEx.go new file mode 100644 index 0000000..c5a30b8 --- /dev/null +++ b/deliveryEx.go @@ -0,0 +1,32 @@ +package rabbit + +import ( + "fmt" + + "github.com/streadway/amqp" +) + +type envelope struct { + *amqp.Delivery +} + +func (m *envelope) maybeAckMessage(ack bool, log *rabbitLogger) { + if ack { + err := m.Ack(false) + checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not ack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), log) + } +} + +func (m *envelope) maybeRequeueMessage(ack bool, log *rabbitLogger) { + if ack { + err := m.Nack(false, true) + checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not nack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), log) + } +} + +func (m *envelope) maybeRejectMessage(ack bool, log *rabbitLogger) { + if ack { + err := m.Nack(false, false) + checkErrorLight(err, fmt.Sprintf("MessageId=%s, CorrelationId=%s, could not nack the message, it will be eventually requeued", m.MessageId, m.CorrelationId), log) + } +} diff --git a/executeOnce.go b/executeOnce.go new file mode 100644 index 0000000..50c5501 --- /dev/null +++ b/executeOnce.go @@ -0,0 +1,12 @@ +package rabbit + +type executeOnce struct { + executed bool +} + +func (e *executeOnce) MaybeExecute(f func()) { + if !e.executed { + f() + e.executed = true + } +} diff --git a/init.go b/init.go new file mode 100644 index 0000000..1a83576 --- /dev/null +++ b/init.go @@ -0,0 +1,24 @@ +package rabbit + +import ( + "time" +) + +var _r rabbit + +func Initialize(log *rabbitLogger, endpoint string) { + _r = initialize(endpoint, log) +} + +func TopologyConfiguration() *topology { + return _r.topologyConfiguration() +} + +func ConfigureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode, confirmPublish bool) IProducer { + return _r.configureProducer(numberOfProducers, exchangeName, deliveryMode, confirmPublish) +} + +//requeueWaitingTimeOnError: time interval requeueing a message in case of handler error (message ordering will be lost). Takes effect if enableRetries is false +func ConfigureConsumer(preferch int, requeueWaitingTimeOnError time.Duration) IConsumer { + return _r.configureConsumer(preferch, requeueWaitingTimeOnError) +} diff --git a/producer.go b/producer.go new file mode 100644 index 0000000..b6c92ae --- /dev/null +++ b/producer.go @@ -0,0 +1,167 @@ +package rabbit + +import ( + "errors" + "fmt" + "reflect" + "strings" + "sync/atomic" + "time" + + pattern "github.com/jd78/gopatternmatching" + + "github.com/streadway/amqp" +) + +type IProducer interface { + Send(message interface{}, routingKey, messageID, correlationId, messageType string, header map[string]interface{}, + contentType ContentType) error +} + +type sendMessage struct { + message interface{} + routingKey string + messageID string + messageType string + header map[string]interface{} + contentType ContentType + responseChannel *chan error + producer *producer + producerIndex int + correlationId string +} + +type producer struct { + numberOfProducers int + channels []*amqp.Channel + roundRobin int32 + exchangeName string + deliveryMode DeliveryMode + log *rabbitLogger + confirmPublish bool + confirms []chan amqp.Confirmation + producers []chan sendMessage +} + +func (r *rabbit) configureProducer(numberOfProducers int, exchangeName string, deliveryMode DeliveryMode, + confirmPublish bool) IProducer { + if numberOfProducers < 1 { + err := errors.New("numberOfProducers is less than 1") + checkError(err, "", r.log) + } + + channels := make([]*amqp.Channel, numberOfProducers, numberOfProducers) + confirms := make([]chan amqp.Confirmation, numberOfProducers, numberOfProducers) + producers := make([]chan sendMessage, numberOfProducers, numberOfProducers) + for i := 0; i < numberOfProducers; i++ { + channel, err := r.connection.Channel() + + checkError(err, "Error creating the producing channel", r.log) + channels[i] = channel + + if confirmPublish { + confirms[i] = channel.NotifyPublish(make(chan amqp.Confirmation, 1)) + err := channel.Confirm(false) + checkError(err, "failed to create channel confirmation", r.log) + } + + go func() { + ch := make(chan *amqp.Error) + channel.NotifyClose(ch) + err := <-ch + checkError(err, "Channel closed!", r.log) + }() + + go func() { + ch := make(chan bool) + channel.NotifyFlow(ch) + for { + status := <-ch + if r.log.logLevel >= Warn { + r.log.warn(fmt.Sprintf("channel flow detected - flow enabled: %t", status)) + } + } + }() + + producers[i] = make(chan sendMessage, 1) + go func(i int) { + for { + s := <-producers[i] + send(&s, r.log.logLevel) + } + }(i) + } + + return &producer{numberOfProducers, channels, 0, exchangeName, deliveryMode, r.log, confirmPublish, + confirms, producers} +} + +func send(s *sendMessage, logLevel LogLevel) { + serialized, err := serialize(s.message, s.contentType) + checkError(err, "json serializer error", s.producer.log) + + mt := pattern.ResultMatch(s.messageType). + WhenValue("", func() interface{} { return strings.Replace(reflect.TypeOf(s.message).String(), "*", "", 1) }). + ResultOrDefault(s.messageType).(string) + + if logLevel >= Debug { + s.producer.log.debug(fmt.Sprintf("Sending Message %s: %s", mt, serialized)) + } + + pErr := s.producer.channels[s.producerIndex].Publish(s.producer.exchangeName, s.routingKey, false, false, amqp.Publishing{ + Headers: s.header, + ContentType: string(s.contentType), + DeliveryMode: uint8(s.producer.deliveryMode), + MessageId: s.messageID, + Timestamp: time.Now().UTC(), + Type: mt, + Body: serialized, + CorrelationId: s.correlationId, + }) + + if pErr != nil { + *s.responseChannel <- pErr + return + } + + if s.producer.confirmPublish { + if confirmed := <-s.producer.confirms[s.producerIndex]; confirmed.Ack { + *s.responseChannel <- nil + return + } + + *s.responseChannel <- errors.New("unable to publish") + return + } + + *s.responseChannel <- nil +} + +func (p *producer) getNext() int { + return int(atomic.AddInt32(&p.roundRobin, 1)) +} + +//Send a message. +//messageType: if empty the message type will be reflected from the message +func (p *producer) Send(message interface{}, routingKey, messageID, correlationId, messageType string, header map[string]interface{}, contentType ContentType) error { + i := p.getNext() % p.numberOfProducers + response := make(chan error, 1) + + //p.log.info(fmt.Sprintf("message id %s, assigned to worker %d", messageID, i)) + + s := sendMessage{ + producerIndex: i, + contentType: contentType, + header: header, + message: message, + messageID: messageID, + messageType: messageType, + producer: p, + responseChannel: &response, + routingKey: routingKey, + correlationId: correlationId, + } + + p.producers[i] <- s + return <-response +} diff --git a/serializer.go b/serializer.go new file mode 100644 index 0000000..8cc285c --- /dev/null +++ b/serializer.go @@ -0,0 +1,55 @@ +package rabbit + +import ( + "encoding/json" + "reflect" + + "github.com/golang/protobuf/proto" +) + +func serialize(message interface{}, contentType ContentType) ([]byte, error) { + serializeJson := func() ([]byte, error) { + serialized, err := json.Marshal(message) + return serialized, err + } + + switch contentType { + case Json: + return serializeJson() + case Protobuf: + return proto.Marshal(message.(proto.Message)) + default: //use json + return serializeJson() + } +} + +func deserialize(message []byte, contentType ContentType, concreteType reflect.Type) (interface{}, error) { + deserializeJson := func() (interface{}, error) { + pointer := reflect.New(concreteType).Interface() + err := json.Unmarshal(message, &pointer) + if err != nil { + return nil, err + } + noPointer := reflect.Indirect(reflect.ValueOf(pointer)).Interface() + return noPointer, nil + } + + deserializeProtobuf := func() (interface{}, error) { + pointer := reflect.New(concreteType).Interface().(proto.Message) + err := proto.Unmarshal(message, pointer) + if err != nil { + return nil, err + } + noPointer := reflect.Indirect(reflect.ValueOf(pointer)).Interface() + return noPointer, nil + } + + switch contentType { + case Json: + return deserializeJson() + case Protobuf: + return deserializeProtobuf() + default: + return deserializeJson() + } +} diff --git a/topology.go b/topology.go new file mode 100644 index 0000000..39d069f --- /dev/null +++ b/topology.go @@ -0,0 +1,46 @@ +package rabbit + +import ( + "github.com/streadway/amqp" +) + +type topology struct { + channel *amqp.Channel + log *rabbitLogger +} + +func (r *rabbit) topologyConfiguration() *topology { + channel, err := r.connection.Channel() + checkError(err, "Error creating topology channel", r.log) + Queues = make(map[string]Queue) + return &topology{channel, r.log} +} + +func (t *topology) DeclareExchange(name, kind string, durable, autoDelete, internal bool, args map[string]interface{}) *topology { + err := t.channel.ExchangeDeclare(name, kind, durable, autoDelete, internal, false, args) + checkError(err, "Error creating exchange", t.log) + return t +} + +func (t *topology) DeclareQueue(name string, durable, autoDelete, exclusive bool, args map[string]interface{}) *topology { + _, err := t.channel.QueueDeclare(name, durable, autoDelete, exclusive, false, args) + checkError(err, "Error creating queue", t.log) + Queues[name] = Queue{name, durable, autoDelete, exclusive, args} + return t +} + +func (t *topology) BindQueue(name, routingKey, exchangeName string, args map[string]interface{}) *topology { + err := t.channel.QueueBind(name, routingKey, exchangeName, false, args) + checkError(err, "Error creating the queue bind", t.log) + return t +} + +func (t *topology) BindExchange(source, destination, routingKey string, args map[string]interface{}) *topology { + err := t.channel.ExchangeBind(destination, routingKey, source, false, args) + checkError(err, "Error creating the exchange bind", t.log) + return t +} + +func (t *topology) Complete() { + t.channel.Close() +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..60c53fb --- /dev/null +++ b/types.go @@ -0,0 +1,49 @@ +package rabbit + +type logger func(string) + +type rabbitLogger struct { + fatal logger + err logger + info logger + warn logger + debug logger + logLevel LogLevel +} + +func CreateLogger(debug, info, warn, err, fatal logger, logLevel LogLevel) *rabbitLogger { + return &rabbitLogger{fatal, err, info, warn, debug, logLevel} +} + +type ContentType string + +const ( + Json ContentType = "json" + Protobuf ContentType = "protobuf" +) + +type DeliveryMode uint8 + +const ( + Transient DeliveryMode = 0 + Persistent DeliveryMode = 1 +) + +type LogLevel int + +const ( + Fatal LogLevel = iota + Error + Warn + Info + Debug +) + +type HandlerResponse int + +const ( + Completed HandlerResponse = iota + Requeue + Reject + Err +)