Skip to content

Commit

Permalink
Constant for consumer timeout and doc updates
Browse files Browse the repository at this point in the history
Documentation to queue argument constants was missing links to the
constant themselves. Go Doc allows to link to a specific symbol using
squared brackets.

Added as well a constant for SAC in CQ and QQ.

Signed-off-by: Aitor Pérez Cedres <acedres@vmware.com>
  • Loading branch information
Zerpet committed Jun 21, 2023
1 parent e834fc5 commit 4f02d9d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 13 deletions.
19 changes: 19 additions & 0 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,27 @@ func ExampleChannel_QueueDeclare_classicQueueV2() {
false, // exclusive
false, // noWait
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeClassic,
amqp.QueueVersionArg: 2,
},
)
log.Printf("Declared Classic Queue v2: %s", q.Name)
}

func ExampleChannel_QueueDeclare_consumerTimeout() {
conn, _ := amqp.Dial("amqp://localhost")
ch, _ := conn.Channel()
// this works only with RabbitMQ 3.12+
q, _ := ch.QueueDeclare(
"my-classic-queue-v2", // queue name
true, // durable
false, // auto-delete
false, // exclusive
false, // noWait
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeQuorum, // also works with classic queues
amqp.ConsumerTimeoutArg: 600_000, // 10 minute consumer timeout
},
)
log.Printf("Declared Classic Queue v2: %s", q.Name)
}
45 changes: 32 additions & 13 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"time"
)

// DefaultExchange is the default direct exchange that binds every queue by its
// name. Applications can route to a queue using the queue name as routing key.
const DefaultExchange = ""

// Constants for standard AMQP 0-9-1 exchange types.
Expand Down Expand Up @@ -214,36 +216,48 @@ type Decimal struct {
// Most common queue argument keys in queue declaration. For a comprehensive list
// of queue arguments, visit [RabbitMQ Queue docs].
//
// QueueTypeArg queue argument is used to declare quorum and stream queues.
// Accepted values are QueueTypeClassic (default), QueueTypeQuorum and
// QueueTypeStream. [Quorum Queues] accept (almost) all queue arguments as their
// [QueueTypeArg] queue argument is used to declare quorum and stream queues.
// Accepted values are [QueueTypeClassic] (default), [QueueTypeQuorum] and
// [QueueTypeStream]. [Quorum Queues] accept (almost) all queue arguments as their
// Classic Queues counterparts. Check [feature comparison] docs for more
// information.
//
// Queues can define their [max length] using QueueMaxLenArg and
// QueueMaxLenBytesArg queue arguments. Overflow behaviour is set using
// QueueOverflowArg. Accepted values are QueueOverflowDropHead (default),
// QueueOverflowRejectPublish and QueueOverflowRejectPublishDLX.
// Queues can define their [max length] using [QueueMaxLenArg] and
// [QueueMaxLenBytesArg] queue arguments. Overflow behaviour is set using
// [QueueOverflowArg]. Accepted values are [QueueOverflowDropHead] (default),
// [QueueOverflowRejectPublish] and [QueueOverflowRejectPublishDLX].
//
// [Queue TTL] can be defined using QueueTTLArg. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using QueueMessageTTLArg.
// This will set a time-to-live for **messages** in the queue.
// [Queue TTL] can be defined using [QueueTTLArg]. That is, the time-to-live for an
// unused queue. [Queue Message TTL] can be defined using [QueueMessageTTLArg].
// This will set a time-to-live for messages in the queue.
//
// [Stream retention] can be configured using StreamMaxLenBytesArg, to set the
// [Stream retention] can be configured using [StreamMaxLenBytesArg], to set the
// maximum size of the stream. Please note that stream queues always keep, at
// least, one segment. [Stream retention] can also be set using StreamMaxAgeArg,
// least, one segment. [Stream retention] can also be set using [StreamMaxAgeArg],
// to set time-based retention. Values are string with unit suffix. Valid
// suffixes are Y, M, D, h, m, s. E.g. "7D" for one week. The maximum segment
// size can be set using StreamMaxSegmentSizeBytesArg. The default value is
// size can be set using [StreamMaxSegmentSizeBytesArg]. The default value is
// 500_000_000 bytes ~= 500 megabytes
//
// Starting with RabbitMQ 3.12, consumer timeout can be configured as a queue
// argument. This is the timeout for a consumer to acknowledge a message. The
// value is the time in milliseconds. The timeout is evaluated periodically,
// at one minute intervals. Values lower than one minute are not supported.
// See the [consumer timeout] guide for more information.
//
// [Single Active Consumer] on quorum and classic queues can be configured
// using [SingleActiveConsumerArg]. This argument expects a boolean value. It is
// false by default.
//
// [RabbitMQ Queue docs]: https://rabbitmq.com/queues.html
// [Stream retention]: https://rabbitmq.com/streams.html#retention
// [max length]: https://rabbitmq.com/maxlength.html
// [Queue TTL]: https://rabbitmq.com/ttl.html#queue-ttl
// [Queue Message TTL]: https://rabbitmq.com/ttl.html#per-queue-message-ttl
// [Quorum Queues]: https://rabbitmq.com/quorum-queues.html
// [feature comparison]: https://rabbitmq.com/quorum-queues.html#feature-comparison
// [consumer timeout]: https://rabbitmq.com/consumers.html#acknowledgement-timeout
// [Single Active Consumer]: https://rabbitmq.com/consumers.html#single-active-consumer
const (
QueueTypeArg = "x-queue-type"
QueueMaxLenArg = "x-max-length"
Expand All @@ -256,6 +270,9 @@ const (
StreamMaxSegmentSizeBytesArg = "x-stream-max-segment-size-bytes"
// QueueVersionArg declares the Classic Queue version to use. Expects an integer, either 1 or 2.
QueueVersionArg = "x-queue-version"
// ConsumerTimeoutArg is available in RabbitMQ 3.12+ as a queue argument.
ConsumerTimeoutArg = "x-consumer-timeout"
SingleActiveConsumerArg = "x-single-active-consumer"
)

// Values for queue arguments. Use as values for queue arguments during queue declaration.
Expand All @@ -267,6 +284,8 @@ const (
// amqp.QueueMaxLenArg: 100,
// amqp.QueueTTLArg: 1800000,
// }
//
// Refer to [Channel.QueueDeclare] for more examples.
const (
QueueTypeClassic = "classic"
QueueTypeQuorum = "quorum"
Expand Down

0 comments on commit 4f02d9d

Please sign in to comment.