diff --git a/CHANGELOG.md b/CHANGELOG.md index 92edbf821d..128cda3bb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file. - The `mongodb` processor and output default `write_concern.w_timeout` empty value no longer causes configuration issues. - Field `message_name` added to the logger config. - The `amqp_1` input and output should no longer spam logs with timeout errors during graceful termination. +- The `amqp_0_9` output now supports setting the `timeout` of publish. ## 4.4.1 - 2022-07-19 diff --git a/internal/component/output/config_amqp_0_9.go b/internal/component/output/config_amqp_0_9.go index 5e9726e6db..aea1d61455 100644 --- a/internal/component/output/config_amqp_0_9.go +++ b/internal/component/output/config_amqp_0_9.go @@ -30,6 +30,7 @@ type AMQPConfig struct { Mandatory bool `json:"mandatory" yaml:"mandatory"` Immediate bool `json:"immediate" yaml:"immediate"` TLS btls.Config `json:"tls" yaml:"tls"` + Timeout string `json:"timeout" yaml:"timeout"` } // NewAMQPConfig creates a new AMQPConfig with default values. @@ -53,5 +54,6 @@ func NewAMQPConfig() AMQPConfig { Mandatory: false, Immediate: false, TLS: btls.NewConfig(), + Timeout: "", } } diff --git a/internal/impl/amqp09/output.go b/internal/impl/amqp09/output.go index 10d656d8ec..89ab84d94f 100644 --- a/internal/impl/amqp09/output.go +++ b/internal/impl/amqp09/output.go @@ -79,6 +79,7 @@ The fields 'key' and 'type' can be dynamically set using function interpolations docs.FieldBool("persistent", "Whether message delivery should be persistent (transient by default).").Advanced().HasDefault(false), docs.FieldBool("mandatory", "Whether to set the mandatory flag on published messages. When set if a published message is routed to zero queues it is returned.").Advanced().HasDefault(false), docs.FieldBool("immediate", "Whether to set the immediate flag on published messages. When set if there are no ready consumers of a queue then the message is dropped instead of waiting.").Advanced().HasDefault(false), + docs.FieldString("timeout", "The maximum period to wait before abandoning it and reattempting. If not set, wait indefinitely.").Advanced().HasDefault(""), btls.FieldSpec(), ), Categories: []string{ @@ -107,6 +108,7 @@ type amqp09Writer struct { conn *amqp.Connection amqpChan *amqp.Channel returnChan <-chan amqp.Return + timeout time.Duration deliveryMode uint8 @@ -114,10 +116,18 @@ type amqp09Writer struct { } func newAMQP09Writer(mgr bundle.NewManagement, conf output.AMQPConfig, log log.Modular) (*amqp09Writer, error) { + var timeout time.Duration + if tout := conf.Timeout; len(tout) > 0 { + var err error + if timeout, err = time.ParseDuration(tout); err != nil { + return nil, fmt.Errorf("failed to parse timeout period string: %v", err) + } + } a := amqp09Writer{ log: log, conf: conf, deliveryMode: amqp.Transient, + timeout: timeout, } var err error if a.metaFilter, err = conf.Metadata.Filter(); err != nil { @@ -223,7 +233,7 @@ func (a *amqp09Writer) disconnect() error { return nil } -func (a *amqp09Writer) WriteWithContext(ctx context.Context, msg *message.Batch) error { +func (a *amqp09Writer) WriteWithContext(wctx context.Context, msg *message.Batch) error { a.connLock.RLock() conn := a.conn amqpChan := a.amqpChan @@ -234,6 +244,17 @@ func (a *amqp09Writer) WriteWithContext(ctx context.Context, msg *message.Batch) return component.ErrNotConnected } + var ctx context.Context + if a.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout( + wctx, a.timeout, + ) + defer cancel() + } else { + ctx = wctx + } + return output.IterateBatchedSend(msg, func(i int, p *message.Part) error { bindingKey := strings.ReplaceAll(a.key.String(i, msg), "/", ".") msgType := strings.ReplaceAll(a.msgType.String(i, msg), "/", ".") diff --git a/website/docs/components/outputs/amqp_0_9.md b/website/docs/components/outputs/amqp_0_9.md index 557234f1c3..2816dc061d 100644 --- a/website/docs/components/outputs/amqp_0_9.md +++ b/website/docs/components/outputs/amqp_0_9.md @@ -66,6 +66,7 @@ output: persistent: false mandatory: false immediate: false + timeout: "" tls: enabled: false skip_cert_verify: false @@ -263,6 +264,14 @@ Whether to set the immediate flag on published messages. When set if there are n Type: `bool` Default: `false` +### `timeout` + +The maximum period to wait before abandoning it and reattempting. If not set, wait indefinitely. + + +Type: `string` +Default: `""` + ### `tls` Custom TLS settings can be used to override system defaults.