From 4f02d9d59bd99b491ab1b41e3be7e55257e7d8a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aitor=20P=C3=A9rez=20Cedres?= Date: Wed, 21 Jun 2023 11:29:30 +0100 Subject: [PATCH 1/2] Constant for consumer timeout and doc updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Documentation to queue argument constants was missing links to the constant themselves. Go Doc allows to link to a specific symbol using squared brackets. Added as well a constant for SAC in CQ and QQ. Signed-off-by: Aitor Pérez Cedres --- examples_test.go | 19 +++++++++++++++++++ types.go | 45 ++++++++++++++++++++++++++++++++------------- 2 files changed, 51 insertions(+), 13 deletions(-) diff --git a/examples_test.go b/examples_test.go index 7c44bb1..39a064a 100644 --- a/examples_test.go +++ b/examples_test.go @@ -550,8 +550,27 @@ func ExampleChannel_QueueDeclare_classicQueueV2() { false, // exclusive false, // noWait amqp.Table{ + amqp.QueueTypeArg: amqp.QueueTypeClassic, amqp.QueueVersionArg: 2, }, ) log.Printf("Declared Classic Queue v2: %s", q.Name) } + +func ExampleChannel_QueueDeclare_consumerTimeout() { + conn, _ := amqp.Dial("amqp://localhost") + ch, _ := conn.Channel() + // this works only with RabbitMQ 3.12+ + q, _ := ch.QueueDeclare( + "my-classic-queue-v2", // queue name + true, // durable + false, // auto-delete + false, // exclusive + false, // noWait + amqp.Table{ + amqp.QueueTypeArg: amqp.QueueTypeQuorum, // also works with classic queues + amqp.ConsumerTimeoutArg: 600_000, // 10 minute consumer timeout + }, + ) + log.Printf("Declared Classic Queue v2: %s", q.Name) +} diff --git a/types.go b/types.go index 42a1c69..8f43a72 100644 --- a/types.go +++ b/types.go @@ -11,6 +11,8 @@ import ( "time" ) +// DefaultExchange is the default direct exchange that binds every queue by its +// name. Applications can route to a queue using the queue name as routing key. const DefaultExchange = "" // Constants for standard AMQP 0-9-1 exchange types. @@ -214,29 +216,39 @@ type Decimal struct { // Most common queue argument keys in queue declaration. For a comprehensive list // of queue arguments, visit [RabbitMQ Queue docs]. // -// QueueTypeArg queue argument is used to declare quorum and stream queues. -// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and -// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their +// [QueueTypeArg] queue argument is used to declare quorum and stream queues. +// Accepted values are [QueueTypeClassic] (default), [QueueTypeQuorum] and +// [QueueTypeStream]. [Quorum Queues] accept (almost) all queue arguments as their // Classic Queues counterparts. Check [feature comparison] docs for more // information. // -// Queues can define their [max length] using QueueMaxLenArg and -// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using -// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default), -// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX. +// Queues can define their [max length] using [QueueMaxLenArg] and +// [QueueMaxLenBytesArg] queue arguments. Overflow behaviour is set using +// [QueueOverflowArg]. Accepted values are [QueueOverflowDropHead] (default), +// [QueueOverflowRejectPublish] and [QueueOverflowRejectPublishDLX]. // -// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an -// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg. -// This will set a time-to-live for **messages** in the queue. +// [Queue TTL] can be defined using [QueueTTLArg]. That is, the time-to-live for an +// unused queue. [Queue Message TTL] can be defined using [QueueMessageTTLArg]. +// This will set a time-to-live for messages in the queue. // -// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the +// [Stream retention] can be configured using [StreamMaxLenBytesArg], to set the // maximum size of the stream. Please note that stream queues always keep, at -// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg, +// least, one segment. [Stream retention] can also be set using [StreamMaxAgeArg], // to set time-based retention. Values are string with unit suffix. Valid // suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment -// size can be set using StreamMaxSegmentSizeBytesArg. The default value is +// size can be set using [StreamMaxSegmentSizeBytesArg]. The default value is // 500_000_000 bytes ~= 500 megabytes // +// Starting with RabbitMQ 3.12, consumer timeout can be configured as a queue +// argument. This is the timeout for a consumer to acknowledge a message. The +// value is the time in milliseconds. The timeout is evaluated periodically, +// at one minute intervals. Values lower than one minute are not supported. +// See the [consumer timeout] guide for more information. +// +// [Single Active Consumer] on quorum and classic queues can be configured +// using [SingleActiveConsumerArg]. This argument expects a boolean value. It is +// false by default. +// // [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html // [Stream retention]: https://rabbitmq.com/streams.html#retention // [max length]: https://rabbitmq.com/maxlength.html @@ -244,6 +256,8 @@ type Decimal struct { // [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl // [Quorum Queues]: https://rabbitmq.com/quorum-queues.html // [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison +// [consumer timeout]: https://rabbitmq.com/consumers.html#acknowledgement-timeout +// [Single Active Consumer]: https://rabbitmq.com/consumers.html#single-active-consumer const ( QueueTypeArg = "x-queue-type" QueueMaxLenArg = "x-max-length" @@ -256,6 +270,9 @@ const ( StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes" // QueueVersionArg declares the Classic Queue version to use. Expects an integer, either 1 or 2. QueueVersionArg = "x-queue-version" + // ConsumerTimeoutArg is available in RabbitMQ 3.12+ as a queue argument. + ConsumerTimeoutArg = "x-consumer-timeout" + SingleActiveConsumerArg = "x-single-active-consumer" ) // Values for queue arguments. Use as values for queue arguments during queue declaration. @@ -267,6 +284,8 @@ const ( // amqp.QueueMaxLenArg: 100, // amqp.QueueTTLArg: 1800000, // } +// +// Refer to [Channel.QueueDeclare] for more examples. const ( QueueTypeClassic = "classic" QueueTypeQuorum = "quorum" From f23ccf73b45e03142d0a913973bc0e8558c0355f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aitor=20P=C3=A9rez=20Cedres?= Date: Wed, 21 Jun 2023 13:15:10 +0100 Subject: [PATCH 2/2] Fix panic in tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reported in https://github.com/rabbitmq/amqp091-go/actions/runs/5333022714/jobs/9662961511 ``` panic: runtime error: invalid memory address or nil pointer dereference panic: runtime error: invalid memory address or nil pointer dereference [signal 0xc0000005 code=0x0 addr=0x0 pc=0x7e012c] goroutine 3810 [running]: github.com/rabbitmq/amqp091-go.(*Connection).IsClosed(...) D:/a/amqp091-go/amqp091-go/connection.go:455 github.com/rabbitmq/amqp091-go.(*Connection).Close(0x0) D:/a/amqp091-go/amqp091-go/connection.go:388 +0x4c panic({0x86a960, 0xb32080}) C:/hostedtoolcache/windows/go/1.20.5/x64/src/runtime/panic.go:890 +0x263 github.com/rabbitmq/amqp091-go.(*Connection).ConnectionState(...) D:/a/amqp091-go/amqp091-go/connection.go:317 github.com/rabbitmq/amqp091-go.TestTLSHandshake.func2() D:/a/amqp091-go/amqp091-go/tls_test.go:111 +0x27b created by github.com/rabbitmq/amqp091-go.TestTLSHandshake D:/a/amqp091-go/amqp091-go/tls_test.go:104 +0x3e6 exit status 2 ``` If DialTLS returns an error, c will be nil, causing a panic later on in c.ConnectionState() Signed-off-by: Aitor Pérez Cedres --- tls_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tls_test.go b/tls_test.go index bbd0826..44c5fb9 100644 --- a/tls_test.go +++ b/tls_test.go @@ -66,7 +66,7 @@ func (s *tlsServer) Serve(t *testing.T) { } func startTLSServer(t *testing.T, cfg *tls.Config) tlsServer { - l, err := tls.Listen("tcp", "127.0.0.1:0", cfg) + l, err := tls.Listen("tcp", "127.0.0.1:3456", cfg) if err != nil { t.Fatalf("TLS server Listen error: %+v", err) } @@ -105,6 +105,7 @@ func TestTLSHandshake(t *testing.T) { c, err := DialTLS(srv.URL, tlsClientConfig(t)) if err != nil { errs <- fmt.Errorf("expected to open a TLS connection, got err: %v", err) + return } defer c.Close()