diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5a8b37bf45c7..ea4020a13a7e 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770] - Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961] - Count HTTP 429 responses in the elasticsearch output {pull}8056[8056] +- Report configured queue type. {pull}8091[8091] *Auditbeat* diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 05f30940dcf3..59a791aaf428 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -289,7 +289,14 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { } debugf("Initializing output plugins") - pipeline, err := pipeline.Load(b.Info, reg, b.Config.Pipeline, b.Config.Output) + pipeline, err := pipeline.Load(b.Info, + pipeline.Monitors{ + Metrics: reg, + Telemetry: monitoring.GetNamespace("state").GetRegistry(), + Logger: logp.L().Named("publisher"), + }, + b.Config.Pipeline, + b.Config.Output) if err != nil { return nil, fmt.Errorf("error initializing publisher: %+v", err) } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 7c9357abdd77..f64bad7432ac 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -43,7 +43,8 @@ import ( ) type reporter struct { - done *stopper + done *stopper + logger *logp.Logger checkRetry time.Duration @@ -57,7 +58,9 @@ type reporter struct { out outputs.Group } -var debugf = logp.MakeDebug("monitoring") +const selector = "monitoring" + +var debugf = logp.MakeDebug(selector) var errNoMonitoring = errors.New("xpack monitoring not available") @@ -72,6 +75,8 @@ func init() { } func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) { + log := logp.L().Named(selector) + config := defaultConfig if err := cfg.Unpack(&config); err != nil { return nil, err @@ -89,7 +94,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) { return nil, err } if proxyURL != nil { - logp.Info("Using proxy URL: %s", proxyURL) + log.Infof("Using proxy URL: %s", proxyURL) } tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) if err != nil { @@ -123,10 +128,11 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) { } queueFactory := func(e queue.Eventer) (queue.Queue, error) { - return memqueue.NewBroker(memqueue.Settings{ - Eventer: e, - Events: 20, - }), nil + return memqueue.NewBroker(log, + memqueue.Settings{ + Eventer: e, + Events: 20, + }), nil } monitoring := monitoring.Default.GetRegistry("xpack.monitoring") @@ -149,6 +155,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) { } r := &reporter{ + logger: log, done: newStopper(), beatMeta: makeMeta(beat), tags: config.Tags, @@ -171,6 +178,8 @@ func (r *reporter) initLoop(c config) { debugf("Start monitoring endpoint init loop.") defer debugf("Finish monitoring endpoint init loop.") + log := r.logger + logged := false for { @@ -178,11 +187,11 @@ func (r *reporter) initLoop(c config) { client := r.out.Clients[rand.Intn(len(r.out.Clients))].(outputs.NetworkClient) err := client.Connect() if err == nil { - closing(client) + closing(log, client) break } else { if !logged { - logp.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.") + log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.") logged = true } debugf("Monitoring could not connect to elasticsearch, failed with %v", err) @@ -195,7 +204,7 @@ func (r *reporter) initLoop(c config) { } } - logp.Info("Successfully connected to X-Pack Monitoring endpoint.") + log.Info("Successfully connected to X-Pack Monitoring endpoint.") // Start collector and send loop if monitoring endpoint has been found. go r.snapshotLoop("state", "state", c.StatePeriod) @@ -207,8 +216,10 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration) ticker := time.NewTicker(period) defer ticker.Stop() - logp.Info("Start monitoring %s metrics snapshot loop with period %s.", namespace, period) - defer logp.Info("Stop monitoring %s metrics snapshot loop.", namespace) + log := r.logger + + log.Infof("Start monitoring %s metrics snapshot loop with period %s.", namespace, period) + defer log.Infof("Stop monitoring %s metrics snapshot loop.", namespace) for { var ts time.Time @@ -277,9 +288,9 @@ func makeClient( return newPublishClient(esClient, params), nil } -func closing(c io.Closer) { +func closing(log *logp.Logger, c io.Closer) { if err := c.Close(); err != nil { - logp.Warn("Closed failed with: %v", err) + log.Warnf("Closed failed with: %v", err) } } diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index 4fb5282f5825..7ed0ca95c112 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -38,6 +38,14 @@ var publishDisabled = false const defaultQueueType = "mem" +// Monitors configures visibility for observing state and progress of the +// pipeline. +type Monitors struct { + Metrics *monitoring.Registry + Telemetry *monitoring.Registry + Logger *logp.Logger +} + func init() { flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing") } @@ -46,12 +54,17 @@ func init() { // configured queue and outputs. func Load( beatInfo beat.Info, - reg *monitoring.Registry, + monitors Monitors, config Config, outcfg common.ConfigNamespace, ) (*Pipeline, error) { + log := monitors.Logger + if log == nil { + log = logp.L() + } + if publishDisabled { - logp.Info("Dry run mode. All output types except the file based one are disabled.") + log.Info("Dry run mode. All output types except the file based one are disabled.") } processors, err := processors.New(config.Processors) @@ -80,49 +93,54 @@ func Load( }, } - queueBuilder, err := createQueueBuilder(config.Queue) + queueBuilder, err := createQueueBuilder(config.Queue, monitors) if err != nil { return nil, err } - out, err := loadOutput(beatInfo, reg, outcfg) + out, err := loadOutput(beatInfo, monitors, outcfg) if err != nil { return nil, err } - p, err := New(beatInfo, reg, queueBuilder, out, settings) + p, err := New(beatInfo, monitors.Metrics, queueBuilder, out, settings) if err != nil { return nil, err } - logp.Info("Beat name: %s", name) + log.Info("Beat name: %s", name) return p, err } func loadOutput( beatInfo beat.Info, - reg *monitoring.Registry, + monitors Monitors, outcfg common.ConfigNamespace, ) (outputs.Group, error) { + log := monitors.Logger + if log == nil { + log = logp.L() + } + if publishDisabled { return outputs.Group{}, nil } if !outcfg.IsSet() { msg := "No outputs are defined. Please define one under the output section." - logp.Info(msg) + log.Info(msg) return outputs.Fail(errors.New(msg)) } // TODO: add support to unload/reassign outStats on output reloading var ( - outReg *monitoring.Registry + metrics *monitoring.Registry outStats outputs.Observer ) - if reg != nil { - outReg = reg.NewRegistry("output") - outStats = outputs.NewStats(outReg) + if monitors.Metrics != nil { + metrics = monitors.Metrics.NewRegistry("output") + outStats = outputs.NewStats(metrics) } out, err := outputs.Load(beatInfo, outStats, outcfg.Name(), outcfg.Config()) @@ -130,18 +148,21 @@ func loadOutput( return outputs.Fail(err) } - if outReg != nil { - monitoring.NewString(outReg, "type").Set(outcfg.Name()) + if metrics != nil { + monitoring.NewString(metrics, "type").Set(outcfg.Name()) + } + if monitors.Telemetry != nil { + telemetry := monitors.Telemetry.NewRegistry("output") + monitoring.NewString(telemetry, "name").Set(outcfg.Name()) } - - stateRegistry := monitoring.GetNamespace("state").GetRegistry() - outputRegistry := stateRegistry.NewRegistry("output") - monitoring.NewString(outputRegistry, "name").Set(outcfg.Name()) return out, nil } -func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (queue.Queue, error), error) { +func createQueueBuilder( + config common.ConfigNamespace, + monitors Monitors, +) (func(queue.Eventer) (queue.Queue, error), error) { queueType := defaultQueueType if b := config.Name(); b != "" { queueType = b @@ -157,7 +178,12 @@ func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (que queueConfig = common.NewConfig() } + if monitors.Telemetry != nil { + queueReg := monitors.Telemetry.NewRegistry("queue") + monitoring.NewString(queueReg, "name").Set(queueType) + } + return func(eventer queue.Eventer) (queue.Queue, error) { - return queueFactory(eventer, queueConfig) + return queueFactory(eventer, monitors.Logger, queueConfig) }, nil } diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index 3a71f8894bbc..e21d3f29d269 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -57,8 +57,13 @@ func RunTests( return fmt.Errorf("unpacking config failed: %v", err) } - // reg := monitoring.NewRegistry() - pipeline, err := pipeline.Load(info, nil, config.Pipeline, config.Output) + pipeline, err := pipeline.Load(info, pipeline.Monitors{ + Metrics: nil, + Telemetry: nil, + Logger: logp.L(), + }, + config.Pipeline, + config.Output) if err != nil { return fmt.Errorf("loading pipeline failed: %+v", err) } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index b582155bc0ae..34cefd6ef2a3 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -87,13 +87,17 @@ func init() { queue.RegisterType("mem", create) } -func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) { +func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (queue.Queue, error) { config := defaultConfig if err := cfg.Unpack(&config); err != nil { return nil, err } - return NewBroker(Settings{ + if logger == nil { + logger = logp.L() + } + + return NewBroker(logger, Settings{ Eventer: eventer, Events: config.Events, FlushMinEvents: config.FlushMinEvents, @@ -105,6 +109,7 @@ func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) { // If waitOnClose is set to true, the broker will block on Close, until all internal // workers handling incoming messages and ACKs have been shut down. func NewBroker( + logger logger, settings Settings, ) *Broker { // define internal channel size for producer/client requests @@ -128,9 +133,13 @@ func NewBroker( minEvents = sz } + if logger == nil { + logger = logp.NewLogger("memqueue") + } + b := &Broker{ done: make(chan struct{}), - logger: logp.NewLogger("memqueue"), + logger: logger, // broker API channels events: make(chan pushRequest, chanSize), diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 528cc39996e4..cac0c68d3a8f 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -72,7 +72,7 @@ func TestProducerCancelRemovesEvents(t *testing.T) { func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory { return func(_ *testing.T) queue.Queue { - return NewBroker(Settings{ + return NewBroker(nil, Settings{ Events: sz, FlushMinEvents: minEvents, FlushTimeout: flushTimeout, diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 201af049b94d..eca5c0c499d6 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -22,11 +22,12 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" ) // Factory for creating a queue used by a pipeline instance. -type Factory func(Eventer, *common.Config) (Queue, error) +type Factory func(Eventer, *logp.Logger, *common.Config) (Queue, error) // Eventer listens to special events to be send by queue implementations. type Eventer interface { diff --git a/libbeat/publisher/queue/spool/module.go b/libbeat/publisher/queue/spool/module.go index b53d1be42d4d..71a43c421ecb 100644 --- a/libbeat/publisher/queue/spool/module.go +++ b/libbeat/publisher/queue/spool/module.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/feature" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/publisher/queue" "github.com/elastic/go-txfile" @@ -38,7 +39,7 @@ func init() { queue.RegisterType("spool", create) } -func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) { +func create(eventer queue.Eventer, logp *logp.Logger, cfg *common.Config) (queue.Queue, error) { cfgwarn.Beta("Spooling to disk is beta") config := defaultConfig() @@ -56,7 +57,12 @@ func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) { flushEvents = uint(count) } - return NewSpool(defaultLogger(), path, Settings{ + var log logger = logp + if logp == nil { + log = defaultLogger() + } + + return NewSpool(log, path, Settings{ Eventer: eventer, Mode: config.File.Permissions, WriteBuffer: uint(config.Write.BufferSize),