Skip to content

Commit

Permalink
Merge branch 'fix-1299' of https://github.com/sapk-fork/benthos into …
Browse files Browse the repository at this point in the history
…sapk-fork-fix-1299
  • Loading branch information
Jeffail committed Jul 28, 2022
2 parents 694f2de + a8d9122 commit a796675
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
### Added

- Field `batch_size` added to the `generate` input.
- The `amqp_0_9` output now supports setting the `timeout` of publish.

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ require (
github.com/prometheus/common v0.32.1
github.com/pusher/pusher-http-go v4.0.1+incompatible
github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc
github.com/rabbitmq/amqp091-go v1.3.4
github.com/rabbitmq/amqp091-go v1.4.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/rickb777/date v1.17.0
github.com/robfig/cron/v3 v3.0.1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -911,8 +911,8 @@ github.com/pusher/pusher-http-go v4.0.1+incompatible h1:4u6tomPG1WhHaST7Wi9mw83Y
github.com/pusher/pusher-http-go v4.0.1+incompatible/go.mod h1:XAv1fxRmVTI++2xsfofDhg7whapsLRG/gH/DXbF3a18=
github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc h1:hK577yxEJ2f5s8w2iy2KimZmgrdAUZUNftE1ESmg2/Q=
github.com/quipo/dependencysolver v0.0.0-20170801134659-2b009cb4ddcc/go.mod h1:OQt6Zo5B3Zs+C49xul8kcHo+fZ1mCLPvd0LFxiZ2DHc=
github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU=
github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rabbitmq/amqp091-go v1.4.0 h1:T2G+J9W9OY4p64Di23J6yH7tOkMocgnESvYeBjuG9cY=
github.com/rabbitmq/amqp091-go v1.4.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rickb777/date v1.17.0 h1:Qk1MUtTLFfIWYhRaNRyk1t7LmjfkjOEELacQPsoh7Nw=
Expand Down Expand Up @@ -1099,6 +1099,8 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
Expand Down
2 changes: 2 additions & 0 deletions internal/component/output/config_amqp_0_9.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type AMQPConfig struct {
Mandatory bool `json:"mandatory" yaml:"mandatory"`
Immediate bool `json:"immediate" yaml:"immediate"`
TLS btls.Config `json:"tls" yaml:"tls"`
Timeout string `json:"timeout" yaml:"timeout"`
}

// NewAMQPConfig creates a new AMQPConfig with default values.
Expand All @@ -53,5 +54,6 @@ func NewAMQPConfig() AMQPConfig {
Mandatory: false,
Immediate: false,
TLS: btls.NewConfig(),
Timeout: "",
}
}
26 changes: 24 additions & 2 deletions internal/impl/amqp09/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ The fields 'key' and 'type' can be dynamically set using function interpolations
docs.FieldBool("persistent", "Whether message delivery should be persistent (transient by default).").Advanced().HasDefault(false),
docs.FieldBool("mandatory", "Whether to set the mandatory flag on published messages. When set if a published message is routed to zero queues it is returned.").Advanced().HasDefault(false),
docs.FieldBool("immediate", "Whether to set the immediate flag on published messages. When set if there are no ready consumers of a queue then the message is dropped instead of waiting.").Advanced().HasDefault(false),
docs.FieldString("timeout", "The maximum period to wait before abandoning it and reattempting. If not set, wait indefinitely.").Advanced().HasDefault(""),
btls.FieldSpec(),
),
Categories: []string{
Expand Down Expand Up @@ -107,17 +108,26 @@ type amqp09Writer struct {
conn *amqp.Connection
amqpChan *amqp.Channel
returnChan <-chan amqp.Return
timeout time.Duration

deliveryMode uint8

connLock sync.RWMutex
}

func newAMQP09Writer(mgr bundle.NewManagement, conf output.AMQPConfig, log log.Modular) (*amqp09Writer, error) {
var timeout time.Duration
if tout := conf.Timeout; len(tout) > 0 {
var err error
if timeout, err = time.ParseDuration(tout); err != nil {
return nil, fmt.Errorf("failed to parse timeout period string: %v", err)
}
}
a := amqp09Writer{
log: log,
conf: conf,
deliveryMode: amqp.Transient,
timeout: timeout,
}
var err error
if a.metaFilter, err = conf.Metadata.Filter(); err != nil {
Expand Down Expand Up @@ -223,7 +233,7 @@ func (a *amqp09Writer) disconnect() error {
return nil
}

func (a *amqp09Writer) WriteWithContext(ctx context.Context, msg *message.Batch) error {
func (a *amqp09Writer) WriteWithContext(wctx context.Context, msg *message.Batch) error {
a.connLock.RLock()
conn := a.conn
amqpChan := a.amqpChan
Expand All @@ -234,6 +244,17 @@ func (a *amqp09Writer) WriteWithContext(ctx context.Context, msg *message.Batch)
return component.ErrNotConnected
}

var ctx context.Context
if a.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(
wctx, a.timeout,
)
defer cancel()
} else {
ctx = wctx
}

return output.IterateBatchedSend(msg, func(i int, p *message.Part) error {
bindingKey := strings.ReplaceAll(a.key.String(i, msg), "/", ".")
msgType := strings.ReplaceAll(a.msgType.String(i, msg), "/", ".")
Expand All @@ -258,7 +279,8 @@ func (a *amqp09Writer) WriteWithContext(ctx context.Context, msg *message.Batch)
return nil
})

conf, err := amqpChan.PublishWithDeferredConfirm(
conf, err := amqpChan.PublishWithDeferredConfirmWithContext(
ctx,
a.conf.Exchange, // publish to an exchange
bindingKey, // routing to 0 or more queues
a.conf.Mandatory, // mandatory
Expand Down
9 changes: 9 additions & 0 deletions website/docs/components/outputs/amqp_0_9.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ output:
persistent: false
mandatory: false
immediate: false
timeout: ""
tls:
enabled: false
skip_cert_verify: false
Expand Down Expand Up @@ -263,6 +264,14 @@ Whether to set the immediate flag on published messages. When set if there are n
Type: `bool`
Default: `false`

### `timeout`

The maximum period to wait before abandoning it and reattempting. If not set, wait indefinitely.


Type: `string`
Default: `""`

### `tls`

Custom TLS settings can be used to override system defaults.
Expand Down

0 comments on commit a796675

Please sign in to comment.