From ac74e37be4e69a7d3c01560223e1614294b14d76 Mon Sep 17 00:00:00 2001
From: Felix Barnsteiner
Date: Mon, 27 Jan 2025 17:19:48 +0100
Subject: [PATCH 1/2] Improved grouping of data points into documents

This may prevent document rejections in case the same batch contains
different resources or scopes that have the same set of attributes
---
 ...ticsearchexporter_data-point-grouping.yaml | 27 ++++++++
 exporter/elasticsearchexporter/exporter.go    | 67 ++++++++++++-------
 .../elasticsearchexporter/exporter_test.go    | 34 ++++++++++
 3 files changed, 103 insertions(+), 25 deletions(-)
 create mode 100644 .chloggen/elasticsearchexporter_data-point-grouping.yaml

diff --git a/.chloggen/elasticsearchexporter_data-point-grouping.yaml b/.chloggen/elasticsearchexporter_data-point-grouping.yaml
new file mode 100644
index 000000000000..67519e1aa27b
--- /dev/null
+++ b/.chloggen/elasticsearchexporter_data-point-grouping.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: elasticsearchexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Group data points into a single document even if they are from different but equal resources
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [37508]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: This may prevent document rejections in case the same batch contains different resources or scopes that have the same set of attributes
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go
index 6022abf30595..420d59586e64 100644
--- a/exporter/elasticsearchexporter/exporter.go
+++ b/exporter/elasticsearchexporter/exporter.go
@@ -193,6 +193,18 @@ func (e *elasticsearchExporter) pushLogRecord(
 	return bulkIndexerSession.Add(ctx, fIndex.Index, docID, buf, nil)
 }
 
+type dataPointsGroup struct {
+	resource          pcommon.Resource
+	resourceSchemaURL string
+	scope             pcommon.InstrumentationScope
+	scopeSchemaURL    string
+	dataPoints        []dataPoint
+}
+
+func (p *dataPointsGroup) addDataPoint(dp dataPoint) {
+	p.dataPoints = append(p.dataPoints, dp)
+}
+
 func (e *elasticsearchExporter) pushMetricsData(
 	ctx context.Context,
 	metrics pmetric.Metrics,
@@ -206,6 +218,8 @@ func (e *elasticsearchExporter) pushMetricsData(
 	}
 	defer session.End()
 
+	groupedDataPointsByIndex := make(map[elasticsearch.Index]map[uint32]*dataPointsGroup)
+	var validationErrs []error // log instead of returning these so that upstream does not retry
 	var errs []error
 	resourceMetrics := metrics.ResourceMetrics()
 	for i := 0; i < resourceMetrics.Len(); i++ {
@@ -214,10 +228,8 @@
 		scopeMetrics := resourceMetric.ScopeMetrics()
 
 		for j := 0; j < scopeMetrics.Len(); j++ {
-			var validationErrs []error // log instead of returning these so that upstream does not retry
 			scopeMetrics := scopeMetrics.At(j)
 			scope := scopeMetrics.Scope()
-			groupedDataPointsByIndex := make(map[elasticsearch.Index]map[uint32][]dataPoint)
 
 			for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
 				metric := scopeMetrics.Metrics().At(k)
@@ -228,15 +240,19 @@
 					}
 					groupedDataPoints, ok := groupedDataPointsByIndex[fIndex]
 					if !ok {
-						groupedDataPoints = make(map[uint32][]dataPoint)
+						groupedDataPoints = make(map[uint32]*dataPointsGroup)
 						groupedDataPointsByIndex[fIndex] = groupedDataPoints
 					}
 					dpHash := e.model.hashDataPoint(dp)
-					dataPoints, ok := groupedDataPoints[dpHash]
+					dpGroup, ok := groupedDataPoints[dpHash]
 					if !ok {
-						groupedDataPoints[dpHash] = []dataPoint{dp}
+						groupedDataPoints[dpHash] = &dataPointsGroup{
+							resource:   resource,
+							scope:      scope,
+							dataPoints: []dataPoint{dp},
+						}
 					} else {
-						groupedDataPoints[dpHash] = append(dataPoints, dp)
+						dpGroup.addDataPoint(dp)
 					}
 					return nil
 				}
@@ -297,30 +313,31 @@
 					}
 				}
 			}
+		}
+	}
 
-			for fIndex, groupedDataPoints := range groupedDataPointsByIndex {
-				for _, dataPoints := range groupedDataPoints {
-					buf := e.bufferPool.NewPooledBuffer()
-					dynamicTemplates, err := e.model.encodeMetrics(resource, resourceMetric.SchemaUrl(), scope, scopeMetrics.SchemaUrl(), dataPoints, &validationErrs, fIndex, buf.Buffer)
-					if err != nil {
-						buf.Recycle()
-						errs = append(errs, err)
-						continue
-					}
-					if err := session.Add(ctx, fIndex.Index, "", buf, dynamicTemplates); err != nil {
-						// not recycling after Add returns an error as we don't know if it's already recycled
-						if cerr := ctx.Err(); cerr != nil {
-							return cerr
-						}
-						errs = append(errs, err)
-					}
-				}
+	for fIndex, groupedDataPoints := range groupedDataPointsByIndex {
+		for _, dpGroup := range groupedDataPoints {
+			buf := e.bufferPool.NewPooledBuffer()
+			dynamicTemplates, err := e.model.encodeMetrics(
+				dpGroup.resource, dpGroup.resourceSchemaURL, dpGroup.scope, dpGroup.scopeSchemaURL, dpGroup.dataPoints, &validationErrs, fIndex, buf.Buffer)
+			if err != nil {
+				buf.Recycle()
+				errs = append(errs, err)
+				continue
 			}
-			if len(validationErrs) > 0 {
-				e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
+			if err := session.Add(ctx, fIndex.Index, "", buf, dynamicTemplates); err != nil {
+				// not recycling after Add returns an error as we don't know if it's already recycled
+				if cerr := ctx.Err(); cerr != nil {
+					return cerr
+				}
+				errs = append(errs, err)
 			}
 		}
 	}
+	if len(validationErrs) > 0 {
+		e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
+	}
 
 	if err := session.Flush(ctx); err != nil {
 		if cerr := ctx.Err(); cerr != nil {
diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go
index 514c4bf95437..ad8388eef7c8 100644
--- a/exporter/elasticsearchexporter/exporter_test.go
+++ b/exporter/elasticsearchexporter/exporter_test.go
@@ -1361,6 +1361,40 @@ func TestExporterMetrics(t *testing.T) {
 		assertRecordedItems(t, expected, rec, false)
 	})
 
+	t.Run("otel mode grouping of equal resources", func(t *testing.T) {
+		rec := newBulkRecorder()
+		server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
+			rec.Record(docs)
+			return itemsAllOK(docs)
+		})
+
+		exporter := newTestMetricsExporter(t, server.URL, func(cfg *Config) {
+			cfg.Mapping.Mode = "otel"
+		})
+
+		metrics := pmetric.NewMetrics()
+		for _, n := range []string{"m1", "m2"} {
+			resourceMetric := metrics.ResourceMetrics().AppendEmpty()
+			scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
+
+			sumMetric := scopeMetric.Metrics().AppendEmpty()
+			sumMetric.SetName(n)
+			sumDP := sumMetric.SetEmptySum().DataPoints().AppendEmpty()
+			sumDP.SetIntValue(0)
+		}
+
+		mustSendMetrics(t, exporter, metrics)
+
+		expected := []itemRequest{
+			{
+				Action:   []byte(`{"create":{"_index":"metrics-generic.otel-default","dynamic_templates":{"metrics.m1":"gauge_long","metrics.m2":"gauge_long"}}}`),
+				Document: []byte(`{"@timestamp":"0.0","data_stream":{"dataset":"generic.otel","namespace":"default","type":"metrics"},"metrics":{"m1":0,"m2":0},"resource":{},"scope":{}}`),
+			},
+		}
+
+		assertRecordedItems(t, expected, rec, false)
+	})
+
 	t.Run("otel mode aggregate_metric_double hint", func(t *testing.T) {
 		rec := newBulkRecorder()
 		server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {

From aa91ad995b61cc52af0cf716f1725bbbf8835baf Mon Sep 17 00:00:00 2001
From: Felix Barnsteiner
Date: Mon, 27 Jan 2025 17:23:10 +0100
Subject: [PATCH 2/2] Fix pr number in changelog

---
 .chloggen/elasticsearchexporter_data-point-grouping.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.chloggen/elasticsearchexporter_data-point-grouping.yaml b/.chloggen/elasticsearchexporter_data-point-grouping.yaml
index 67519e1aa27b..2c48cfae8728 100644
--- a/.chloggen/elasticsearchexporter_data-point-grouping.yaml
+++ b/.chloggen/elasticsearchexporter_data-point-grouping.yaml
@@ -10,7 +10,7 @@ component: elasticsearchexporter
 note: Group data points into a single document even if they are from different but equal resources
 
 # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
-issues: [37508]
+issues: [37509]
 
 # (Optional) One or more lines of additional information to render under the primary note.
 # These lines will be padded with 2 spaces and then inserted directly into the document.
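
For readers who want the idea without the diff plumbing, here is a minimal, self-contained sketch of the grouping that PATCH 1/2 introduces. It is a toy, not the exporter's code: resource, scope, dataPoint, and the hard-coded hash values below are hypothetical stand-ins for pcommon.Resource, pcommon.InstrumentationScope, the exporter's dataPoint type, and e.model.hashDataPoint, and the sketch drops the outer map[elasticsearch.Index] dimension that the real groupedDataPointsByIndex keeps. The actual logic lives in pushMetricsData above.

package main

import "fmt"

// Hypothetical stand-ins for pcommon.Resource, pcommon.InstrumentationScope,
// and the exporter's dataPoint type.
type resource struct{ attrs string }
type scope struct{ name string }

type dataPoint struct {
	metricName string
	value      int64
	hash       uint32 // in the exporter this comes from e.model.hashDataPoint(dp)
}

// dataPointsGroup mirrors the struct added by the patch: it remembers the
// first resource/scope seen for a given hash so the whole group can be
// encoded into one document after the entire batch has been walked.
type dataPointsGroup struct {
	resource   resource
	scope      scope
	dataPoints []dataPoint
}

func main() {
	// Two equal-but-distinct resources arriving as separate ResourceMetrics
	// entries in one batch; both data points hash to the same value because
	// they share timestamp and attributes (hashes hard-coded for the sketch).
	batch := []struct {
		res resource
		sc  scope
		dps []dataPoint
	}{
		{resource{"host=a"}, scope{"s"}, []dataPoint{{"m1", 0, 42}}},
		{resource{"host=a"}, scope{"s"}, []dataPoint{{"m2", 0, 42}}},
	}

	// The map lives outside the per-resource/per-scope loops (before the
	// patch it was re-created for every scope), so equal resources from
	// different ResourceMetrics land in the same group.
	groups := make(map[uint32]*dataPointsGroup)
	for _, rm := range batch {
		for _, dp := range rm.dps {
			if g, ok := groups[dp.hash]; ok {
				g.dataPoints = append(g.dataPoints, dp)
			} else {
				groups[dp.hash] = &dataPointsGroup{resource: rm.res, scope: rm.sc, dataPoints: []dataPoint{dp}}
			}
		}
	}

	// One group -> one document carrying both m1 and m2.
	for _, g := range groups {
		fmt.Printf("document: resource=%+v dataPoints=%+v\n", g.resource, g.dataPoints)
	}
}

The essential change is visible in the map's placement: hoisted above the resource and scope loops and keyed only by the data-point hash, so the first-seen resource and scope stand in for every equal one, and equal resources in one batch collapse into a single document instead of producing the duplicate documents that the backend may reject.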