diff --git a/.chloggen/elasticsearchexporter_support_logstash_index_format.yaml b/.chloggen/elasticsearchexporter_support_logstash_index_format.yaml new file mode 100755 index 000000000000..dedf91eb324d --- /dev/null +++ b/.chloggen/elasticsearchexporter_support_logstash_index_format.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Logstash format compatibility. Traces or Logs data can be written into an index in logstash format. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29624] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index 5663b42eec9c..db14fe96c32e 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -436,6 +436,7 @@ require ( github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect github.com/leoluk/perflib_exporter v0.2.1 // indirect + github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/lib/pq v1.10.9 // indirect github.com/lightstep/go-expohisto v1.0.0 // indirect github.com/linkedin/goavro/v2 v2.9.8 // indirect diff --git a/cmd/configschema/go.sum b/cmd/configschema/go.sum index cfe6c7d824d6..5bfdac8f6cba 100644 --- a/cmd/configschema/go.sum +++ b/cmd/configschema/go.sum @@ -1077,6 +1077,10 @@ github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b h1:11UHH39 github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg= github.com/leoluk/perflib_exporter v0.2.1 h1:/3/ut1k/jFt5p4ypjLZKDHDqlXAK6ERZPVWtwdI389I= github.com/leoluk/perflib_exporter v0.2.1/go.mod h1:MinSWm88jguXFFrGsP56PtleUb4Qtm4tNRH/wXNXRTI= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= +github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ= +github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 4bb0921bf915..52812aafbdcc 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -484,6 +484,7 @@ require ( github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect github.com/leoluk/perflib_exporter v0.2.1 // indirect + github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/lib/pq v1.10.9 // indirect github.com/lightstep/go-expohisto v1.0.0 // indirect github.com/linkedin/goavro/v2 v2.9.8 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index 4e12c1d0eb6f..221f5195eaa9 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -1073,6 +1073,10 @@ github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 h1:bCiVCRC github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg= github.com/leoluk/perflib_exporter v0.2.1 h1:/3/ut1k/jFt5p4ypjLZKDHDqlXAK6ERZPVWtwdI389I= github.com/leoluk/perflib_exporter v0.2.1/go.mod h1:MinSWm88jguXFFrGsP56PtleUb4Qtm4tNRH/wXNXRTI= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= +github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ= +github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 1a5d2e0f02c8..caed01f44022 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -44,6 +44,12 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www takes resource or span attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix` resulting dynamically prefixed / suffixed indexing based on `traces_index`. (priority: resource attribute > span attribute) - `enabled`(default=false): Enable/Disable dynamic index for trace spans +- `logstash_format` (optional): Logstash format compatibility. Traces or Logs data can be written into an index in logstash format. + - `enabled`(default=false): Enable/Disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `traces/logs_index` or `traces/logs_dynamic_index` as prefix and the date, + e.g: If `traces/logs_index` or `traces/logs_dynamic_index` is equals to `otlp-generic-default` your index will become `otlp-generic-default-YYYY.MM.DD`. + The last string appended belongs to the date when the data is being generated. + - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date. + - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. - `pipeline` (optional): Optional [Ingest Node](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html) pipeline ID used for processing documents published by the exporter. - `flush`: Event bulk buffer flush settings diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 460d605fe92c..2240bf3acd2a 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -57,10 +57,17 @@ type Config struct { Pipeline string `mapstructure:"pipeline"` HTTPClientSettings `mapstructure:",squash"` - Discovery DiscoverySettings `mapstructure:"discover"` - Retry RetrySettings `mapstructure:"retry"` - Flush FlushSettings `mapstructure:"flush"` - Mapping MappingsSettings `mapstructure:"mapping"` + Discovery DiscoverySettings `mapstructure:"discover"` + Retry RetrySettings `mapstructure:"retry"` + Flush FlushSettings `mapstructure:"flush"` + Mapping MappingsSettings `mapstructure:"mapping"` + LogstashFormat LogstashFormatSettings `mapstructure:"logstash_format"` +} + +type LogstashFormatSettings struct { + Enabled bool `mapstructure:"enabled"` + PrefixSeparator string `mapstructure:"prefix_separator"` + DateFormat string `mapstructure:"date_format"` } type DynamicIndexSetting struct { diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index bfe9e638e8e6..91ccfe638ef8 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -67,6 +67,11 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) { Dedup: true, Dedot: true, }, + LogstashFormat: LogstashFormatSettings{ + Enabled: false, + PrefixSeparator: "-", + DateFormat: "%Y.%m.%d", + }, }) } @@ -76,6 +81,10 @@ func TestLoadConfig(t *testing.T) { defaultCfg := createDefaultConfig() defaultCfg.(*Config).Endpoints = []string{"https://elastic.example.com:9200"} + defaultLogstashFormatCfg := createDefaultConfig() + defaultLogstashFormatCfg.(*Config).Endpoints = []string{"http://localhost:9200"} + defaultLogstashFormatCfg.(*Config).LogstashFormat.Enabled = true + tests := []struct { configFile string id component.ID @@ -129,6 +138,11 @@ func TestLoadConfig(t *testing.T) { Dedup: true, Dedot: true, }, + LogstashFormat: LogstashFormatSettings{ + Enabled: false, + PrefixSeparator: "-", + DateFormat: "%Y.%m.%d", + }, }, }, { @@ -174,8 +188,18 @@ func TestLoadConfig(t *testing.T) { Dedup: true, Dedot: true, }, + LogstashFormat: LogstashFormatSettings{ + Enabled: false, + PrefixSeparator: "-", + DateFormat: "%Y.%m.%d", + }, }, }, + { + id: component.NewIDWithName(metadata.Type, "logstash_format"), + configFile: "config.yaml", + expected: defaultLogstashFormatCfg, + }, } for _, tt := range tests { diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 1f951700d51b..8c49dc87ed46 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -55,6 +55,11 @@ func createDefaultConfig() component.Config { Dedup: true, Dedot: true, }, + LogstashFormat: LogstashFormatSettings{ + Enabled: false, + PrefixSeparator: "-", + DateFormat: "%Y.%m.%d", + }, } } @@ -71,17 +76,17 @@ func createLogsExporter( set.Logger.Warn("index option are deprecated and replaced with logs_index and traces_index.") } - exporter, err := newLogsExporter(set.Logger, cf) + logsExporter, err := newLogsExporter(set.Logger, cf) if err != nil { - return nil, fmt.Errorf("cannot configure Elasticsearch logs exporter: %w", err) + return nil, fmt.Errorf("cannot configure Elasticsearch logsExporter: %w", err) } return exporterhelper.NewLogsExporter( ctx, set, cfg, - exporter.pushLogsData, - exporterhelper.WithShutdown(exporter.Shutdown), + logsExporter.pushLogsData, + exporterhelper.WithShutdown(logsExporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings), ) } @@ -91,15 +96,15 @@ func createTracesExporter(ctx context.Context, cfg component.Config) (exporter.Traces, error) { cf := cfg.(*Config) - exporter, err := newTracesExporter(set.Logger, cf) + tracesExporter, err := newTracesExporter(set.Logger, cf) if err != nil { - return nil, fmt.Errorf("cannot configure Elasticsearch traces exporter: %w", err) + return nil, fmt.Errorf("cannot configure Elasticsearch tracesExporter: %w", err) } return exporterhelper.NewTracesExporter( ctx, set, cfg, - exporter.pushTraceData, - exporterhelper.WithShutdown(exporter.Shutdown), + tracesExporter.pushTraceData, + exporterhelper.WithShutdown(tracesExporter.Shutdown), exporterhelper.WithQueue(cf.QueueSettings)) } diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index 2174f258e5c2..edc477a8fe72 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-structform v0.0.10 + github.com/lestrrat-go/strftime v1.0.6 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.90.1 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.90.1 github.com/stretchr/testify v1.8.4 @@ -34,6 +35,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.90.2-0.20231201205146-6e2fdc755b34 // indirect diff --git a/exporter/elasticsearchexporter/go.sum b/exporter/elasticsearchexporter/go.sum index 5c3e621fa667..25da71e123af 100644 --- a/exporter/elasticsearchexporter/go.sum +++ b/exporter/elasticsearchexporter/go.sum @@ -67,6 +67,10 @@ github.com/knadh/koanf/v2 v2.0.1 h1:1dYGITt1I23x8cfx8ZnldtezdyaZtfAuRtIFOiRzK7g= github.com/knadh/koanf/v2 v2.0.1/go.mod h1:ZeiIlIDXTE7w1lMT6UVcNiRAS2/rCeLn/GdLNvY1Dus= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= +github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ= +github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= @@ -79,6 +83,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= diff --git a/exporter/elasticsearchexporter/logs_exporter.go b/exporter/elasticsearchexporter/logs_exporter.go index 7511988bc2ba..b74a5f942ae2 100644 --- a/exporter/elasticsearchexporter/logs_exporter.go +++ b/exporter/elasticsearchexporter/logs_exporter.go @@ -9,6 +9,7 @@ import ( "context" "errors" "fmt" + "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -18,9 +19,10 @@ import ( type elasticsearchLogsExporter struct { logger *zap.Logger - index string - dynamicIndex bool - maxAttempts int + index string + logstashFormat LogstashFormatSettings + dynamicIndex bool + maxAttempts int client *esClientCurrent bulkIndexer esBulkIndexerCurrent @@ -62,10 +64,11 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporte client: client, bulkIndexer: bulkIndexer, - index: indexStr, - dynamicIndex: cfg.LogsDynamicIndex.Enabled, - maxAttempts: maxAttempts, - model: model, + index: indexStr, + dynamicIndex: cfg.LogsDynamicIndex.Enabled, + maxAttempts: maxAttempts, + model: model, + logstashFormat: cfg.LogstashFormat, } return esLogsExp, nil } @@ -109,6 +112,14 @@ func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) } + if e.logstashFormat.Enabled { + formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) + if err != nil { + return err + } + fIndex = formattedIndex + } + document, err := e.model.encodeLog(resource, record, scope) if err != nil { return fmt.Errorf("Failed to encode log event: %w", err) diff --git a/exporter/elasticsearchexporter/logs_exporter_test.go b/exporter/elasticsearchexporter/logs_exporter_test.go index 48fa8a06cc0b..c7ae40c8783c 100644 --- a/exporter/elasticsearchexporter/logs_exporter_test.go +++ b/exporter/elasticsearchexporter/logs_exporter_test.go @@ -10,6 +10,7 @@ import ( "fmt" "net/http" "runtime" + "strings" "sync" "sync/atomic" "testing" @@ -210,6 +211,80 @@ func TestExporter_PushEvent(t *testing.T) { rec.WaitItems(1) }) + t.Run("publish with logstash index format enabled and dynamic index disabled", func(t *testing.T) { + var defaultCfg Config + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.Nil(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.Nil(t, err) + + create := jsonVal["create"].(map[string]any) + + assert.Equal(t, strings.Contains(create["_index"].(string), defaultCfg.LogsIndex), true) + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogstashFormat.Enabled = true + cfg.LogsIndex = "not-used-index" + defaultCfg = *cfg + }) + + mustSendLogsWithAttributes(t, exporter, nil, nil) + + rec.WaitItems(1) + }) + + t.Run("publish with logstash index format enabled and dynamic index enabled", func(t *testing.T) { + var ( + prefix = "resprefix-" + suffix = "-attrsuffix" + index = "someindex" + ) + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.Nil(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.Nil(t, err) + + create := jsonVal["create"].(map[string]any) + expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) + + assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) + + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsIndex = index + cfg.LogsDynamicIndex.Enabled = true + cfg.LogstashFormat.Enabled = true + }) + + mustSendLogsWithAttributes(t, exporter, + map[string]string{ + indexPrefix: "attrprefix-", + indexSuffix: suffix, + }, + map[string]string{ + indexPrefix: prefix, + }, + ) + rec.WaitItems(1) + }) + t.Run("retry http request", func(t *testing.T) { failures := 0 rec := newBulkRecorder() diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 60ba25ebb476..792640222adb 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -40,3 +40,7 @@ elasticsearch/log: max_requests: 5 sending_queue: enabled: true +elasticsearch/logstash_format: + endpoints: [http://localhost:9200] + logstash_format: + enabled: true diff --git a/exporter/elasticsearchexporter/trace_exporter.go b/exporter/elasticsearchexporter/trace_exporter.go index ef421951ed4c..bfa3c485271f 100644 --- a/exporter/elasticsearchexporter/trace_exporter.go +++ b/exporter/elasticsearchexporter/trace_exporter.go @@ -9,6 +9,7 @@ import ( "context" "errors" "fmt" + "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" @@ -18,9 +19,10 @@ import ( type elasticsearchTracesExporter struct { logger *zap.Logger - index string - dynamicIndex bool - maxAttempts int + index string + logstashFormat LogstashFormatSettings + dynamicIndex bool + maxAttempts int client *esClientCurrent bulkIndexer esBulkIndexerCurrent @@ -54,10 +56,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExp client: client, bulkIndexer: bulkIndexer, - index: cfg.TracesIndex, - dynamicIndex: cfg.TracesDynamicIndex.Enabled, - maxAttempts: maxAttempts, - model: model, + index: cfg.TracesIndex, + dynamicIndex: cfg.TracesDynamicIndex.Enabled, + maxAttempts: maxAttempts, + model: model, + logstashFormat: cfg.LogstashFormat, }, nil } @@ -101,6 +104,14 @@ func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resou fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) } + if e.logstashFormat.Enabled { + formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) + if err != nil { + return err + } + fIndex = formattedIndex + } + document, err := e.model.encodeSpan(resource, span, scope) if err != nil { return fmt.Errorf("Failed to encode trace record: %w", err) diff --git a/exporter/elasticsearchexporter/traces_exporter_test.go b/exporter/elasticsearchexporter/traces_exporter_test.go index 7c40be6b4d61..8cbc5c3e640b 100644 --- a/exporter/elasticsearchexporter/traces_exporter_test.go +++ b/exporter/elasticsearchexporter/traces_exporter_test.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "runtime" + "strings" "sync" "sync/atomic" "testing" @@ -200,6 +201,82 @@ func TestExporter_PushTraceRecord(t *testing.T) { rec.WaitItems(1) }) + t.Run("publish with logstash format index", func(t *testing.T) { + var defaultCfg Config + + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.Nil(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.Nil(t, err) + + create := jsonVal["create"].(map[string]any) + + assert.Equal(t, strings.Contains(create["_index"].(string), defaultCfg.TracesIndex), true) + + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.LogstashFormat.Enabled = true + cfg.TracesIndex = "not-used-index" + defaultCfg = *cfg + }) + + mustSendTracesWithAttributes(t, exporter, nil, nil) + + rec.WaitItems(1) + }) + + t.Run("publish with logstash format index and dynamic index enabled ", func(t *testing.T) { + var ( + prefix = "resprefix-" + suffix = "-attrsuffix" + index = "someindex" + ) + + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + data, err := docs[0].Action.MarshalJSON() + assert.Nil(t, err) + + jsonVal := map[string]any{} + err = json.Unmarshal(data, &jsonVal) + assert.Nil(t, err) + + create := jsonVal["create"].(map[string]any) + expected := fmt.Sprintf("%s%s%s", prefix, index, suffix) + + assert.Equal(t, strings.Contains(create["_index"].(string), expected), true) + + return itemsAllOK(docs) + }) + + exporter := newTestTracesExporter(t, server.URL, func(cfg *Config) { + cfg.TracesIndex = index + cfg.TracesDynamicIndex.Enabled = true + cfg.LogstashFormat.Enabled = true + }) + + mustSendTracesWithAttributes(t, exporter, + map[string]string{ + indexPrefix: "attrprefix-", + indexSuffix: suffix, + }, + map[string]string{ + indexPrefix: prefix, + }, + ) + rec.WaitItems(1) + }) + t.Run("retry http request", func(t *testing.T) { failures := 0 rec := newBulkRecorder() diff --git a/exporter/elasticsearchexporter/util.go b/exporter/elasticsearchexporter/util.go new file mode 100644 index 000000000000..e5b398082b3b --- /dev/null +++ b/exporter/elasticsearchexporter/util.go @@ -0,0 +1,28 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + +import ( + "bytes" + "fmt" + "time" + + "github.com/lestrrat-go/strftime" +) + +func generateIndexWithLogstashFormat(index string, conf *LogstashFormatSettings, t time.Time) (string, error) { + if conf.Enabled { + partIndex := fmt.Sprintf("%s%s", index, conf.PrefixSeparator) + var buf bytes.Buffer + p, err := strftime.New(fmt.Sprintf("%s%s", partIndex, conf.DateFormat)) + if err != nil { + return partIndex, err + } + if err = p.Format(&buf, t); err != nil { + return partIndex, err + } + index = buf.String() + } + return index, nil +} diff --git a/exporter/elasticsearchexporter/utils_test.go b/exporter/elasticsearchexporter/utils_test.go index 3309d9fb9a26..8d58441edb1c 100644 --- a/exporter/elasticsearchexporter/utils_test.go +++ b/exporter/elasticsearchexporter/utils_test.go @@ -15,6 +15,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" @@ -251,3 +252,28 @@ func fillResourceAttributeMap(attrs pcommon.Map, mp map[string]string) { attrs.PutStr(k, v) } } + +func TestGetSuffixTime(t *testing.T) { + defaultCfg := createDefaultConfig().(*Config) + defaultCfg.LogstashFormat.Enabled = true + testTime := time.Date(2023, 12, 2, 10, 10, 10, 1, time.UTC) + index, err := generateIndexWithLogstashFormat(defaultCfg.LogsIndex, &defaultCfg.LogstashFormat, testTime) + assert.Nil(t, err) + assert.Equal(t, index, "logs-generic-default-2023.12.02") + + defaultCfg.LogsIndex = "logstash" + defaultCfg.LogstashFormat.PrefixSeparator = "." + otelLogsIndex, err := generateIndexWithLogstashFormat(defaultCfg.LogsIndex, &defaultCfg.LogstashFormat, testTime) + assert.Nil(t, err) + assert.Equal(t, otelLogsIndex, "logstash.2023.12.02") + + defaultCfg.LogstashFormat.DateFormat = "%Y-%m-%d" + newOtelLogsIndex, err := generateIndexWithLogstashFormat(defaultCfg.LogsIndex, &defaultCfg.LogstashFormat, testTime) + assert.Nil(t, err) + assert.Equal(t, newOtelLogsIndex, "logstash.2023-12-02") + + defaultCfg.LogstashFormat.DateFormat = "%d/%m/%Y" + newOtelLogsIndexWithSpecDataFormat, err := generateIndexWithLogstashFormat(defaultCfg.LogsIndex, &defaultCfg.LogstashFormat, testTime) + assert.Nil(t, err) + assert.Equal(t, newOtelLogsIndexWithSpecDataFormat, "logstash.02/12/2023") +} diff --git a/go.mod b/go.mod index fd493d9a379e..50b4ed0c2292 100644 --- a/go.mod +++ b/go.mod @@ -464,6 +464,7 @@ require ( github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect github.com/leoluk/perflib_exporter v0.2.1 // indirect + github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/lib/pq v1.10.9 // indirect github.com/lightstep/go-expohisto v1.0.0 // indirect github.com/linkedin/goavro/v2 v2.9.8 // indirect diff --git a/go.sum b/go.sum index f2217273369b..0f5190965ac2 100644 --- a/go.sum +++ b/go.sum @@ -1082,6 +1082,10 @@ github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 h1:bCiVCRC github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg= github.com/leoluk/perflib_exporter v0.2.1 h1:/3/ut1k/jFt5p4ypjLZKDHDqlXAK6ERZPVWtwdI389I= github.com/leoluk/perflib_exporter v0.2.1/go.mod h1:MinSWm88jguXFFrGsP56PtleUb4Qtm4tNRH/wXNXRTI= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= +github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ= +github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=