diff --git a/README.md b/README.md index fe29b63..cbd31d4 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,9 @@ # Rebbit -[![Build Status](https://travis-ci.org/jd78/rabbit.svg?branch=master)](https://travis-ci.org/jd78/rabbit) - Rabbit is built on top of streadway/amqp RabbitMQ client (https://github.com/streadway/amqp) and brings the following functionalities: - Can produce and consume messages in jSON and Protobuf. -- Can produce and consume concurrently, specifying the wanted number. The producer is concurrent round robin, the consumer is a concurrent multichannel. +- Can produce and consume concurrently, specifying the number of consumers/producers. The producer is concurrent round robin, the consumer is a concurrent multichannels. - Partition consumer, can consume messages in sequences while concurrent. Assigning a partition id to a message, this will be consumed by the same channel, in order. - Dynamic handlers. - Message retries. @@ -53,7 +51,7 @@ rabbit.TopologyConfiguration(). #### Producer It's necessary to configure a producer for each output exchange you want to send messages to. -Each producer has at least one rabbit channel. You can specify how many concurrent and round robin producers you want, keeping in mind that each producer specified in the parameter "numberOfProducers" will create it's own rabbit channel. This has been done to ensure that the publish confirmation works correctly. Two messages cannot be sent at the same time for the same channel. Sent are queued in a round robin way. +Each producer has at least one rabbit channel. You can specify how many concurrent and round robin producers you want, keeping in mind that each producer specified in the parameter "numberOfProducers" will create it's own rabbit channel. This has been done to ensure that the publish confirmation works correctly. Two messages cannot be sent at the same time for the same channel. Sends are queued in a round robin way. ```go @@ -99,7 +97,7 @@ func TestMessageHandler(test string, producer rabbit.IProducer) func(message int ``` A message handler can have parameters or not. You might want to inject your repository, a rabbit producer, etc. -A handler always return the function func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse, where message and handler are respectively the message and the handler that will be injected by the rabbit library; and rabbit.HandlerResponse is the return status of the handler that will be read by the rabbit library to perfom specific operations like ack, reqeueing, etc. +A handler always returns the function func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse, where message and handler are respectively the message and the handler that will be injected by the rabbit library; and rabbit.HandlerResponse is the return status of the handler that will be read by the rabbit library to perfom specific operations like ack, reqeueing, etc. rabbit.HandlerResponse can be: - rabbit.Completed, if all good, the message will be eventually acked @@ -123,11 +121,38 @@ consumer := rabbit.ConfigureConsumer(100, 5*time.Second) ```go -h := TestMessageHandler("teststring", producer) //I'm passing a string and the producer dependency +h := TestMessageHandler("teststring", consumerProducer) //I'm passing a string and the producer dependency consumer.AddHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h) ``` main.TestMessage is the type that will be send as envelope type, and the reflected type as second parameter is the is the contract representing the message. Finally, as third parameter, we pass the handler that will handle the message. - \ No newline at end of file + At this point, to start the consumer, without using the partitioner, it's enough to do + + ```go + +//parameters: queue name, ack, use active passive, passive check for consumer interval, concurrent consumers, amqp consumer args + consumer.StartConsuming("test.inbound", true, true, 1000*time.Millisecond, 1, nil) + + ``` + +If you want to use the partitioner instead you need to configure the partition resolver. + +```go + +partitionerResolver := make(map[reflect.Type]func(message interface{}) int64) +partitionerResolver[reflect.TypeOf(TestMessage{})] = func(message interface{}) int64 { return int64(message.(TestMessage).Id) } + +//parameters: queue name, ack, use active passive, passive check for consumer interval, max time waiting on partition handler error, +//number or partitions, resolver, amqp consumer args. +consumer.StartConsumingPartitions("test.inbound", true, true, + 3000*time.Millisecond, 5*time.Second, 30, partitionerResolver, nil) + +``` + +Be aware that if a partition is in error, no messages will be consumed for that partition but will be queued until the message in error gets correctly consumed. + +### Demo + +A simple demo is located in the example folder diff --git a/example/example b/example/example new file mode 100755 index 0000000..e5cd8e2 Binary files /dev/null and b/example/example differ diff --git a/example/handler.go b/example/handler.go new file mode 100644 index 0000000..775a664 --- /dev/null +++ b/example/handler.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + "log" + "rabbit" + "time" +) + +func TestMessageHandler(test string, producer rabbit.IProducer) func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { + return func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { + testMessage := message.(TestMessage) + log.Println("executing testMessage, test: " + test) + log.Println(testMessage) + log.Println(fmt.Sprintf("header: %v", header)) + time.Sleep(500 * time.Millisecond) + err := producer.Send(message, "", "test", "correlation", "", nil, rabbit.Json) + if err != nil { + log.Println("Error sending the message") + return rabbit.Err + } + return rabbit.Completed + } +} + +func ProtoMessageHandler() func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { + return func(message interface{}, header map[string]interface{}) rabbit.HandlerResponse { + testMessage := message.(Test) + log.Printf("Id: %d", testMessage.GetId()) + log.Printf("Label: %s", testMessage.GetLabel()) + time.Sleep(500 * time.Millisecond) + return rabbit.Completed + } +} diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..5b0780b --- /dev/null +++ b/example/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "fmt" + "log" + "rabbit" + "reflect" + "runtime" + "time" + + "github.com/golang/protobuf/proto" +) + +type TestMessage struct { + Id int + Name string +} + +var roundrobin int64 = 0 + +func main() { + + runtime.GOMAXPROCS(4) + + logger := rabbit.CreateLogger( + func(l string) { log.Println(l) }, + func(l string) { log.Println(l) }, + func(l string) { log.Println(l) }, + func(l string) { log.Fatal(l) }, + func(l string) { log.Println(l) }, + rabbit.Debug) + + rabbit.Initialize(logger, "amqp://guest:guest@localhost:5672/") + rabbit.TopologyConfiguration(). + DeclareExchange("test.output", "topic", true, false, false, nil). + DeclareExchange("consumer.output", "fanout", true, false, false, nil). + DeclareQueue("test.inbound", true, false, false, nil). + BindQueue("test.inbound", "#", "test.output", nil). + DeclareQueue("consumer.inbound", true, false, false, map[string]interface{}{ + "x-message-ttl": int32(600000), + }). + BindQueue("consumer.inbound", "", "consumer.output", nil). + Complete() + + producer := rabbit.ConfigureProducer(3, "test.output", rabbit.Transient, true) + consumerProducer := rabbit.ConfigureProducer(3, "consumer.output", rabbit.Transient, true) + + consumer := rabbit.ConfigureConsumer(100, 5*time.Second) + h := TestMessageHandler("teststring", consumerProducer) + consumer.AddHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h) + + protoH := ProtoMessageHandler() + consumer.AddHandler("main.Test", reflect.TypeOf(Test{}), protoH) + + //consumer.AddRetryHandler("main.TestMessage", reflect.TypeOf(TestMessage{}), h, 10) + partitionerResolver := make(map[reflect.Type]func(message interface{}) int64) + partitionerResolver[reflect.TypeOf(TestMessage{})] = func(message interface{}) int64 { return int64(message.(TestMessage).Id) } + //consumer.StartConsuming("test.inbound", true, true, 1000*time.Millisecond, 1, nil) + consumer.StartConsumingPartitions("test.inbound", true, true, + 3000*time.Millisecond, 5*time.Second, 30, partitionerResolver, nil) + + for i := 0; i < 1; i++ { + go func(i int) { + message := TestMessage{i, "test"} + err := producer.Send(message, "testmessage.1", fmt.Sprintf("%d", i), "correlation", "", nil, rabbit.Json) + if err != nil { + log.Fatal(err) + } + + protoMessage := &Test{ + Id: proto.Int32(int32(i)), + Label: proto.String("test protobuf"), + } + err2 := producer.Send(protoMessage, "protoMessage.1", fmt.Sprintf("%d", i), "correlation", "", nil, rabbit.Protobuf) + if err2 != nil { + log.Fatal(err2) + } + }(i) + } + + fmt.Scanln() +} diff --git a/example/protoMessage.pb.go b/example/protoMessage.pb.go new file mode 100644 index 0000000..d405224 --- /dev/null +++ b/example/protoMessage.pb.go @@ -0,0 +1,69 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: protoMessage.proto + +/* +Package main is a generated protocol buffer package. + +It is generated from these files: + protoMessage.proto + +It has these top-level messages: + Test +*/ +package main + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Test struct { + Id *int32 `protobuf:"varint,1,req,name=Id" json:"Id,omitempty"` + Label *string `protobuf:"bytes,2,req,name=label" json:"label,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Test) Reset() { *m = Test{} } +func (m *Test) String() string { return proto.CompactTextString(m) } +func (*Test) ProtoMessage() {} +func (*Test) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Test) GetId() int32 { + if m != nil && m.Id != nil { + return *m.Id + } + return 0 +} + +func (m *Test) GetLabel() string { + if m != nil && m.Label != nil { + return *m.Label + } + return "" +} + +func init() { + proto.RegisterType((*Test)(nil), "main.Test") +} + +func init() { proto.RegisterFile("protoMessage.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 89 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2a, 0x28, 0xca, 0x2f, + 0xc9, 0xf7, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0xd5, 0x03, 0x73, 0x84, 0x58, 0x72, 0x13, 0x33, + 0xf3, 0x94, 0x74, 0xb8, 0x58, 0x42, 0x52, 0x8b, 0x4b, 0x84, 0xf8, 0xb8, 0x98, 0x3c, 0x53, 0x24, + 0x18, 0x15, 0x98, 0x34, 0x58, 0x83, 0x98, 0x3c, 0x53, 0x84, 0x44, 0xb8, 0x58, 0x73, 0x12, 0x93, + 0x52, 0x73, 0x24, 0x98, 0x14, 0x98, 0x34, 0x38, 0x83, 0x20, 0x1c, 0x40, 0x00, 0x00, 0x00, 0xff, + 0xff, 0x12, 0x80, 0xd2, 0x78, 0x48, 0x00, 0x00, 0x00, +} diff --git a/example/protoMessage.proto b/example/protoMessage.proto new file mode 100644 index 0000000..b41ac06 --- /dev/null +++ b/example/protoMessage.proto @@ -0,0 +1,7 @@ +syntax = "proto2"; +package main; + +message Test { + required int32 Id = 1; + required string label = 2; +} \ No newline at end of file