From 93ecb1456ed2d590b3f4bcf607988baeedb6084f Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 25 Aug 2021 18:43:24 -0500 Subject: [PATCH] [Heartbeat] Fix broken data_stream assignment (#27535) (#27592) This PR fixes the logic behind assigning the data_steam.dataset field. Previously this was static per monitor type, only the index would change, but not the field. This makes the processor more comprehensive handling not just index naming but field generation. This is a breaking change, setting event.dataset consistently to the monitor type rather than the old value of uptime. Since this field is unlikely to have been used by anyone the impact is low, and it will bring us inline with the ECS spec requiring this field to match event.dataset. (cherry picked from commit 12df9f7c36d0f77c5169d157c48be3f241d8d35c) Co-authored-by: Andrew Cholakian --- CHANGELOG.next.asciidoc | 2 + heartbeat/config/config_test.go | 1 + heartbeat/magefile.go | 1 + heartbeat/monitors/factory.go | 86 ++++++++++--------- heartbeat/monitors/factory_test.go | 70 ++++++++++++--- heartbeat/tests/system/test_base.py | 2 +- .../add_data_stream.go} | 51 ++++++----- .../add_data_stream_test.go} | 23 +++-- .../add_data_stream_index/datastream.go | 18 ---- .../monitors/browser/synthexec/enrich.go | 6 +- .../monitors/browser/synthexec/enrich_test.go | 10 +-- 11 files changed, 161 insertions(+), 109 deletions(-) rename libbeat/processors/{add_data_stream_index/add_data_stream_index.go => add_data_stream/add_data_stream.go} (57%) rename libbeat/processors/{add_data_stream_index/add_data_stream_index_test.go => add_data_stream/add_data_stream_test.go} (76%) delete mode 100644 libbeat/processors/add_data_stream_index/datastream.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8ab8a3fbf238..542f6f219bb7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -51,6 +51,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add option for S3 input to work without SQS notification {issue}18205[18205] {pull}27332[27332] *Heartbeat* +- Remove long deprecated `watch_poll` functionality. {pull}27166[27166] +- Fix inconsistency in `event.dataset` values between heartbeat and fleet by always setting this value to the monitor type / fleet dataset. {pull}27535[27535] *Journalbeat* diff --git a/heartbeat/config/config_test.go b/heartbeat/config/config_test.go index f4373cc528be..9b529728b453 100644 --- a/heartbeat/config/config_test.go +++ b/heartbeat/config/config_test.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//go:build !integration // +build !integration package config diff --git a/heartbeat/magefile.go b/heartbeat/magefile.go index 0480c30a69ef..40cc655ab1fb 100644 --- a/heartbeat/magefile.go +++ b/heartbeat/magefile.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//go:build mage // +build mage package main diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 78d5aa648095..a4d2ce02b6cd 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -18,6 +18,8 @@ package monitors import ( + "fmt" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/scheduler" @@ -26,7 +28,8 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/processors" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/actions" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) @@ -52,10 +55,10 @@ type publishSettings struct { KeepNull bool `config:"keep_null"` // Output meta data settings - Pipeline string `config:"pipeline"` // ES Ingest pipeline name - Index fmtstr.EventFormatString `config:"index"` // ES output index pattern - DataStream *add_data_stream_index.DataStream `config:"data_stream"` - DataSet string `config:"dataset"` + Pipeline string `config:"pipeline"` // ES Ingest pipeline name + Index fmtstr.EventFormatString `config:"index"` // ES output index pattern + DataStream *add_data_stream.DataStream `config:"data_stream"` + DataSet string `config:"dataset"` } // NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. @@ -91,12 +94,13 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return nil, err } - stdFields, err := stdfields.ConfigToStdMonitorFields(cfg) + sf, err := stdfields.ConfigToStdMonitorFields(cfg) if err != nil { - return nil, err + return nil, fmt.Errorf("could not parse cfg for datastream %w", err) } - indexProcessor, err := setupIndexProcessor(info, settings, stdFields.Type) + // Early stage processors for setting data_stream, event.dataset, and index to write to + preProcs, err := preProcessors(info, settings, sf.Type) if err != nil { return nil, err } @@ -106,40 +110,20 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return nil, err } - // TODO: Remove this logic in the 8.0/master branch, preserve only in 7.x - dataset := settings.DataSet - if dataset == "" { - if settings.DataStream != nil && settings.DataStream.Dataset != "" { - dataset = settings.DataStream.Dataset - } else { - dataset = "uptime" - } - } - return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { fields := clientCfg.Processing.Fields.Clone() - fields.Put("event.dataset", dataset) - - if settings.DataStream != nil { - fields.Put("data_stream", settings.DataStream) - } meta := clientCfg.Processing.Meta.Clone() if settings.Pipeline != "" { meta.Put("pipeline", settings.Pipeline) } - // assemble the processors. Ordering is important. - // 1. add support for index configuration via processor - // 2. add processors added by the input that wants to connect - // 3. add locally configured processors from the 'processors' settings procs := processors.NewList(nil) - if indexProcessor != nil { - procs.AddProcessor(indexProcessor) - } + if lst := clientCfg.Processing.Processor; lst != nil { procs.AddProcessor(lst) } + procs.AddProcessors(*preProcs) if userProcessors != nil { procs.AddProcessors(*userProcessors) } @@ -155,28 +139,50 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi }, nil } -func setupIndexProcessor(info beat.Info, settings publishSettings, dataset string) (processors.Processor, error) { - var indexProcessor processors.Processor +// preProcessors sets up the required event.dataset, data_stream.*, and write index processors for future event publishes. +func preProcessors(info beat.Info, settings publishSettings, monitorType string) (procs *processors.Processors, err error) { + procs = processors.NewList(nil) + + var dataset string + if settings.DataStream != nil && settings.DataStream.Dataset != "" { + dataset = settings.DataStream.Dataset + } else { + dataset = monitorType + } + + // Always set event.dataset + procs.AddProcessor(actions.NewAddFields(common.MapStr{"event": common.MapStr{"dataset": dataset}}, true, true)) + if settings.DataStream != nil { - ds := settings.DataStream + ds := *settings.DataStream if ds.Type == "" { ds.Type = "synthetics" } if ds.Dataset == "" { ds.Dataset = dataset } - return add_data_stream_index.New(*ds), nil + + procs.AddProcessor(add_data_stream.New(ds)) } if !settings.Index.IsEmpty() { - staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version) - - timestampFormat, err := - fmtstr.NewTimestampFormatString(&settings.Index, staticFields) + proc, err := indexProcessor(&settings.Index, info) if err != nil { return nil, err } - indexProcessor = add_formatted_index.New(timestampFormat) + procs.AddProcessor(proc) + } + + return procs, nil +} + +func indexProcessor(index *fmtstr.EventFormatString, info beat.Info) (beat.Processor, error) { + staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version) + + timestampFormat, err := + fmtstr.NewTimestampFormatString(index, staticFields) + if err != nil { + return nil, err } - return indexProcessor, nil + return add_formatted_index.New(timestampFormat), nil } diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index 7c515885421a..27b119392eec 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -18,6 +18,7 @@ package monitors import ( + "regexp" "testing" "github.com/stretchr/testify/require" @@ -26,25 +27,27 @@ import ( "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" ) -func TestSetupIndexProcessor(t *testing.T) { +func TestPreProcessors(t *testing.T) { binfo := beat.Info{ Beat: "heartbeat", IndexPrefix: "heartbeat", Version: "8.0.0", } tests := map[string]struct { - settings publishSettings - expectedIndex string - monitorType string - wantProc bool - wantErr bool + settings publishSettings + expectedIndex string + expectedDatastream *add_data_stream.DataStream + monitorType string + wantProc bool + wantErr bool }{ "no settings should yield no processor": { publishSettings{}, "", + nil, "browser", false, false, @@ -52,28 +55,39 @@ func TestSetupIndexProcessor(t *testing.T) { "exact index should be used exactly": { publishSettings{Index: *fmtstr.MustCompileEvent("test")}, "test", + nil, "browser", true, false, }, "data stream should be type-namespace-dataset": { publishSettings{ - DataStream: &add_data_stream_index.DataStream{ + DataStream: &add_data_stream.DataStream{ Namespace: "myNamespace", Dataset: "myDataset", Type: "myType", }, }, "myType-myDataset-myNamespace", + &add_data_stream.DataStream{ + Namespace: "myNamespace", + Dataset: "myDataset", + Type: "myType", + }, "myType", true, false, }, "data stream should use defaults": { publishSettings{ - DataStream: &add_data_stream_index.DataStream{}, + DataStream: &add_data_stream.DataStream{}, }, "synthetics-browser-default", + &add_data_stream.DataStream{ + Namespace: "default", + Dataset: "browser", + Type: "synthetics", + }, "browser", true, false, @@ -83,21 +97,49 @@ func TestSetupIndexProcessor(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { e := beat.Event{Meta: common.MapStr{}, Fields: common.MapStr{}} - proc, err := setupIndexProcessor(binfo, tt.settings, tt.monitorType) + procs, err := preProcessors(binfo, tt.settings, tt.monitorType) if tt.wantErr == true { require.Error(t, err) return } require.NoError(t, err) + // If we're just setting event.dataset we only get the 1 + // otherwise we get a second add_data_stream processor if !tt.wantProc { - require.Nil(t, proc) + require.Len(t, procs.List, 1) return } + require.Len(t, procs.List, 2) + + _, err = procs.Run(&e) + + t.Run("index name should be set", func(t *testing.T) { + require.NoError(t, err) + require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex]) + }) + + eventDs, err := e.GetValue("event.dataset") + require.NoError(t, err) + + t.Run("event.dataset should always be present, preferring data_stream", func(t *testing.T) { + dataset := tt.monitorType + if tt.settings.DataStream != nil && tt.settings.DataStream.Dataset != "" { + dataset = tt.settings.DataStream.Dataset + } + require.Equal(t, dataset, eventDs, "event.dataset be computed correctly") + require.Regexp(t, regexp.MustCompile(`^.+`), eventDs, "should be a string > 1 char") + }) + + t.Run("event.data_stream", func(t *testing.T) { + dataStreamRaw, _ := e.GetValue("data_stream") + if tt.expectedDatastream != nil { + dataStream := dataStreamRaw.(add_data_stream.DataStream) + require.Equal(t, eventDs, dataStream.Dataset, "event.dataset be identical to data_stream.dataset") - require.NotNil(t, proc) - _, err = proc.Run(&e) - require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex]) + require.Equal(t, *tt.expectedDatastream, dataStream) + } + }) }) } } diff --git a/heartbeat/tests/system/test_base.py b/heartbeat/tests/system/test_base.py index 643f9f31bf76..e0361761e041 100644 --- a/heartbeat/tests/system/test_base.py +++ b/heartbeat/tests/system/test_base.py @@ -206,6 +206,6 @@ def test_dataset(self): for output in self.read_output(): self.assertEqual( output["event.dataset"], - "uptime", + output["monitor.type"], "Check for event.dataset in {} failed".format(output) ) diff --git a/libbeat/processors/add_data_stream_index/add_data_stream_index.go b/libbeat/processors/add_data_stream/add_data_stream.go similarity index 57% rename from libbeat/processors/add_data_stream_index/add_data_stream_index.go rename to libbeat/processors/add_data_stream/add_data_stream.go index 7d770c6f082c..d27f9960b507 100644 --- a/libbeat/processors/add_data_stream_index/add_data_stream_index.go +++ b/libbeat/processors/add_data_stream/add_data_stream.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package add_data_stream_index +package add_data_stream import ( "fmt" @@ -37,50 +37,59 @@ func SetEventDataset(event *beat.Event, ds string) { } } -// AddDataStreamIndex is a Processor to set an event's "raw_index" metadata field -// based on the given type, dataset, and namespace fields. -// If the event's metadata contains an -type AddDataStreamIndex struct { +// AddDataStream is a Processor to set an event's "raw_index" metadata field +// based on the given type, dataset, and namespace fields, as well as its +// `data_stream` field dynamically. +type AddDataStream struct { DataStream DataStream // cached, compiled version of the index name derived from the data stream - dsCached string - customDsCache string + idxNameCache string + // cached, compiled version of the index name derived from the data stream, sans datastream + // which is dynamic + idxNamePartialCache string } // New returns a new AddDataStreamIndex processor. -func New(ds DataStream) *AddDataStreamIndex { +func New(ds DataStream) *AddDataStream { if ds.Namespace == "" { ds.Namespace = "default" } if ds.Dataset == "" { ds.Dataset = "generic" } - return &AddDataStreamIndex{ - DataStream: ds, - dsCached: ds.indexName(), - customDsCache: ds.datasetFmtString(), + return &AddDataStream{ + DataStream: ds, + idxNameCache: ds.indexName(), + idxNamePartialCache: ds.idxNamePartialCache(), } } // Run runs the processor. -func (p *AddDataStreamIndex) Run(event *beat.Event) (*beat.Event, error) { +func (p *AddDataStream) Run(event *beat.Event) (*beat.Event, error) { + eventDataStream := p.DataStream if event.Meta == nil { event.Meta = common.MapStr{ - events.FieldMetaRawIndex: p.dsCached, + events.FieldMetaRawIndex: p.idxNameCache, } } else { - customDs, hasCustom := event.Meta[FieldMetaCustomDataset] + customDataset, hasCustom := event.Meta[FieldMetaCustomDataset] if !hasCustom { - event.Meta[events.FieldMetaRawIndex] = p.dsCached + event.Meta[events.FieldMetaRawIndex] = p.idxNameCache } else { - event.Meta[events.FieldMetaRawIndex] = fmt.Sprintf(p.customDsCache, customDs) + event.Meta[events.FieldMetaRawIndex] = fmt.Sprintf(p.idxNamePartialCache, customDataset) + eventDataStream.Dataset = customDataset.(string) } } + if event.Fields == nil { + event.Fields = common.MapStr{} + } + event.PutValue("event.dataset", eventDataStream.Dataset) + event.PutValue("data_stream", eventDataStream) return event, nil } -func (p *AddDataStreamIndex) String() string { +func (p *AddDataStream) String() string { return fmt.Sprintf("add_data_stream_index=%v", p.DataStream.indexName()) } @@ -92,10 +101,12 @@ type DataStream struct { Type string `config:"type"` } -func (ds DataStream) datasetFmtString() string { +// genIdxFmtStringForDataset returns a format string that takes a single argument, the dataset +// this is slightly more efficient than generating the entire string with all three vars every time +func (ds DataStream) idxNamePartialCache() string { return fmt.Sprintf("%s-%%s-%s", ds.Type, ds.Namespace) } func (ds DataStream) indexName() string { - return fmt.Sprintf(ds.datasetFmtString(), ds.Dataset) + return fmt.Sprintf(ds.idxNamePartialCache(), ds.Dataset) } diff --git a/libbeat/processors/add_data_stream_index/add_data_stream_index_test.go b/libbeat/processors/add_data_stream/add_data_stream_test.go similarity index 76% rename from libbeat/processors/add_data_stream_index/add_data_stream_index_test.go rename to libbeat/processors/add_data_stream/add_data_stream_test.go index d71181e31271..71d981d6034b 100644 --- a/libbeat/processors/add_data_stream_index/add_data_stream_index_test.go +++ b/libbeat/processors/add_data_stream/add_data_stream_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package add_data_stream_index +package add_data_stream import ( "testing" @@ -27,24 +27,26 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -func TestAddDataStreamIndex(t *testing.T) { +func TestAddDataStream(t *testing.T) { simpleDs := DataStream{ "myns", "myds", "mytype", } tests := []struct { - name string - ds DataStream - event *beat.Event - want string - wantErr bool + name string + ds DataStream + event *beat.Event + wantIndex string + wantDataStream DataStream + wantErr bool }{ { "simple", simpleDs, &beat.Event{}, "mytype-myds-myns", + simpleDs, false, }, { @@ -52,6 +54,7 @@ func TestAddDataStreamIndex(t *testing.T) { simpleDs, &beat.Event{Meta: common.MapStr{}}, "mytype-myds-myns", + simpleDs, false, }, { @@ -61,6 +64,7 @@ func TestAddDataStreamIndex(t *testing.T) { FieldMetaCustomDataset: "custom-ds", }}, "mytype-custom-ds-myns", + DataStream{"myns", "custom-ds", "mytype"}, false, }, { @@ -70,6 +74,7 @@ func TestAddDataStreamIndex(t *testing.T) { }, &beat.Event{}, "mytype-generic-default", + DataStream{"default", "generic", "mytype"}, false, }, } @@ -81,7 +86,9 @@ func TestAddDataStreamIndex(t *testing.T) { t.Errorf("Run() error = %v, wantErr %v", err, tt.wantErr) return } - require.Equal(t, tt.want, got.Meta[events.FieldMetaRawIndex]) + require.Equal(t, tt.wantIndex, got.Meta[events.FieldMetaRawIndex]) + require.Equal(t, tt.wantDataStream, got.Fields["data_stream"]) + require.Equal(t, tt.wantDataStream.Dataset, got.Fields["event"].(common.MapStr)["dataset"]) }) } } diff --git a/libbeat/processors/add_data_stream_index/datastream.go b/libbeat/processors/add_data_stream_index/datastream.go deleted file mode 100644 index c2a69dc5214c..000000000000 --- a/libbeat/processors/add_data_stream_index/datastream.go +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package add_data_stream_index diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index a842dd28794b..c3ab6c76faab 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -9,7 +9,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/beat/events" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/gofrs/uuid" @@ -122,9 +122,9 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e case "step/screenshot_ref": fallthrough case "screenshot/block": - add_data_stream_index.SetEventDataset(event, "browser.screenshot") + add_data_stream.SetEventDataset(event, "browser.screenshot") case "journey/network_info": - add_data_stream_index.SetEventDataset(event, "browser.network") + add_data_stream.SetEventDataset(event, "browser.network") } if se.Id != "" { diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index 8addfece0f4e..629454f34c0e 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -16,7 +16,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/testslike" ) @@ -151,7 +151,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "step/screenshot"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, "browser.screenshot", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.screenshot", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, { @@ -160,7 +160,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "step/screenshot_ref"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, "browser.screenshot", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.screenshot", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, { @@ -171,7 +171,7 @@ func TestEnrichSynthEvent(t *testing.T) { func(t *testing.T, e *beat.Event, je *journeyEnricher) { require.Equal(t, "my_id", e.Meta["_id"]) require.Equal(t, events.OpTypeCreate, e.Meta[events.FieldMetaOpType]) - require.Equal(t, "browser.screenshot", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.screenshot", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, { @@ -180,7 +180,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "journey/network_info"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, "browser.network", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.network", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, }