From cf35733947817207f35af72de3eb4691a8c24095 Mon Sep 17 00:00:00 2001 From: Thomas Hudry Date: Wed, 24 Jul 2019 10:00:27 +0200 Subject: [PATCH 1/3] Add /api/consumers support --- consumers.go | 58 +++++++++++++++++++++++++++++++ rabbithole_test.go | 86 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 consumers.go diff --git a/consumers.go b/consumers.go new file mode 100644 index 0000000..6d72818 --- /dev/null +++ b/consumers.go @@ -0,0 +1,58 @@ +package rabbithole + +import ( + "net/url" +) + +type BriefQueueInfo struct { + Name string `json:"name"` + Vhost string `json:"vhost"` +} + +type BriefChannelDetail struct { + ConnectionName string `json:"connection_name"` + Name string `json:"name"` + Node string `json:"node"` + Number int `json:"number"` + PeerHost string `json:"peer_host"` + PeerPort int `json:"peer_port"` + User string `json:"user"` +} + +type ConsumerInfo struct { + Arguments map[string]interface{} `json:"arguments"` + AckRequired bool `json:"ack_required"` + ChannelDetails BriefChannelDetail `json:"channel_details"` + ConsumerTag string `json:"consumer_tag"` + Exclusive bool `json:"exclusive"` + PrefetchCount int `json:"prefetch_count"` + Queue BriefQueueInfo `json:"queue"` +} + +// ListConsumers lists all consumers in the cluster. +func (c *Client) ListConsumers() (rec []ConsumerInfo, err error) { + req, err := newGETRequest(c, "consumers") + if err != nil { + return []ConsumerInfo{}, err + } + + if err = executeAndParseRequest(c, req, &rec); err != nil { + return []ConsumerInfo{}, err + } + + return rec, nil +} + +// ListConsumersIn lists all consumers in a virtual host. +func (c *Client) ListConsumersIn(vhost string) (rec []ConsumerInfo, err error) { + req, err := newGETRequest(c, "consumers/"+url.PathEscape(vhost)) + if err != nil { + return []ConsumerInfo{}, err + } + + if err = executeAndParseRequest(c, req, &rec); err != nil { + return []ConsumerInfo{}, err + } + + return rec, nil +} diff --git a/rabbithole_test.go b/rabbithole_test.go index ece1b3e..5ecafee 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -707,6 +707,92 @@ var _ = Describe("Rabbithole", func() { }) }) + Context("GET /consumers", func() { + It("returns decoded response", func() { + conn := openConnection("/") + defer conn.Close() + + ch, err := conn.Channel() + Ω(err).Should(BeNil()) + defer ch.Close() + + _, err = ch.QueueDeclare( + "", // name + false, // durable + false, // auto delete + true, // exclusive + false, + nil) + Ω(err).Should(BeNil()) + + _, err = ch.Consume( + "", // queue + "", // consumer + false, // auto ack + false, // exclusive + false, // no local + false, // no wait + amqp.Table{}) + Ω(err).Should(BeNil()) + + // give internal events a moment to be + // handled + awaitEventPropagation() + + cs, err := rmqc.ListConsumers() + Ω(err).Should(BeNil()) + + Ω(len(cs)).Should(Equal(1)) + c := cs[0] + Ω(c.Queue.Name).ShouldNot(Equal("")) + Ω(c.ConsumerTag).ShouldNot(Equal("")) + Ω(c.Exclusive).ShouldNot(BeNil()) + }) + }) + + Context("GET /consumers/{vhost}", func() { + It("returns decoded response", func() { + conn := openConnection("rabbit/hole") + defer conn.Close() + + ch, err := conn.Channel() + Ω(err).Should(BeNil()) + defer ch.Close() + + _, err = ch.QueueDeclare( + "", // name + false, // durable + false, // auto delete + true, // exclusive + false, + nil) + Ω(err).Should(BeNil()) + + _, err = ch.Consume( + "", // queue + "", // consumer + false, // auto ack + false, // exclusive + false, // no local + false, // no wait + amqp.Table{}) + Ω(err).Should(BeNil()) + + // give internal events a moment to be + // handled + awaitEventPropagation() + + cs, err := rmqc.ListConsumers() + Ω(err).Should(BeNil()) + + Ω(len(cs)).Should(Equal(1)) + c := cs[0] + Ω(c.Queue.Name).ShouldNot(Equal("")) + Ω(c.ConsumerTag).ShouldNot(Equal("")) + Ω(c.Exclusive).ShouldNot(BeNil()) + }) + }) + Context("GET /users", func() { It("returns decoded response", func() { xs, err := rmqc.ListUsers() From 13d99fcf29be29e84e8c35e081fb79c47184b9e1 Mon Sep 17 00:00:00 2001 From: Thomas Hudry Date: Wed, 24 Jul 2019 10:30:12 +0200 Subject: [PATCH 2/3] Avoid allocating when returning an error --- consumers.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/consumers.go b/consumers.go index 6d72818..4d8beb8 100644 --- a/consumers.go +++ b/consumers.go @@ -33,26 +33,26 @@ type ConsumerInfo struct { func (c *Client) ListConsumers() (rec []ConsumerInfo, err error) { req, err := newGETRequest(c, "consumers") if err != nil { - return []ConsumerInfo{}, err + return } if err = executeAndParseRequest(c, req, &rec); err != nil { - return []ConsumerInfo{}, err + return } - return rec, nil + return } // ListConsumersIn lists all consumers in a virtual host. func (c *Client) ListConsumersIn(vhost string) (rec []ConsumerInfo, err error) { req, err := newGETRequest(c, "consumers/"+url.PathEscape(vhost)) if err != nil { - return []ConsumerInfo{}, err + return } if err = executeAndParseRequest(c, req, &rec); err != nil { - return []ConsumerInfo{}, err + return } - return rec, nil + return } From 641510a90a319e364d9bc145cd091a35ec87fcde Mon Sep 17 00:00:00 2001 From: Thomas Hudry Date: Wed, 24 Jul 2019 14:29:58 +0200 Subject: [PATCH 3/3] add AcknowledgementMode type --- consumers.go | 21 ++++++++++++++------- rabbithole_test.go | 4 +++- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/consumers.go b/consumers.go index 4d8beb8..ef3cd86 100644 --- a/consumers.go +++ b/consumers.go @@ -4,6 +4,13 @@ import ( "net/url" ) +type AcknowledgementMode bool + +const ( + ManualAcknowledgement AcknowledgementMode = true + AutomaticAcknowledgment AcknowledgementMode = false +) + type BriefQueueInfo struct { Name string `json:"name"` Vhost string `json:"vhost"` @@ -20,13 +27,13 @@ type BriefChannelDetail struct { } type ConsumerInfo struct { - Arguments map[string]interface{} `json:"arguments"` - AckRequired bool `json:"ack_required"` - ChannelDetails BriefChannelDetail `json:"channel_details"` - ConsumerTag string `json:"consumer_tag"` - Exclusive bool `json:"exclusive"` - PrefetchCount int `json:"prefetch_count"` - Queue BriefQueueInfo `json:"queue"` + Arguments map[string]interface{} `json:"arguments"` + AcknowledgementMode AcknowledgementMode `json:"ack_required"` + ChannelDetails BriefChannelDetail `json:"channel_details"` + ConsumerTag string `json:"consumer_tag"` + Exclusive bool `json:"exclusive"` + PrefetchCount int `json:"prefetch_count"` + Queue BriefQueueInfo `json:"queue"` } // ListConsumers lists all consumers in the cluster. diff --git a/rabbithole_test.go b/rabbithole_test.go index 5ecafee..8ae0f47 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -747,6 +747,7 @@ var _ = Describe("Rabbithole", func() { Ω(c.Queue.Name).ShouldNot(Equal("")) Ω(c.ConsumerTag).ShouldNot(Equal("")) Ω(c.Exclusive).ShouldNot(BeNil()) + Ω(c.AcknowledgementMode).Should(Equal(ManualAcknowledgement)) }) }) @@ -771,7 +772,7 @@ var _ = Describe("Rabbithole", func() { _, err = ch.Consume( "", // queue "", // consumer - false, // auto ack + true, // auto ack false, // exclusive false, // no local false, // no wait @@ -790,6 +791,7 @@ var _ = Describe("Rabbithole", func() { Ω(c.Queue.Name).ShouldNot(Equal("")) Ω(c.ConsumerTag).ShouldNot(Equal("")) Ω(c.Exclusive).ShouldNot(BeNil()) + Ω(c.AcknowledgementMode).Should(Equal(AutomaticAcknowledgment)) }) })