Skip to content

Commit

Permalink
[exporter/elasticsearch] Add hint in error logs for TSDB version_conf…
Browse files Browse the repository at this point in the history
…lict_engine_exception error (open-telemetry#35548)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
Add hint in error logs for TSDB version_conflict_engine_exception error

**Link to tracking Issue:** <Issue number if applicable>
Fixes open-telemetry#35546

**Testing:** <Describe what testing was performed and which tests were
added.>

**Documentation:** <Describe the documentation added.>
  • Loading branch information
carsonip authored and AkhigbeEromo committed Oct 9, 2024
1 parent c64a75e commit 48a25e0
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 5 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_index-error-hint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add hint in error logs for TSDB version_conflict_engine_exception error

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35546]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
12 changes: 11 additions & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,4 +337,14 @@ Otherwise, it is mapped to an empty string ("").

#### `@timestamp`

In case the record contains `timestamp`, this value is used. Otherwise, the `observed timestamp` is used.
In case the record contains `timestamp`, this value is used. Otherwise, the `observed timestamp` is used.

## Known issues

### version_conflict_engine_exception

When sending high traffic of metrics to a TSDB metrics data stream, e.g. using OTel mapping mode to a 8.16 Elasticsearch, it is possible to get error logs "failed to index document" with `error.type` "version_conflict_engine_exception" and `error.reason` containing "version conflict, document already exists". It is due to Elasticsearch grouping metrics with the same dimensions, whether it is the same or different metric name, using `@timestamp` in milliseconds precision as opposed to nanoseconds in elasticsearchexporter.

This will be fixed in a future version of Elasticsearch. A possible workaround would be to use a transform processor to truncate the timestamp, but this will cause duplicate data to be dropped silently.

However, if `@timestamp` precision is not the problem, check your metrics pipeline setup for misconfiguration that causes an actual violation of the [single writer principle](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer).
17 changes: 14 additions & 3 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"io"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -329,12 +330,22 @@ func flushBulkIndexer(
logger.Error("bulk indexer flush error", zap.Error(err))
}
for _, resp := range stat.FailedDocs {
logger.Error(
"failed to index document",
fields := []zap.Field{
zap.String("index", resp.Index),
zap.String("error.type", resp.Error.Type),
zap.String("error.reason", resp.Error.Reason),
)
}
if hint := getErrorHint(resp.Index, resp.Error.Type); hint != "" {
fields = append(fields, zap.String("hint", hint))
}
logger.Error("failed to index document", fields...)
}
return stat, err
}

func getErrorHint(index, errorType string) string {
if strings.HasPrefix(index, ".ds-metrics-") && errorType == "version_conflict_engine_exception" {
return "check the \"Known issues\" section of Elasticsearch Exporter docs"
}
return ""
}
24 changes: 23 additions & 1 deletion exporter/elasticsearchexporter/bulkindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
tests := []struct {
name string
roundTripFunc func(*http.Request) (*http.Response, error)
wantMessage string
wantFields []zap.Field
}{
{
name: "500",
Expand All @@ -129,6 +131,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
Body: io.NopCloser(strings.NewReader("error")),
}, nil
},
wantMessage: "bulk indexer flush error",
},
{
name: "429",
Expand All @@ -139,12 +142,27 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
Body: io.NopCloser(strings.NewReader("error")),
}, nil
},
wantMessage: "bulk indexer flush error",
},
{
name: "transport error",
roundTripFunc: func(*http.Request) (*http.Response, error) {
return nil, errors.New("transport error")
},
wantMessage: "bulk indexer flush error",
},
{
name: "known version conflict error",
roundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: 200,
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(
`{"items":[{"create":{"_index":".ds-metrics-generic.otel-default","status":400,"error":{"type":"version_conflict_engine_exception","reason":""}}}]}`)),
}, nil
},
wantMessage: "failed to index document",
wantFields: []zap.Field{zap.String("hint", "check the \"Known issues\" section of Elasticsearch Exporter docs")},
},
}

Expand All @@ -169,7 +187,11 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
assert.NoError(t, bulkIndexer.Close(context.Background()))
assert.Equal(t, 1, observed.FilterMessage("bulk indexer flush error").Len())
messages := observed.FilterMessage(tt.wantMessage)
require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All())
for _, wantField := range tt.wantFields {
assert.Equal(t, 1, messages.FilterField(wantField).Len(), "message with field not found; observed.All()=%v", observed.All())
}
})
}
}

0 comments on commit 48a25e0

Please sign in to comment.