diff --git a/.chloggen/elasticsearchexporter_index-error-hint.yaml b/.chloggen/elasticsearchexporter_index-error-hint.yaml new file mode 100644 index 000000000000..fad70f77154f --- /dev/null +++ b/.chloggen/elasticsearchexporter_index-error-hint.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add hint in error logs for TSDB version_conflict_engine_exception error + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35546] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 31d2cae89c6f..1f7c9f5fb9c5 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -337,4 +337,14 @@ Otherwise, it is mapped to an empty string (""). #### `@timestamp` -In case the record contains `timestamp`, this value is used. Otherwise, the `observed timestamp` is used. \ No newline at end of file +In case the record contains `timestamp`, this value is used. Otherwise, the `observed timestamp` is used. + +## Known issues + +### version_conflict_engine_exception + +When sending high traffic of metrics to a TSDB metrics data stream, e.g. using OTel mapping mode to a 8.16 Elasticsearch, it is possible to get error logs "failed to index document" with `error.type` "version_conflict_engine_exception" and `error.reason` containing "version conflict, document already exists". It is due to Elasticsearch grouping metrics with the same dimensions, whether it is the same or different metric name, using `@timestamp` in milliseconds precision as opposed to nanoseconds in elasticsearchexporter. + +This will be fixed in a future version of Elasticsearch. A possible workaround would be to use a transform processor to truncate the timestamp, but this will cause duplicate data to be dropped silently. + +However, if `@timestamp` precision is not the problem, check your metrics pipeline setup for misconfiguration that causes an actual violation of the [single writer principle](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer). diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 93e909cc3a3b..471ddc2dc7b9 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -8,6 +8,7 @@ import ( "errors" "io" "runtime" + "strings" "sync" "sync/atomic" "time" @@ -329,12 +330,22 @@ func flushBulkIndexer( logger.Error("bulk indexer flush error", zap.Error(err)) } for _, resp := range stat.FailedDocs { - logger.Error( - "failed to index document", + fields := []zap.Field{ zap.String("index", resp.Index), zap.String("error.type", resp.Error.Type), zap.String("error.reason", resp.Error.Reason), - ) + } + if hint := getErrorHint(resp.Index, resp.Error.Type); hint != "" { + fields = append(fields, zap.String("hint", hint)) + } + logger.Error("failed to index document", fields...) } return stat, err } + +func getErrorHint(index, errorType string) string { + if strings.HasPrefix(index, ".ds-metrics-") && errorType == "version_conflict_engine_exception" { + return "check the \"Known issues\" section of Elasticsearch Exporter docs" + } + return "" +} diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index a70cdecc25dd..b417942734d6 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -119,6 +119,8 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { tests := []struct { name string roundTripFunc func(*http.Request) (*http.Response, error) + wantMessage string + wantFields []zap.Field }{ { name: "500", @@ -129,6 +131,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { Body: io.NopCloser(strings.NewReader("error")), }, nil }, + wantMessage: "bulk indexer flush error", }, { name: "429", @@ -139,12 +142,27 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { Body: io.NopCloser(strings.NewReader("error")), }, nil }, + wantMessage: "bulk indexer flush error", }, { name: "transport error", roundTripFunc: func(*http.Request) (*http.Response, error) { return nil, errors.New("transport error") }, + wantMessage: "bulk indexer flush error", + }, + { + name: "known version conflict error", + roundTripFunc: func(*http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: 200, + Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, + Body: io.NopCloser(strings.NewReader( + `{"items":[{"create":{"_index":".ds-metrics-generic.otel-default","status":400,"error":{"type":"version_conflict_engine_exception","reason":""}}}]}`)), + }, nil + }, + wantMessage: "failed to index document", + wantFields: []zap.Field{zap.String("hint", "check the \"Known issues\" section of Elasticsearch Exporter docs")}, }, } @@ -169,7 +187,11 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) assert.NoError(t, bulkIndexer.Close(context.Background())) - assert.Equal(t, 1, observed.FilterMessage("bulk indexer flush error").Len()) + messages := observed.FilterMessage(tt.wantMessage) + require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All()) + for _, wantField := range tt.wantFields { + assert.Equal(t, 1, messages.FilterField(wantField).Len(), "message with field not found; observed.All()=%v", observed.All()) + } }) } }