From 18502643f039092617213e84b8146517c20672da Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 14 Aug 2020 18:01:51 +0200 Subject: [PATCH 1/7] Heartbeat: move configuration of common input setting to the runner factory This change moves the configuration of common input settings like processors, fields, tags to the runner factory (similar to filebeat). The settings index, pipeline, and dataset have been added as generic settings to all monitors. --- heartbeat/beater/heartbeat.go | 4 +- heartbeat/monitors/factory.go | 102 +++++++++++++++++++++++++++++++++- heartbeat/monitors/task.go | 48 +++------------- 3 files changed, 109 insertions(+), 45 deletions(-) diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 15185c707deb..26b1202850da 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -72,7 +72,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { config: parsedConfig, scheduler: scheduler, // dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload - dynamicFactory: monitors.NewFactory(scheduler, false), + dynamicFactory: monitors.NewFactory(b.Info, scheduler, false), } return bt, nil } @@ -123,7 +123,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { // RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present. 
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error { - factory := monitors.NewFactory(bt.scheduler, true) + factory := monitors.NewFactory(b.Info, bt.scheduler, true) for _, cfg := range bt.config.Monitors { created, err := factory.Create(b.Publisher, cfg) diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index e453bc3a03ac..d9a48ef4f370 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -22,22 +22,51 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) // RunnerFactory that can be used to create cfg.Runner cast versions of Monitor // suitable for config reloading. type RunnerFactory struct { + info beat.Info sched *scheduler.Scheduler allowWatches bool } +type publishSettings struct { + // Fields and tags to add to monitor. + EventMetadata common.EventMetadata `config:",inline"` + Processors processors.PluginConfig `config:"processors"` + + PublisherPipeline struct { + DisableHost bool `config:"disable_host"` // Disable addition of host.name. + } `config:"publisher_pipeline"` + + // KeepNull determines whether published events will keep null values or omit them. + KeepNull bool `config:"keep_null"` + + // Output meta data settings + Pipeline string `config:"pipeline"` // ES Ingest pipeline name + Index fmtstr.EventFormatString `config:"index"` // ES output index pattern + DataSet string `config:"dataset"` +} + // NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. 
-func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory { - return &RunnerFactory{sched, allowWatches} +func NewFactory(info beat.Info, sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory { + return &RunnerFactory{info, sched, allowWatches} } // Create makes a new Runner for a new monitor with the given Config. -func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) { +func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runner, error) { + configEditor, err := newCommonPublishConfigs(f.info, c) + if err != nil { + return nil, err + } + + p = pipetool.WithClientConfigEdit(p, configEditor) monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches) return monitor, err } @@ -46,3 +75,70 @@ func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config) (cfgf func (f *RunnerFactory) CheckConfig(config *common.Config) error { return checkMonitorConfig(config, globalPluginsReg, f.allowWatches) } + +func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.ConfigEditor, error) { + var settings publishSettings + if err := cfg.Unpack(&settings); err != nil { + return nil, err + } + + var indexProcessor processors.Processor + if !settings.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&settings.Index, staticFields) + if err != nil { + return nil, err + } + indexProcessor = add_formatted_index.New(timestampFormat) + } + + userProcessors, err := processors.New(settings.Processors) + if err != nil { + return nil, err + } + + dataset := settings.DataSet + if dataset == "" { + dataset = "uptime" + } + + return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { + fields := clientCfg.Processing.Fields.Clone() + fields.Put("event.dataset", dataset) + + meta := clientCfg.Processing.Meta.Clone() + if settings.Pipeline != "" { + 
meta.Put("pipeline", settings.Pipeline) + } + + // assemble the processors. Ordering is important. + // 1. add support for index configuration via processor + // 2. add processors added by the input that wants to connect + // 3. add locally configured processors from the 'processors' settings + procs := processors.NewList(nil) + if indexProcessor != nil { + procs.AddProcessor(indexProcessor) + } + if lst := clientCfg.Processing.Processor; lst != nil { + procs.AddProcessor(lst) + } + if userProcessors != nil { + procs.AddProcessors(*userProcessors) + } + + clientCfg.Processing.EventMetadata = settings.EventMetadata + clientCfg.Processing.Meta = meta + clientCfg.Processing.Processor = procs + clientCfg.Processing.KeepNull = settings.KeepNull + clientCfg.Processing.DisableHost = settings.PublisherPipeline.DisableHost + + return clientCfg, nil + }, nil +} + +func setOptional(to common.MapStr, key string, value string) { + if value != "" { + to.Put(key, value) + } +} diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 0853b5547183..eb53ddeb1953 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -21,8 +21,6 @@ import ( "context" "fmt" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/heartbeat/eventext" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/scheduler" @@ -30,39 +28,24 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/processors" ) // configuredJob represents a job combined with its config and any // subsequent processors. 
type configuredJob struct { - job jobs.Job - config jobConfig - monitor *Monitor - processors *processors.Processors - cancelFn context.CancelFunc - client beat.Client + job jobs.Job + config jobConfig + monitor *Monitor + cancelFn context.CancelFunc + client beat.Client } func newConfiguredJob(job jobs.Job, config jobConfig, monitor *Monitor) (*configuredJob, error) { - t := &configuredJob{ + return &configuredJob{ job: job, config: config, monitor: monitor, - } - - processors, err := processors.New(config.Processors) - if err != nil { - return nil, ProcessorsError{err} - } - t.processors = processors - - if err != nil { - logp.Critical("Could not create client for monitor configuredJob %+v", t.monitor) - return nil, errors.Wrap(err, "could not create client for monitor configuredJob") - } - - return t, nil + }, nil } // jobConfig represents fields needed to execute a single job. @@ -70,13 +53,6 @@ type jobConfig struct { Name string `config:"pluginName"` Type string `config:"type"` Schedule *schedule.Schedule `config:"schedule" validate:"required"` - - // Fields and tags to add to monitor. - EventMetadata common.EventMetadata `config:",inline"` - Processors processors.PluginConfig `config:"processors"` - - // KeepNull determines whether published events will keep null values or omit them. - KeepNull bool `config:"keep_null"` } // ProcessorsError is used to indicate situations when processors could not be loaded. 
@@ -101,15 +77,7 @@ func (t *configuredJob) makeSchedulerTaskFunc() scheduler.TaskFunc { func (t *configuredJob) Start() { var err error - fields := common.MapStr{"event": common.MapStr{"dataset": "uptime"}} - t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - EventMetadata: t.config.EventMetadata, - Processor: t.processors, - KeepNull: t.config.KeepNull, - Fields: fields, - }, - }) + t.client, err = t.monitor.pipelineConnector.Connect() if err != nil { logp.Err("could not start monitor: %v", err) return From 0bb310644bc9a3bc408d0aa35be6958eb7d90144 Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 14 Aug 2020 18:13:56 +0200 Subject: [PATCH 2/7] remove unused function --- heartbeat/monitors/factory.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index d9a48ef4f370..17f5558c82e2 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -136,9 +136,3 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return clientCfg, nil }, nil } - -func setOptional(to common.MapStr, key string, value string) { - if value != "" { - to.Put(key, value) - } -} From e883c624281af3ce0523bb9544071b10d1eee062 Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 14 Aug 2020 18:18:37 +0200 Subject: [PATCH 3/7] Add index and pipeline setting to docs --- .../monitors/monitor-common-options.asciidoc | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/heartbeat/docs/monitors/monitor-common-options.asciidoc b/heartbeat/docs/monitors/monitor-common-options.asciidoc index ac1e0b27a1cb..53a7e1242816 100644 --- a/heartbeat/docs/monitors/monitor-common-options.asciidoc +++ b/heartbeat/docs/monitors/monitor-common-options.asciidoc @@ -143,6 +143,30 @@ A list of processors to apply to the data generated by the monitor. See <> for information about specifying processors in your config. 
+[float] +[[monitor-pipeline]] +==== `pipeline` + +The Ingest Node pipeline ID to set for the events generated by this monitor. + +NOTE: The pipeline ID can also be configured in the Elasticsearch output, but +this option usually results in simpler configuration files. If the pipeline is +configured both in the monitor and the output, the option from the +monitor is used. + +[float] +[[monitor-index]] +==== `index` + +If present, this formatted string overrides the index for events from this monitor +(for Elasticsearch outputs), or sets the `raw_index` field of the event's +metadata (for other outputs). This string can only refer to the agent name and +version and the event timestamp; for access to dynamic fields, use +`output.elasticsearch.index` or a processor. + +Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might +expand to `"heartbeat-myindex-2019.11.01"`. + [float] [[monitor-keep-null]] ==== `keep_null` From 3ea607e3034d3f4cbdf8ad7e70ef484b31b9484a Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 14 Aug 2020 18:25:55 +0200 Subject: [PATCH 4/7] add index/pipeline setting to reference config --- .../_meta/config/beat.reference.yml.tmpl | 24 +++++++++++++++++++ heartbeat/heartbeat.reference.yml | 24 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/heartbeat/_meta/config/beat.reference.yml.tmpl b/heartbeat/_meta/config/beat.reference.yml.tmpl index 1b4cd61937af..e72d7fd7ae16 100644 --- a/heartbeat/_meta/config/beat.reference.yml.tmpl +++ b/heartbeat/_meta/config/beat.reference.yml.tmpl @@ -80,6 +80,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. 
#keep_null: false @@ -169,6 +177,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -261,6 +277,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 8950cbe8ada8..a0a7972b8baa 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -80,6 +80,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -169,6 +177,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. 
If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -261,6 +277,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false From 3d9a3e9cf8ff470c7a0cc36073df909bd8d39f4e Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 14 Aug 2020 18:27:42 +0200 Subject: [PATCH 5/7] Add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c33b5e052aa0..11a49d102238 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -542,6 +542,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Allow a list of status codes for HTTP checks. {pull}15587[15587] - Add additional ECS compatible fields for TLS information. {pull}17687[17687] - Record HTTP response headers. {pull}18327[18327] +- Add index and pipeline settings to monitor configurations. {pull}20610[20610] *Journalbeat* From 4c5d7419f9958c2ca4ce1436fb1a4412deb78b9e Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 17 Aug 2020 18:49:11 +0200 Subject: [PATCH 6/7] fix lint --- x-pack/heartbeat/heartbeat.reference.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index 8950cbe8ada8..a0a7972b8baa 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -80,6 +80,14 @@ heartbeat.monitors: # Interval between file file changed checks. 
#interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -169,6 +177,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -261,6 +277,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. 
#keep_null: false From 748739b3a1ad0a11865e75c9a0d93eec9c6f283c Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 20 Aug 2020 17:30:05 +0200 Subject: [PATCH 7/7] Fix field not being added --- heartbeat/monitors/factory.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 17f5558c82e2..10d039d0830f 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" @@ -104,6 +105,8 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi } return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { + logp.Info("Client connection with: %#v", clientCfg) + fields := clientCfg.Processing.Fields.Clone() fields.Put("event.dataset", dataset) @@ -128,6 +131,7 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi } clientCfg.Processing.EventMetadata = settings.EventMetadata + clientCfg.Processing.Fields = fields clientCfg.Processing.Meta = meta clientCfg.Processing.Processor = procs clientCfg.Processing.KeepNull = settings.KeepNull