Skip to content

Commit

Permalink
feat(inputs.amqp_consumer): Add support to rabbitmq stream queue (inf…
Browse files Browse the repository at this point in the history
  • Loading branch information
massimogallina authored and powersj committed Jul 5, 2023
1 parent 9c05ab3 commit 6fcc5b9
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
4 changes: 4 additions & 0 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## If true, queue will be passively declared.
# queue_passive = false

## Additional arguments when consuming from Queue
# queue_consume_arguments = { }
# queue_consume_arguments = {"x-stream-offset" = "first"}

## A binding between the exchange and queue using this binding key is
## created. If unset, no binding is created.
binding_key = "#"
Expand Down
26 changes: 16 additions & 10 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ type AMQPConsumer struct {
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`

// Queue Name
Queue string `toml:"queue"`
QueueDurability string `toml:"queue_durability"`
QueuePassive bool `toml:"queue_passive"`
Queue string `toml:"queue"`
QueueDurability string `toml:"queue_durability"`
QueuePassive bool `toml:"queue_passive"`
QueueConsumeArguments map[string]string `toml:"queue_consume_arguments"`

// Binding Key
BindingKey string `toml:"binding_key"`
Expand Down Expand Up @@ -284,14 +285,19 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return nil, fmt.Errorf("failed to set QoS: %w", err)
}

consumeArgs := make(amqp.Table, len(a.QueueConsumeArguments))
for k, v := range a.QueueConsumeArguments {
consumeArgs[k] = v
}

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
consumeArgs, // arguments
)
if err != nil {
return nil, fmt.Errorf("failed establishing connection to queue: %w", err)
Expand Down
4 changes: 4 additions & 0 deletions plugins/inputs/amqp_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
## If true, queue will be passively declared.
# queue_passive = false

## Additional arguments when consuming from Queue
# queue_consume_arguments = { }
# queue_consume_arguments = {"x-stream-offset" = "first"}

## A binding between the exchange and queue using this binding key is
## created. If unset, no binding is created.
binding_key = "#"
Expand Down

0 comments on commit 6fcc5b9

Please sign in to comment.