Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publisher pipeline: pass logger and metrics registry #8091

Merged
merged 5 commits into from
Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Add DNS processor with support for performing reverse lookups on IP addresses. {issue}7770[7770]
- Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961]
- Count HTTP 429 responses in the elasticsearch output {pull}8056[8056]
- Report configured queue type. {pull}8091[8091]

*Auditbeat*

Expand Down
9 changes: 8 additions & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,14 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}

debugf("Initializing output plugins")
pipeline, err := pipeline.Load(b.Info, reg, b.Config.Pipeline, b.Config.Output)
pipeline, err := pipeline.Load(b.Info,
pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
},
b.Config.Pipeline,
b.Config.Output)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %+v", err)
}
Expand Down
39 changes: 25 additions & 14 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (
)

type reporter struct {
done *stopper
done *stopper
logger *logp.Logger

checkRetry time.Duration

Expand All @@ -57,7 +58,9 @@ type reporter struct {
out outputs.Group
}

var debugf = logp.MakeDebug("monitoring")
const selector = "monitoring"

var debugf = logp.MakeDebug(selector)

var errNoMonitoring = errors.New("xpack monitoring not available")

Expand All @@ -72,6 +75,8 @@ func init() {
}

func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
log := logp.L().Named(selector)

config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand All @@ -89,7 +94,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
return nil, err
}
if proxyURL != nil {
logp.Info("Using proxy URL: %s", proxyURL)
log.Infof("Using proxy URL: %s", proxyURL)
}
tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
Expand Down Expand Up @@ -123,10 +128,11 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
return memqueue.NewBroker(memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
return memqueue.NewBroker(log,
memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
}

monitoring := monitoring.Default.GetRegistry("xpack.monitoring")
Expand All @@ -149,6 +155,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
}

r := &reporter{
logger: log,
done: newStopper(),
beatMeta: makeMeta(beat),
tags: config.Tags,
Expand All @@ -171,18 +178,20 @@ func (r *reporter) initLoop(c config) {
debugf("Start monitoring endpoint init loop.")
defer debugf("Finish monitoring endpoint init loop.")

log := r.logger

logged := false

for {
// Select one configured endpoint by random and check if xpack is available
client := r.out.Clients[rand.Intn(len(r.out.Clients))].(outputs.NetworkClient)
err := client.Connect()
if err == nil {
closing(client)
closing(log, client)
break
} else {
if !logged {
logp.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.")
log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.")
logged = true
}
debugf("Monitoring could not connect to elasticsearch, failed with %v", err)
Expand All @@ -195,7 +204,7 @@ func (r *reporter) initLoop(c config) {
}
}

logp.Info("Successfully connected to X-Pack Monitoring endpoint.")
log.Info("Successfully connected to X-Pack Monitoring endpoint.")

// Start collector and send loop if monitoring endpoint has been found.
go r.snapshotLoop("state", "state", c.StatePeriod)
Expand All @@ -207,8 +216,10 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration)
ticker := time.NewTicker(period)
defer ticker.Stop()

logp.Info("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer logp.Info("Stop monitoring %s metrics snapshot loop.", namespace)
log := r.logger

log.Infof("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer log.Infof("Stop monitoring %s metrics snapshot loop.", namespace)

for {
var ts time.Time
Expand Down Expand Up @@ -277,9 +288,9 @@ func makeClient(
return newPublishClient(esClient, params), nil
}

func closing(c io.Closer) {
func closing(log *logp.Logger, c io.Closer) {
if err := c.Close(); err != nil {
logp.Warn("Closed failed with: %v", err)
log.Warnf("Closed failed with: %v", err)
}
}

Expand Down
66 changes: 46 additions & 20 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ var publishDisabled = false

const defaultQueueType = "mem"

// Monitors configures visibility for observing state and progress of the
// pipeline.
type Monitors struct {
Metrics *monitoring.Registry
Telemetry *monitoring.Registry
Logger *logp.Logger
}

func init() {
flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing")
}
Expand All @@ -46,12 +54,17 @@ func init() {
// configured queue and outputs.
func Load(
beatInfo beat.Info,
reg *monitoring.Registry,
monitors Monitors,
config Config,
outcfg common.ConfigNamespace,
) (*Pipeline, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
logp.Info("Dry run mode. All output types except the file based one are disabled.")
log.Info("Dry run mode. All output types except the file based one are disabled.")
}

processors, err := processors.New(config.Processors)
Expand Down Expand Up @@ -80,68 +93,76 @@ func Load(
},
}

queueBuilder, err := createQueueBuilder(config.Queue)
queueBuilder, err := createQueueBuilder(config.Queue, monitors)
if err != nil {
return nil, err
}

out, err := loadOutput(beatInfo, reg, outcfg)
out, err := loadOutput(beatInfo, monitors, outcfg)
if err != nil {
return nil, err
}

p, err := New(beatInfo, reg, queueBuilder, out, settings)
p, err := New(beatInfo, monitors.Metrics, queueBuilder, out, settings)
if err != nil {
return nil, err
}

logp.Info("Beat name: %s", name)
log.Info("Beat name: %s", name)
return p, err
}

func loadOutput(
beatInfo beat.Info,
reg *monitoring.Registry,
monitors Monitors,
outcfg common.ConfigNamespace,
) (outputs.Group, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
return outputs.Group{}, nil
}

if !outcfg.IsSet() {
msg := "No outputs are defined. Please define one under the output section."
logp.Info(msg)
log.Info(msg)
return outputs.Fail(errors.New(msg))
}

// TODO: add support to unload/reassign outStats on output reloading

var (
outReg *monitoring.Registry
metrics *monitoring.Registry
outStats outputs.Observer
)
if reg != nil {
outReg = reg.NewRegistry("output")
outStats = outputs.NewStats(outReg)
if monitors.Metrics != nil {
metrics = monitors.Metrics.NewRegistry("output")
outStats = outputs.NewStats(metrics)
}

out, err := outputs.Load(beatInfo, outStats, outcfg.Name(), outcfg.Config())
if err != nil {
return outputs.Fail(err)
}

if outReg != nil {
monitoring.NewString(outReg, "type").Set(outcfg.Name())
if metrics != nil {
monitoring.NewString(metrics, "type").Set(outcfg.Name())
}
if monitors.Telemetry != nil {
telemetry := monitors.Telemetry.NewRegistry("output")
monitoring.NewString(telemetry, "name").Set(outcfg.Name())
}

stateRegistry := monitoring.GetNamespace("state").GetRegistry()
outputRegistry := stateRegistry.NewRegistry("output")
monitoring.NewString(outputRegistry, "name").Set(outcfg.Name())

return out, nil
}

func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (queue.Queue, error), error) {
func createQueueBuilder(
config common.ConfigNamespace,
monitors Monitors,
) (func(queue.Eventer) (queue.Queue, error), error) {
queueType := defaultQueueType
if b := config.Name(); b != "" {
queueType = b
Expand All @@ -157,7 +178,12 @@ func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (que
queueConfig = common.NewConfig()
}

if monitors.Telemetry != nil {
queueReg := monitors.Telemetry.NewRegistry("queue")
monitoring.NewString(queueReg, "name").Set(queueType)
}

return func(eventer queue.Eventer) (queue.Queue, error) {
return queueFactory(eventer, queueConfig)
return queueFactory(eventer, monitors.Logger, queueConfig)
}, nil
}
9 changes: 7 additions & 2 deletions libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ func RunTests(
return fmt.Errorf("unpacking config failed: %v", err)
}

// reg := monitoring.NewRegistry()
pipeline, err := pipeline.Load(info, nil, config.Pipeline, config.Output)
pipeline, err := pipeline.Load(info, pipeline.Monitors{
Metrics: nil,
Telemetry: nil,
Logger: logp.L(),
},
config.Pipeline,
config.Output)
if err != nil {
return fmt.Errorf("loading pipeline failed: %+v", err)
}
Expand Down
15 changes: 12 additions & 3 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,17 @@ func init() {
queue.RegisterType("mem", create)
}

func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) {
func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (queue.Queue, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

return NewBroker(Settings{
if logger == nil {
logger = logp.L()
}

return NewBroker(logger, Settings{
Eventer: eventer,
Events: config.Events,
FlushMinEvents: config.FlushMinEvents,
Expand All @@ -105,6 +109,7 @@ func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) {
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewBroker(
logger logger,
settings Settings,
) *Broker {
// define internal channel size for producer/client requests
Expand All @@ -128,9 +133,13 @@ func NewBroker(
minEvents = sz
}

if logger == nil {
logger = logp.NewLogger("memqueue")
}

b := &Broker{
done: make(chan struct{}),
logger: logp.NewLogger("memqueue"),
logger: logger,

// broker API channels
events: make(chan pushRequest, chanSize),
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestProducerCancelRemovesEvents(t *testing.T) {

func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory {
return func(_ *testing.T) queue.Queue {
return NewBroker(Settings{
return NewBroker(nil, Settings{
Events: sz,
FlushMinEvents: minEvents,
FlushTimeout: flushTimeout,
Expand Down
3 changes: 2 additions & 1 deletion libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
)

// Factory for creating a queue used by a pipeline instance.
type Factory func(Eventer, *common.Config) (Queue, error)
type Factory func(Eventer, *logp.Logger, *common.Config) (Queue, error)

// Eventer listens to special events to be send by queue implementations.
type Eventer interface {
Expand Down
10 changes: 8 additions & 2 deletions libbeat/publisher/queue/spool/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/feature"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/publisher/queue"
"github.com/elastic/go-txfile"
Expand All @@ -38,7 +39,7 @@ func init() {
queue.RegisterType("spool", create)
}

func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) {
func create(eventer queue.Eventer, logp *logp.Logger, cfg *common.Config) (queue.Queue, error) {
cfgwarn.Beta("Spooling to disk is beta")

config := defaultConfig()
Expand All @@ -56,7 +57,12 @@ func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) {
flushEvents = uint(count)
}

return NewSpool(defaultLogger(), path, Settings{
var log logger = logp
if logp == nil {
log = defaultLogger()
}

return NewSpool(log, path, Settings{
Eventer: eventer,
Mode: config.File.Permissions,
WriteBuffer: uint(config.Write.BufferSize),
Expand Down