Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Logstash format compatibility #29625

Merged
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29624]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect
github.com/leoluk/perflib_exporter v0.2.1 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/lightstep/go-expohisto v1.0.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8 // indirect
Expand Down
4 changes: 4 additions & 0 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect
github.com/leoluk/perflib_exporter v0.2.1 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/lightstep/go-expohisto v1.0.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8 // indirect
Expand Down
4 changes: 4 additions & 0 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www
takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix`
resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute)
- `enabled`(default=false): Enable/Disable dynamic index for trace spans
- `logstash_format` (optional): Logstash format compatibility. Traces or Logs data can be written into an index in logstash format.
- `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as prefix and the date,
e.g: If `traces/logs_index` or `traces/logs_dynamic_index` is equals to `otlp-generic-default` your index will become `otlp-generic-default-YYYY.MM.DD`.
The last string appended belongs to the date when the data is being generated.
- `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date.
- `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name.
- `pipeline` (optional): Optional [Ingest Node](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html)
pipeline ID used for processing documents published by the exporter.
- `flush`: Event bulk buffer flush settings
Expand Down
15 changes: 11 additions & 4 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,17 @@ type Config struct {
Pipeline string `mapstructure:"pipeline"`

HTTPClientSettings `mapstructure:",squash"`
Discovery DiscoverySettings `mapstructure:"discover"`
Retry RetrySettings `mapstructure:"retry"`
Flush FlushSettings `mapstructure:"flush"`
Mapping MappingsSettings `mapstructure:"mapping"`
Discovery DiscoverySettings `mapstructure:"discover"`
Retry RetrySettings `mapstructure:"retry"`
Flush FlushSettings `mapstructure:"flush"`
Mapping MappingsSettings `mapstructure:"mapping"`
LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"`
}

type LogstashFormatSettings struct {
Enabled bool `mapstructure:"enabled"`
PrefixSeparator string `mapstructure:"prefix_separator"`
DateFormat string `mapstructure:"date_format"`
}

type DynamicIndexSetting struct {
Expand Down
24 changes: 24 additions & 0 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) {
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
})
}

Expand All @@ -76,6 +81,10 @@ func TestLoadConfig(t *testing.T) {
defaultCfg := createDefaultConfig()
defaultCfg.(*Config).Endpoints = []string{"https://elastic.example.com:9200"}

defaultLogstashFormatCfg := createDefaultConfig()
defaultLogstashFormatCfg.(*Config).Endpoints = []string{"http://localhost:9200"}
defaultLogstashFormatCfg.(*Config).LogstashFormat.Enabled = true

tests := []struct {
configFile string
id component.ID
Expand Down Expand Up @@ -129,6 +138,11 @@ func TestLoadConfig(t *testing.T) {
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
},
},
{
Expand Down Expand Up @@ -174,8 +188,18 @@ func TestLoadConfig(t *testing.T) {
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
},
},
{
id: component.NewIDWithName(metadata.Type, "logstash_format"),
configFile: "config.yaml",
expected: defaultLogstashFormatCfg,
},
}

for _, tt := range tests {
Expand Down
21 changes: 13 additions & 8 deletions exporter/elasticsearchexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func createDefaultConfig() component.Config {
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
}
}

Expand All @@ -71,17 +76,17 @@ func createLogsExporter(
set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.")
}

exporter, err := newLogsExporter(set.Logger, cf)
logsExporter, err := newLogsExporter(set.Logger, cf)
if err != nil {
return nil, fmt.Errorf("cannot configure Elasticsearch logs exporter: %w", err)
return nil, fmt.Errorf("cannot configure Elasticsearch logsExporter: %w", err)
}

return exporterhelper.NewLogsExporter(
ctx,
set,
cfg,
exporter.pushLogsData,
exporterhelper.WithShutdown(exporter.Shutdown),
logsExporter.pushLogsData,
exporterhelper.WithShutdown(logsExporter.Shutdown),
exporterhelper.WithQueue(cf.QueueSettings),
)
}
Expand All @@ -91,15 +96,15 @@ func createTracesExporter(ctx context.Context,
cfg component.Config) (exporter.Traces, error) {

cf := cfg.(*Config)
exporter, err := newTracesExporter(set.Logger, cf)
tracesExporter, err := newTracesExporter(set.Logger, cf)
if err != nil {
return nil, fmt.Errorf("cannot configure Elasticsearch traces exporter: %w", err)
return nil, fmt.Errorf("cannot configure Elasticsearch tracesExporter: %w", err)
}
return exporterhelper.NewTracesExporter(
ctx,
set,
cfg,
exporter.pushTraceData,
exporterhelper.WithShutdown(exporter.Shutdown),
tracesExporter.pushTraceData,
exporterhelper.WithShutdown(tracesExporter.Shutdown),
exporterhelper.WithQueue(cf.QueueSettings))
}
2 changes: 2 additions & 0 deletions exporter/elasticsearchexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-structform v0.0.10
github.com/lestrrat-go/strftime v1.0.6
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.90.1
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.90.1
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -34,6 +35,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector v0.90.2-0.20231201205146-6e2fdc755b34 // indirect
Expand Down
6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 18 additions & 7 deletions exporter/elasticsearchexporter/logs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -18,9 +19,10 @@ import (
type elasticsearchLogsExporter struct {
logger *zap.Logger

index string
dynamicIndex bool
maxAttempts int
index string
logstashFormat LogstashFormatSettings
dynamicIndex bool
maxAttempts int

client *esClientCurrent
bulkIndexer esBulkIndexerCurrent
Expand Down Expand Up @@ -62,10 +64,11 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporte
client: client,
bulkIndexer: bulkIndexer,

index: indexStr,
dynamicIndex: cfg.LogsDynamicIndex.Enabled,
maxAttempts: maxAttempts,
model: model,
index: indexStr,
dynamicIndex: cfg.LogsDynamicIndex.Enabled,
maxAttempts: maxAttempts,
model: model,
logstashFormat: cfg.LogstashFormat,
}
return esLogsExp, nil
}
Expand Down Expand Up @@ -109,6 +112,14 @@ func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource
fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
}

if e.logstashFormat.Enabled {
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
if err != nil {
return err
}
fIndex = formattedIndex
}

document, err := e.model.encodeLog(resource, record, scope)
if err != nil {
return fmt.Errorf("Failed to encode log event: %w", err)
Expand Down
75 changes: 75 additions & 0 deletions exporter/elasticsearchexporter/logs_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net/http"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -210,6 +211,80 @@ func TestExporter_PushEvent(t *testing.T) {
rec.WaitItems(1)
})

t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) {
var defaultCfg Config
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)

data, err := docs[0].Action.MarshalJSON()
assert.Nil(t, err)

jsonVal := map[string]any{}
err = json.Unmarshal(data, &jsonVal)
assert.Nil(t, err)

create := jsonVal["create"].(map[string]any)

assert.Equal(t, strings.Contains(create["_index"].(string), defaultCfg.LogsIndex), true)

return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.LogstashFormat.Enabled = true
cfg.LogsIndex = "not-used-index"
defaultCfg = *cfg
})

mustSendLogsWithAttributes(t, exporter, nil, nil)

rec.WaitItems(1)
})

t.Run("publish with logstash index format enabled and dynamic index enabled", func(t *testing.T) {
var (
prefix = "resprefix-"
suffix = "-attrsuffix"
index = "someindex"
)
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)

data, err := docs[0].Action.MarshalJSON()
assert.Nil(t, err)

jsonVal := map[string]any{}
err = json.Unmarshal(data, &jsonVal)
assert.Nil(t, err)

create := jsonVal["create"].(map[string]any)
expected := fmt.Sprintf("%s%s%s", prefix, index, suffix)

assert.Equal(t, strings.Contains(create["_index"].(string), expected), true)

return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.LogsIndex = index
cfg.LogsDynamicIndex.Enabled = true
cfg.LogstashFormat.Enabled = true
})

mustSendLogsWithAttributes(t, exporter,
map[string]string{
indexPrefix: "attrprefix-",
indexSuffix: suffix,
},
map[string]string{
indexPrefix: prefix,
},
)
rec.WaitItems(1)
})

t.Run("retry http request", func(t *testing.T) {
failures := 0
rec := newBulkRecorder()
Expand Down
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ elasticsearch/log:
max_requests: 5
sending_queue:
enabled: true
elasticsearch/logstash_format:
endpoints: [http://localhost:9200]
logstash_format:
enabled: true
Loading