[exporter/elasticsearch] Add hint in error logs for TSDB version_conflict_engine_exception error #35548

Merged 6 commits on Oct 7, 2024
Changes from 3 commits
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_index-error-hint.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add hint in error logs for TSDB version_conflict_engine_exception error

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35546]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
10 changes: 9 additions & 1 deletion exporter/elasticsearchexporter/README.md
@@ -337,4 +337,12 @@ Otherwise, it is mapped to an empty string ("").

#### `@timestamp`

In case the record contains `timestamp`, this value is used. Otherwise, the `observed timestamp` is used.

## Known issues

### version_conflict_engine_exception

When sending a high volume of metrics to a TSDB metrics data stream, e.g. using OTel mapping mode with Elasticsearch 8.16, the exporter may log "failed to index document" errors with `error.type` "version_conflict_engine_exception" and `error.reason` containing "version conflict, document already exists". This happens because Elasticsearch groups metrics by `@timestamp` at millisecond precision, whereas elasticsearchexporter sends timestamps at nanosecond precision. This will be fixed in a future version of Elasticsearch.

However, if `@timestamp` precision is not the problem, it is possible that the error indicates an actual violation of the [single writer principle](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer). Check your metrics pipeline setup for misconfiguration.
17 changes: 14 additions & 3 deletions exporter/elasticsearchexporter/bulkindexer.go
@@ -8,6 +8,7 @@ import (
	"errors"
	"io"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"
@@ -329,12 +330,22 @@ func flushBulkIndexer(
		logger.Error("bulk indexer flush error", zap.Error(err))
	}
	for _, resp := range stat.FailedDocs {
		fields := []zap.Field{
			zap.String("index", resp.Index),
			zap.String("error.type", resp.Error.Type),
			zap.String("error.reason", resp.Error.Reason),
		}
		if hint := getErrorHint(resp.Index, resp.Error.Type); hint != "" {
			fields = append(fields, zap.String("hint", hint))
		}
		logger.Error("failed to index document", fields...)
	}
	return stat, err
}

func getErrorHint(index, errorType string) string {
	if strings.HasPrefix(index, ".ds-metrics-") && errorType == "version_conflict_engine_exception" {
		return "check the known issues section of elasticsearchexporter"
	}
	return ""
}
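
To make the matching rule of `getErrorHint` concrete, here is a small standalone sketch that copies the function from this change and runs it over a few sample inputs. The first index name comes from the test in this PR; the `.ds-logs-...` index name and the second error type are assumptions picked for illustration only.

```go
package main

import (
	"fmt"
	"strings"
)

// getErrorHint mirrors the function added in this change: a hint is returned
// only for version conflicts on metrics data stream backing indices.
func getErrorHint(index, errorType string) string {
	if strings.HasPrefix(index, ".ds-metrics-") && errorType == "version_conflict_engine_exception" {
		return "check the known issues section of elasticsearchexporter"
	}
	return ""
}

func main() {
	// Sample inputs; the logs index and the second error type are illustrative.
	cases := []struct{ index, errorType string }{
		{".ds-metrics-generic.otel-default", "version_conflict_engine_exception"},
		{".ds-logs-generic.otel-default", "version_conflict_engine_exception"},
		{".ds-metrics-generic.otel-default", "mapper_parsing_exception"},
	}
	for _, c := range cases {
		fmt.Printf("index=%s type=%s hint=%q\n", c.index, c.errorType, getErrorHint(c.index, c.errorType))
	}
}
```

Only the first case yields a non-empty hint, so the extra `hint` log field is attached solely when both the index prefix and the error type match.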
24 changes: 23 additions & 1 deletion exporter/elasticsearchexporter/bulkindexer_test.go
@@ -119,6 +119,8 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
	tests := []struct {
		name          string
		roundTripFunc func(*http.Request) (*http.Response, error)
		wantMessage   string
		wantFields    []zap.Field
	}{
		{
			name: "500",
@@ -129,6 +131,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
					Body:       io.NopCloser(strings.NewReader("error")),
				}, nil
			},
			wantMessage: "bulk indexer flush error",
		},
		{
			name: "429",
@@ -139,12 +142,27 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
					Body:       io.NopCloser(strings.NewReader("error")),
				}, nil
			},
			wantMessage: "bulk indexer flush error",
		},
		{
			name: "transport error",
			roundTripFunc: func(*http.Request) (*http.Response, error) {
				return nil, errors.New("transport error")
			},
			wantMessage: "bulk indexer flush error",
		},
		{
			name: "known version conflict error",
			roundTripFunc: func(*http.Request) (*http.Response, error) {
				return &http.Response{
					StatusCode: 200,
					Header:     http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
					Body: io.NopCloser(strings.NewReader(
						`{"items":[{"create":{"_index":".ds-metrics-generic.otel-default","status":400,"error":{"type":"version_conflict_engine_exception","reason":""}}}]}`)),
				}, nil
			},
			wantMessage: "failed to index document",
			wantFields:  []zap.Field{zap.String("hint", "check the known issues section of elasticsearchexporter")},
		},
	}

@@ -169,7 +187,11 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
			time.Sleep(100 * time.Millisecond)
			assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
			assert.NoError(t, bulkIndexer.Close(context.Background()))
			messages := observed.FilterMessage(tt.wantMessage)
			require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All())
			for _, wantField := range tt.wantFields {
				assert.Equal(t, 1, messages.FilterField(wantField).Len(), "message with field not found; observed.All()=%v", observed.All())
			}
		})
	}
}