Skip to content

Commit

Permalink
✨ amqp_0_9 output add timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Antoine GIRARD committed Jul 25, 2022
1 parent 21f4061 commit a8d9122
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file.
- The `mongodb` processor and output default `write_concern.w_timeout` empty value no longer causes configuration issues.
- Field `message_name` added to the logger config.
- The `amqp_1` input and output should no longer spam logs with timeout errors during graceful termination.
- The `amqp_0_9` output now supports setting the `timeout` of publish.

## 4.4.1 - 2022-07-19

Expand Down
2 changes: 2 additions & 0 deletions internal/component/output/config_amqp_0_9.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type AMQPConfig struct {
Mandatory bool `json:"mandatory" yaml:"mandatory"`
Immediate bool `json:"immediate" yaml:"immediate"`
TLS btls.Config `json:"tls" yaml:"tls"`
Timeout string `json:"timeout" yaml:"timeout"`
}

// NewAMQPConfig creates a new AMQPConfig with default values.
Expand All @@ -53,5 +54,6 @@ func NewAMQPConfig() AMQPConfig {
Mandatory: false,
Immediate: false,
TLS: btls.NewConfig(),
Timeout: "",
}
}
23 changes: 22 additions & 1 deletion internal/impl/amqp09/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ The fields 'key' and 'type' can be dynamically set using function interpolations
docs.FieldBool("persistent", "Whether message delivery should be persistent (transient by default).").Advanced().HasDefault(false),
docs.FieldBool("mandatory", "Whether to set the mandatory flag on published messages. When set if a published message is routed to zero queues it is returned.").Advanced().HasDefault(false),
docs.FieldBool("immediate", "Whether to set the immediate flag on published messages. When set if there are no ready consumers of a queue then the message is dropped instead of waiting.").Advanced().HasDefault(false),
docs.FieldString("timeout", "The maximum period to wait before abandoning it and reattempting. If not set, wait indefinitely.").Advanced().HasDefault(""),
btls.FieldSpec(),
),
Categories: []string{
Expand Down Expand Up @@ -107,17 +108,26 @@ type amqp09Writer struct {
conn *amqp.Connection
amqpChan *amqp.Channel
returnChan <-chan amqp.Return
timeout time.Duration

deliveryMode uint8

connLock sync.RWMutex
}

func newAMQP09Writer(mgr bundle.NewManagement, conf output.AMQPConfig, log log.Modular) (*amqp09Writer, error) {
var timeout time.Duration
if tout := conf.Timeout; len(tout) > 0 {
var err error
if timeout, err = time.ParseDuration(tout); err != nil {
return nil, fmt.Errorf("failed to parse timeout period string: %v", err)
}
}
a := amqp09Writer{
log: log,
conf: conf,
deliveryMode: amqp.Transient,
timeout: timeout,
}
var err error
if a.metaFilter, err = conf.Metadata.Filter(); err != nil {
Expand Down Expand Up @@ -223,7 +233,7 @@ func (a *amqp09Writer) disconnect() error {
return nil
}

func (a *amqp09Writer) WriteWithContext(ctx context.Context, msg *message.Batch) error {
func (a *amqp09Writer) WriteWithContext(wctx context.Context, msg *message.Batch) error {
a.connLock.RLock()
conn := a.conn
amqpChan := a.amqpChan
Expand All @@ -234,6 +244,17 @@ func (a *amqp09Writer) WriteWithContext(ctx context.Context, msg *message.Batch)
return component.ErrNotConnected
}

var ctx context.Context
if a.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(
wctx, a.timeout,
)
defer cancel()
} else {
ctx = wctx
}

return output.IterateBatchedSend(msg, func(i int, p *message.Part) error {
bindingKey := strings.ReplaceAll(a.key.String(i, msg), "/", ".")
msgType := strings.ReplaceAll(a.msgType.String(i, msg), "/", ".")
Expand Down
9 changes: 9 additions & 0 deletions website/docs/components/outputs/amqp_0_9.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ output:
persistent: false
mandatory: false
immediate: false
timeout: ""
tls:
enabled: false
skip_cert_verify: false
Expand Down Expand Up @@ -263,6 +264,14 @@ Whether to set the immediate flag on published messages. When set if there are n
Type: `bool`
Default: `false`

### `timeout`

The maximum period to wait before abandoning it and reattempting. If not set, wait indefinitely.


Type: `string`
Default: `""`

### `tls`

Custom TLS settings can be used to override system defaults.
Expand Down

0 comments on commit a8d9122

Please sign in to comment.