Skip to content

Commit

Permalink
Implement telegraf collecting stats on itself
Browse files Browse the repository at this point in the history
closes #1348
  • Loading branch information
sparrc committed Nov 7, 2016
1 parent 5d3850c commit 7f15abd
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 38 deletions.
7 changes: 6 additions & 1 deletion accumulator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package telegraf

import "time"
import (
"time"
)

// Accumulator is an interface for "accumulating" metrics from plugin(s).
// The metrics are sent down a channel shared between all plugins.
Expand Down Expand Up @@ -28,6 +30,9 @@ type Accumulator interface {
tags map[string]string,
t ...time.Time)

// TODO document
AddMetrics(metrics []Metric)

SetPrecision(precision, interval time.Duration)

AddError(err error)
Expand Down
14 changes: 11 additions & 3 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package agent

import (
"log"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

var (
NErrors = selfstat.New("agent", "gather_errors", map[string]string{})
)

type MetricMaker interface {
Expand Down Expand Up @@ -37,8 +41,12 @@ type accumulator struct {
maker MetricMaker

precision time.Duration
}

errCount uint64
func (ac *accumulator) AddMetrics(metrics []telegraf.Metric) {
for _, m := range metrics {
ac.metrics <- m
}
}

func (ac *accumulator) AddFields(
Expand Down Expand Up @@ -80,7 +88,7 @@ func (ac *accumulator) AddError(err error) {
if err == nil {
return
}
atomic.AddUint64(&ac.errCount, 1)
NErrors.Incr(1)
//TODO suppress/throttle consecutive duplicate errors?
log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err)
}
Expand Down
3 changes: 0 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ func (a *Agent) Test() error {
if err := input.Input.Gather(acc); err != nil {
return err
}
if acc.errCount > 0 {
return fmt.Errorf("Errors encountered during processing")
}

// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
Expand Down
25 changes: 8 additions & 17 deletions internal/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import (
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

var (
MetricsGathered = selfstat.New("agent", "metrics_gathered", map[string]string{})
MetricsDropped = selfstat.New("agent", "metrics_dropped", map[string]string{})
)

// Buffer is an object for storing metrics in a circular buffer.
type Buffer struct {
buf chan telegraf.Metric
// total dropped metrics
drops int
// total metrics added
total int

mu sync.Mutex
}
Expand All @@ -36,25 +38,14 @@ func (b *Buffer) Len() int {
return len(b.buf)
}

// Drops returns the total number of dropped metrics that have occured in this
// buffer since instantiation.
func (b *Buffer) Drops() int {
return b.drops
}

// Total returns the total number of metrics that have been added to this buffer.
func (b *Buffer) Total() int {
return b.total
}

// Add adds metrics to the buffer.
func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i, _ := range metrics {
b.total++
select {
case b.buf <- metrics[i]:
MetricsGathered.Incr(1)
default:
b.drops++
MetricsDropped.Incr(1)
<-b.buf
b.buf <- metrics[i]
}
Expand Down
5 changes: 1 addition & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error {
return err
}

rp := &models.RunningInput{
Input: input,
Config: pluginConfig,
}
rp := models.NewRunningInput(input, pluginConfig)
c.Inputs = append(c.Inputs, rp)
return nil
}
Expand Down
15 changes: 15 additions & 0 deletions internal/models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/selfstat"
)

type RunningInput struct {
Expand All @@ -14,6 +15,19 @@ type RunningInput struct {
trace bool
debug bool
defaultTags map[string]string

MetricsGathered selfstat.Stat
}

func NewRunningInput(
input telegraf.Input,
config *InputConfig,
) *RunningInput {
return &RunningInput{
Input: input,
Config: config,
MetricsGathered: selfstat.New("inputs", "metrics_gathered", map[string]string{"input": config.Name}),
}
}

// InputConfig containing a name, interval, and filter
Expand Down Expand Up @@ -60,6 +74,7 @@ func (r *RunningInput) MakeMetric(
fmt.Println("> " + m.String())
}

r.MetricsGathered.Incr(1)
return m
}

Expand Down
21 changes: 11 additions & 10 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/buffer"
"github.com/influxdata/telegraf/selfstat"
)

const (
Expand All @@ -24,6 +25,9 @@ type RunningOutput struct {
Quiet bool
MetricBufferLimit int
MetricBatchSize int
MetricsWritten selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat

metrics *buffer.Buffer
failMetrics *buffer.Buffer
Expand All @@ -50,7 +54,11 @@ func NewRunningOutput(
Config: conf,
MetricBufferLimit: bufferLimit,
MetricBatchSize: batchSize,
MetricsWritten: selfstat.New("outputs", "metrics_written", map[string]string{"output": name}),
BufferSize: selfstat.New("outputs", "buffer_size", map[string]string{"output": name}),
BufferLimit: selfstat.New("outputs", "buffer_limit", map[string]string{"output": name}),
}
ro.BufferLimit.Set(int64(ro.MetricBufferLimit))
return ro
}

Expand Down Expand Up @@ -84,16 +92,7 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {

// Write writes all cached points to this output.
func (ro *RunningOutput) Write() error {
if !ro.Quiet {
log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+
"Total gathered metrics: %d. Total dropped metrics: %d.",
ro.Name,
ro.failMetrics.Len()+ro.metrics.Len(),
ro.MetricBufferLimit,
ro.metrics.Total(),
ro.metrics.Drops()+ro.failMetrics.Drops())
}

ro.BufferSize.Set(int64(ro.failMetrics.Len() + ro.metrics.Len()))
var err error
if !ro.failMetrics.IsEmpty() {
bufLen := ro.failMetrics.Len()
Expand Down Expand Up @@ -141,6 +140,8 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
err := ro.Output.Write(metrics)
elapsed := time.Since(start)
if err == nil {
ro.MetricsWritten.Incr(int64(len(metrics)))
// TODO write one-off "elapsed" metric and remove this log message
if !ro.Quiet {
log.Printf("I! Output [%s] wrote batch of %d metrics in %s\n",
ro.Name, len(metrics), elapsed)
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/self"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
Expand Down
34 changes: 34 additions & 0 deletions plugins/inputs/http_listener/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/selfstat"
)

const (
Expand Down Expand Up @@ -43,6 +44,17 @@ type HTTPListener struct {
parser influx.InfluxParser
acc telegraf.Accumulator
pool *pool

BytesRecv selfstat.Stat
RequestsServed selfstat.Stat
WritesServed selfstat.Stat
QueriesServed selfstat.Stat
PingsServed selfstat.Stat
RequestsRecv selfstat.Stat
WritesRecv selfstat.Stat
QueriesRecv selfstat.Stat
PingsRecv selfstat.Stat
NotFoundsServed selfstat.Stat
}

const sampleConfig = `
Expand Down Expand Up @@ -81,6 +93,18 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
h.mu.Lock()
defer h.mu.Unlock()

tags := map[string]string{"input": "http_listener"}
h.BytesRecv = selfstat.New("inputs", "bytes_received", tags)
h.RequestsServed = selfstat.New("inputs", "requests_served", tags)
h.WritesServed = selfstat.New("inputs", "writes_served", tags)
h.QueriesServed = selfstat.New("inputs", "queries_served", tags)
h.PingsServed = selfstat.New("inputs", "pings_served", tags)
h.RequestsRecv = selfstat.New("inputs", "requests_received", tags)
h.WritesRecv = selfstat.New("inputs", "writes_received", tags)
h.QueriesRecv = selfstat.New("inputs", "queries_received", tags)
h.PingsRecv = selfstat.New("inputs", "pings_received", tags)
h.NotFoundsServed = selfstat.New("inputs", "not_founds_served", tags)

if h.MaxBodySize == 0 {
h.MaxBodySize = DEFAULT_MAX_BODY_SIZE
}
Expand Down Expand Up @@ -141,20 +165,29 @@ func (h *HTTPListener) httpListen() error {
}

func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
h.RequestsRecv.Incr(1)
defer h.RequestsServed.Incr(1)
switch req.URL.Path {
case "/write":
h.WritesRecv.Incr(1)
defer h.WritesServed.Incr(1)
h.serveWrite(res, req)
case "/query":
h.QueriesRecv.Incr(1)
defer h.QueriesServed.Incr(1)
// Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusOK)
res.Write([]byte("{\"results\":[]}"))
case "/ping":
h.PingsRecv.Incr(1)
defer h.PingsServed.Incr(1)
// respond to ping requests
res.WriteHeader(http.StatusNoContent)
default:
defer h.NotFoundsServed.Incr(1)
// Don't know how to respond to calls to other endpoints
http.NotFound(res, req)
}
Expand Down Expand Up @@ -195,6 +228,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) {
badRequest(res)
return
}
h.BytesRecv.Incr(int64(n))

if err == io.EOF {
if return400 {
Expand Down
32 changes: 32 additions & 0 deletions plugins/inputs/self/self.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package self

import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/selfstat"
)

type Self struct {
}

var sampleConfig = `
`

func (s *Self) Description() string {
return "Collect statistics about itself"
}

func (s *Self) SampleConfig() string {
return sampleConfig
}

func (s *Self) Gather(acc telegraf.Accumulator) error {
acc.AddMetrics(selfstat.Metrics())
return nil
}

func init() {
inputs.Add("self", func() telegraf.Input {
return &Self{}
})
}
Loading

0 comments on commit 7f15abd

Please sign in to comment.