Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified Pipeline/output metrics #4663

Merged
merged 3 commits into from
Jul 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libbeat/cmd/test/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func GenTestOutputCmd(name, beatVersion string) *cobra.Command {
os.Exit(1)
}

output, err := outputs.Load(b.Info, b.Config.Output.Name(), b.Config.Output.Config())
output, err := outputs.Load(b.Info, nil, b.Config.Output.Name(), b.Config.Output.Config())
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing output: %s\n", err)
os.Exit(1)
Expand Down
57 changes: 45 additions & 12 deletions libbeat/monitoring/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"math"
"strconv"
"sync"
"sync/atomic"

"github.com/elastic/beats/libbeat/common/atomic"
)

// makeExpvar wraps a callback for registering a metrics with expvar.Publish.
type makeExpvar func() string

// Int is a 64 bit integer variable satisfying the Var interface.
type Int struct{ i int64 }
type Int struct{ i atomic.Int64 }

// NewInt creates and registers a new integer variable.
//
Expand All @@ -32,15 +33,47 @@ func NewInt(r *Registry, name string, opts ...Option) *Int {
return v
}

func (v *Int) Get() int64 { return atomic.LoadInt64(&v.i) }
func (v *Int) Set(value int64) { atomic.StoreInt64(&v.i, value) }
func (v *Int) Add(delta int64) { atomic.AddInt64(&v.i, delta) }
func (v *Int) Inc() { atomic.AddInt64(&v.i, 1) }
func (v *Int) Dec() { atomic.AddInt64(&v.i, -1) }
func (v *Int) Get() int64 { return v.i.Load() }
func (v *Int) Set(value int64) { v.i.Store(value) }
func (v *Int) Add(delta int64) { v.i.Add(delta) }
func (v *Int) Sub(delta int64) { v.i.Sub(delta) }
func (v *Int) Inc() { v.i.Inc() }
func (v *Int) Dec() { v.i.Dec() }
func (v *Int) Visit(_ Mode, vs Visitor) { vs.OnInt(v.Get()) }

// Uint is a 64bit unsigned integer variable satisfying the Var interface.
type Uint struct{ u atomic.Uint64 }

// NewUint creates and registers a new unsigned integer variable.
//
// Note: If the registry is configured to publish variables to expvar, the
// variable will be available via expvars package as well, but can not be removed
// anymore.
func NewUint(r *Registry, name string, opts ...Option) *Uint {
if r == nil {
r = Default
}

v := &Uint{}
addVar(r, name, opts, v, makeExpvar(func() string {
return strconv.FormatUint(v.Get(), 10)
}))
return v
}

func (v *Uint) Get() uint64 { return v.u.Load() }
func (v *Uint) Set(value uint64) { v.u.Store(value) }
func (v *Uint) Add(delta uint64) { v.u.Add(delta) }
func (v *Uint) Sub(delta uint64) { v.u.Sub(delta) }
func (v *Uint) Inc() { v.u.Inc() }
func (v *Uint) Dec() { v.u.Dec() }
func (v *Uint) Visit(_ Mode, vs Visitor) {
value := v.Get() & (^uint64(1 << 63))
vs.OnInt(int64(value))
}

// Float is a 64 bit float variable satisfying the Var interface.
type Float struct{ f uint64 }
type Float struct{ f atomic.Uint64 }

// NewFloat creates and registers a new float variable.
//
Expand All @@ -59,16 +92,16 @@ func NewFloat(r *Registry, name string, opts ...Option) *Float {
return v
}

func (v *Float) Get() float64 { return math.Float64frombits(atomic.LoadUint64(&v.f)) }
func (v *Float) Set(value float64) { atomic.StoreUint64(&v.f, math.Float64bits(value)) }
func (v *Float) Get() float64 { return math.Float64frombits(v.f.Load()) }
func (v *Float) Set(value float64) { v.f.Store(math.Float64bits(value)) }
func (v *Float) Sub(delta float64) { v.Add(-delta) }
func (v *Float) Visit(_ Mode, vs Visitor) { vs.OnFloat(v.Get()) }

func (v *Float) Add(delta float64) {
for {
cur := atomic.LoadUint64(&v.f)
cur := v.f.Load()
next := math.Float64bits(math.Float64frombits(cur) + delta)
if atomic.CompareAndSwapUint64(&v.f, cur, next) {
if v.f.CAS(cur, next) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tricky abbreviation. only know what it means because of the diff

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh... to me CAS on atomics is pretty well known/common :)

return
}
}
Expand Down
13 changes: 9 additions & 4 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,15 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er
Events: 20,
}), nil
}
pipeline, err := pipeline.New(brokerFactory, out, pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
})

monitoring := monitoring.Default.NewRegistry("xpack.monitoring")

pipeline, err := pipeline.New(
monitoring,
brokerFactory, out, pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
})
if err != nil {
return nil, err
}
Expand Down
41 changes: 31 additions & 10 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

type console struct {
out *os.File
stats *outputs.Stats
writer *bufio.Writer
codec codec.Codec
index string
Expand All @@ -33,7 +34,11 @@ func init() {
outputs.RegisterType("console", makeConsole)
}

func makeConsole(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error) {
func makeConsole(
beat common.BeatInfo,
stats *outputs.Stats,
cfg *common.Config,
) (outputs.Group, error) {
config := defaultConfig
err := cfg.Unpack(&config)
if err != nil {
Expand All @@ -51,7 +56,7 @@ func makeConsole(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error
}

index := beat.Beat
c, err := newConsole(index, enc)
c, err := newConsole(index, stats, enc)
if err != nil {
return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err))
}
Expand All @@ -67,46 +72,62 @@ func makeConsole(beat common.BeatInfo, cfg *common.Config) (outputs.Group, error
return outputs.Success(config.BatchSize, 0, c)
}

func newConsole(index string, codec codec.Codec) (*console, error) {
c := &console{out: os.Stdout, codec: codec, index: index}
func newConsole(index string, stats *outputs.Stats, codec codec.Codec) (*console, error) {
c := &console{out: os.Stdout, codec: codec, stats: stats, index: index}
c.writer = bufio.NewWriterSize(c.out, 8*1024)
return c, nil
}

func (c *console) Close() error { return nil }
func (c *console) Publish(batch publisher.Batch) error {
st := c.stats
events := batch.Events()
st.NewBatch(len(events))

dropped := 0
for i := range events {
c.publishEvent(&events[i])
ok := c.publishEvent(&events[i])
if !ok {
dropped++
}
}

c.writer.Flush()
batch.ACK()

st.Dropped(dropped)
st.Acked(len(events) - dropped)

return nil
}

var nl = []byte("\n")

func (c *console) publishEvent(event *publisher.Event) {
func (c *console) publishEvent(event *publisher.Event) bool {
serializedEvent, err := c.codec.Encode(c.index, &event.Content)
if err != nil {
if !event.Guaranteed() {
return
return false
}

logp.Critical("Unable to encode event: %v", err)
return
return false
}

if err := c.writeBuffer(serializedEvent); err != nil {
c.stats.WriteError()
logp.Critical("Unable to publish events to console: %v", err)
return
return false
}

if err := c.writeBuffer(nl); err != nil {
c.stats.WriteError()
logp.Critical("Error when appending newline to event: %v", err)
return
return false
}

c.stats.WriteBytes(len(serializedEvent) + 1)
return true
}

func (c *console) writeBuffer(buf []byte) error {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/console/console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestConsoleOutput(t *testing.T) {

func run(codec codec.Codec, batches ...publisher.Batch) (string, error) {
return withStdout(func() {
c, _ := newConsole("test", codec)
c, _ := newConsole("test", nil, codec)
for _, b := range batches {
c.Publish(b)
}
Expand Down
46 changes: 22 additions & 24 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
Expand Down Expand Up @@ -41,7 +40,7 @@ type Client struct {
compressionLevel int
proxyURL *url.URL

stats *ClientStats
stats *outputs.Stats
}

// ClientSettings contains the settings for a client.
Expand All @@ -56,14 +55,7 @@ type ClientSettings struct {
Pipeline *outil.Selector
Timeout time.Duration
CompressionLevel int
Stats *ClientStats
}

type ClientStats struct {
PublishCallCount *monitoring.Int
EventsACKed *monitoring.Int
EventsFailed *monitoring.Int
IO *transport.IOStats
Stats *outputs.Stats
}

type connectCallback func(client *Client) error
Expand Down Expand Up @@ -139,9 +131,9 @@ func NewClient(
return nil, err
}

if st := s.Stats; st != nil && st.IO != nil {
dialer = transport.StatsDialer(dialer, st.IO)
tlsDialer = transport.StatsDialer(tlsDialer, st.IO)
if st := s.Stats; st != nil {
dialer = transport.StatsDialer(dialer, st)
tlsDialer = transport.StatsDialer(tlsDialer, st)
}

params := s.Parameters
Expand Down Expand Up @@ -251,8 +243,10 @@ func (client *Client) publishEvents(
data []publisher.Event,
) ([]publisher.Event, error) {
begin := time.Now()
if st := client.stats; st != nil && st.PublishCallCount != nil {
st.PublishCallCount.Add(1)
st := client.stats

if st != nil {
st.NewBatch(len(data))
}

if len(data) == 0 {
Expand All @@ -264,8 +258,14 @@ func (client *Client) publishEvents(

// encode events into bulk request buffer, dropping failed elements from
// events slice

origCount := len(data)
data = bulkEncodePublishRequest(body, client.index, client.pipeline, data)
if len(data) == 0 {
newCount := len(data)
if st != nil && origCount > newCount {
st.Dropped(origCount - newCount)
}
if newCount == 0 {
return nil, nil
}

Expand All @@ -290,22 +290,20 @@ func (client *Client) publishEvents(
failedEvents = bulkCollectPublishFails(&client.json, data)
}

failed := len(failedEvents)
if st := client.stats; st != nil {
countOK := int64(len(data) - len(failedEvents))
st.EventsACKed.Add(countOK)
outputs.AckedEvents.Add(countOK)
if failed := int64(len(failedEvents)); failed > 0 {
st.EventsFailed.Add(failed)
}
acked := len(data) - failed

st.Acked(acked)
st.Failed(failed)
}

if len(failedEvents) > 0 {
if failed > 0 {
if sendErr == nil {
sendErr = errTempBulkFailure
}
return failedEvents, sendErr
}

return nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.Client, *Client) {
t.Fatal(err)
}

output, err := makeES(common.BeatInfo{Beat: "libbeat"}, config)
output, err := makeES(common.BeatInfo{Beat: "libbeat"}, nil, config)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading