From 83e501f11f4de96e7c2c7239bd77010b0e8b8600 Mon Sep 17 00:00:00 2001 From: Jean-Yves NOLEN Date: Sat, 4 Jan 2025 00:28:13 +0100 Subject: [PATCH] feat: Add Sending Queue Option for OpenSearch (#33919) **Description:** Add Sending Queue to enable persistent queue in case of upstream failure **Documentation:** Add the options in the exporter README --- .../feature_opensearch-sending-queue.yaml | 21 +++++++++++++++++++ exporter/opensearchexporter/README.md | 3 +++ exporter/opensearchexporter/config.go | 1 + exporter/opensearchexporter/factory.go | 2 ++ 4 files changed, 27 insertions(+) create mode 100644 .chloggen/feature_opensearch-sending-queue.yaml diff --git a/.chloggen/feature_opensearch-sending-queue.yaml b/.chloggen/feature_opensearch-sending-queue.yaml new file mode 100644 index 000000000000..a7bbdd779411 --- /dev/null +++ b/.chloggen/feature_opensearch-sending-queue.yaml @@ -0,0 +1,21 @@ +# Use this changelog template to create an entry for release notes. +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opensearchexporter +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add Sending Queue to enable persistent queue in case of upstream failure" +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33919] +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/exporter/opensearchexporter/README.md b/exporter/opensearchexporter/README.md index 21f5cc63d350..cdc862798c89 100644 --- a/exporter/opensearchexporter/README.md +++ b/exporter/opensearchexporter/README.md @@ -38,6 +38,9 @@ Supports standard TLS settings as part of HTTP settings. See [TLS Configuration/ ### Retry Options - `retry_on_failure`: See [retry_on_failure](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) +### Sending Queue Options +- `sending_queue`: See [sending_queue](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) + ### Timeout Options - `timeout` : See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) diff --git a/exporter/opensearchexporter/config.go b/exporter/opensearchexporter/config.go index e31fdb37f807..b3108fa2a747 100644 --- a/exporter/opensearchexporter/config.go +++ b/exporter/opensearchexporter/config.go @@ -32,6 +32,7 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` MappingsSettings `mapstructure:"mapping"` + QueueConfig exporterhelper.QueueConfig `mapstructure:"sending_queue"` // The Observability indices would follow the recommended for immutable data stream ingestion pattern using // the data_stream concepts. See https://opensearch.org/docs/latest/dashboards/im-dashboards/datastream/ diff --git a/exporter/opensearchexporter/factory.go b/exporter/opensearchexporter/factory.go index 4e577884f20c..55e3e19520eb 100644 --- a/exporter/opensearchexporter/factory.go +++ b/exporter/opensearchexporter/factory.go @@ -51,6 +51,7 @@ func createTracesExporter(ctx context.Context, exporterhelper.WithStart(te.Start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithRetry(c.BackOffConfig), + exporterhelper.WithQueue(c.QueueConfig), exporterhelper.WithTimeout(c.TimeoutSettings)) } @@ -66,5 +67,6 @@ func createLogsExporter(ctx context.Context, exporterhelper.WithStart(le.Start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithRetry(c.BackOffConfig), + exporterhelper.WithQueue(c.QueueConfig), exporterhelper.WithTimeout(c.TimeoutSettings)) }