diff --git a/accumulator.go b/accumulator.go
index 13fd6e5711852..fe0b9a4c211c7 100644
--- a/accumulator.go
+++ b/accumulator.go
@@ -1,6 +1,8 @@
 package telegraf
 
-import "time"
+import (
+	"time"
+)
 
 // Accumulator is an interface for "accumulating" metrics from plugin(s).
 // The metrics are sent down a channel shared between all plugins.
@@ -28,6 +30,9 @@ type Accumulator interface {
 		tags map[string]string,
 		t ...time.Time)
 
+	// AddMetrics adds a batch of already-constructed metrics to the accumulator.
+	AddMetrics(metrics []Metric)
+
 	SetPrecision(precision, interval time.Duration)
 
 	AddError(err error)
diff --git a/agent/accumulator.go b/agent/accumulator.go
index 0d682d2857b6d..2996aa0f4c704 100644
--- a/agent/accumulator.go
+++ b/agent/accumulator.go
@@ -2,10 +2,15 @@ package agent
 
 import (
 	"log"
-	"sync/atomic"
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/selfstat"
+)
+
+var (
+	// NErrors counts every non-nil error passed to AddError, across all accumulators.
+	NErrors = selfstat.New("agent", "gather_errors", map[string]string{})
 )
 
 type MetricMaker interface {
@@ -37,8 +42,14 @@ type accumulator struct {
 	maker MetricMaker
 
 	precision time.Duration
+}
 
-	errCount uint64
+// AddMetrics sends each metric directly onto the accumulator's metrics
+// channel; it does not consult ac.maker.
+func (ac *accumulator) AddMetrics(metrics []telegraf.Metric) {
+	for _, m := range metrics {
+		ac.metrics <- m
+	}
 }
 
 func (ac *accumulator) AddFields(
@@ -80,7 +91,7 @@ func (ac *accumulator) AddError(err error) {
 	if err == nil {
 		return
 	}
-	atomic.AddUint64(&ac.errCount, 1)
+	NErrors.Incr(1)
 	//TODO suppress/throttle consecutive duplicate errors?
 	log.Printf("E! Error in plugin [%s]: %s", ac.maker.Name(), err)
 }
diff --git a/agent/agent.go b/agent/agent.go
index 1a205e218b203..bbe2cbe75a83c 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -204,9 +204,9 @@ func (a *Agent) Test() error {
 		if err := input.Input.Gather(acc); err != nil {
 			return err
 		}
-		if acc.errCount > 0 {
+		if NErrors.Get() > 0 {
 			return fmt.Errorf("Errors encountered during processing")
 		}
 
 		// Special instructions for some inputs. cpu, for example, needs to be
 		// run twice in order to return cpu usage percentages.
diff --git a/internal/buffer/buffer.go b/internal/buffer/buffer.go
index 58cd1c3764d7e..7e31303f35418 100644
--- a/internal/buffer/buffer.go
+++ b/internal/buffer/buffer.go
@@ -4,15 +4,21 @@ import (
 	"sync"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/selfstat"
+)
+
+var (
+	// MetricsGathered is the agent-level "metrics_gathered" stat. It is
+	// package-level, so every Buffer increments the same counter in Add.
+	MetricsGathered = selfstat.New("agent", "metrics_gathered", map[string]string{})
+	// MetricsDropped is the agent-level "metrics_dropped" stat, incremented by
+	// Add each time a full buffer evicts a metric to make room for a new one.
+	MetricsDropped = selfstat.New("agent", "metrics_dropped", map[string]string{})
 )
 
 // Buffer is an object for storing metrics in a circular buffer.
 type Buffer struct {
 	buf chan telegraf.Metric
-	// total dropped metrics
-	drops int
-	// total metrics added
-	total int
 
 	mu sync.Mutex
 }
@@ -36,25 +42,14 @@ func (b *Buffer) Len() int {
 	return len(b.buf)
 }
 
-// Drops returns the total number of dropped metrics that have occured in this
-// buffer since instantiation.
-func (b *Buffer) Drops() int {
-	return b.drops
-}
-
-// Total returns the total number of metrics that have been added to this buffer.
-func (b *Buffer) Total() int {
-	return b.total
-}
-
 // Add adds metrics to the buffer.
func (b *Buffer) Add(metrics ...telegraf.Metric) { for i, _ := range metrics { - b.total++ select { case b.buf <- metrics[i]: + MetricsGathered.Incr(1) default: - b.drops++ + MetricsDropped.Incr(1) <-b.buf b.buf <- metrics[i] } diff --git a/internal/config/config.go b/internal/config/config.go index 2c2199dacb00e..24dec4169bfc9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -821,10 +821,7 @@ func (c *Config) addInput(name string, table *ast.Table) error { return err } - rp := &models.RunningInput{ - Input: input, - Config: pluginConfig, - } + rp := models.NewRunningInput(input, pluginConfig) c.Inputs = append(c.Inputs, rp) return nil } diff --git a/internal/models/running_input.go b/internal/models/running_input.go index 558af3e5c0d12..9c6286cf1c6aa 100644 --- a/internal/models/running_input.go +++ b/internal/models/running_input.go @@ -5,6 +5,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/selfstat" ) type RunningInput struct { @@ -14,6 +15,19 @@ type RunningInput struct { trace bool debug bool defaultTags map[string]string + + MetricsGathered selfstat.Stat +} + +func NewRunningInput( + input telegraf.Input, + config *InputConfig, +) *RunningInput { + return &RunningInput{ + Input: input, + Config: config, + MetricsGathered: selfstat.New("inputs", "metrics_gathered", map[string]string{"input": config.Name}), + } } // InputConfig containing a name, interval, and filter @@ -60,6 +74,7 @@ func (r *RunningInput) MakeMetric( fmt.Println("> " + m.String()) } + r.MetricsGathered.Incr(1) return m } diff --git a/internal/models/running_output.go b/internal/models/running_output.go index aa94178f74145..4e1b4b4f4ed2b 100644 --- a/internal/models/running_output.go +++ b/internal/models/running_output.go @@ -6,6 +6,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -24,6 +25,9 @@ type RunningOutput struct { 
Quiet bool MetricBufferLimit int MetricBatchSize int + MetricsWritten selfstat.Stat + BufferSize selfstat.Stat + BufferLimit selfstat.Stat metrics *buffer.Buffer failMetrics *buffer.Buffer @@ -50,7 +54,11 @@ func NewRunningOutput( Config: conf, MetricBufferLimit: bufferLimit, MetricBatchSize: batchSize, + MetricsWritten: selfstat.New("outputs", "metrics_written", map[string]string{"output": name}), + BufferSize: selfstat.New("outputs", "buffer_size", map[string]string{"output": name}), + BufferLimit: selfstat.New("outputs", "buffer_limit", map[string]string{"output": name}), } + ro.BufferLimit.Set(int64(ro.MetricBufferLimit)) return ro } @@ -84,16 +92,7 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) { // Write writes all cached points to this output. func (ro *RunningOutput) Write() error { - if !ro.Quiet { - log.Printf("I! Output [%s] buffer fullness: %d / %d metrics. "+ - "Total gathered metrics: %d. Total dropped metrics: %d.", - ro.Name, - ro.failMetrics.Len()+ro.metrics.Len(), - ro.MetricBufferLimit, - ro.metrics.Total(), - ro.metrics.Drops()+ro.failMetrics.Drops()) - } - + ro.BufferSize.Set(int64(ro.failMetrics.Len() + ro.metrics.Len())) var err error if !ro.failMetrics.IsEmpty() { bufLen := ro.failMetrics.Len() @@ -141,6 +140,8 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error { err := ro.Output.Write(metrics) elapsed := time.Since(start) if err == nil { + ro.MetricsWritten.Incr(int64(len(metrics))) + // TODO write one-off "elapsed" metric and remove this log message if !ro.Quiet { log.Printf("I! 
Output [%s] wrote batch of %d metrics in %s\n", ro.Name, len(metrics), elapsed) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 67b85905e8dc5..c7438b9a18ac6 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -62,6 +62,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/riak" + _ "github.com/influxdata/telegraf/plugins/inputs/self" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" _ "github.com/influxdata/telegraf/plugins/inputs/snmp" _ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy" diff --git a/plugins/inputs/http_listener/http_listener.go b/plugins/inputs/http_listener/http_listener.go index ddc9ac7bf8cec..95ed934bdbef9 100644 --- a/plugins/inputs/http_listener/http_listener.go +++ b/plugins/inputs/http_listener/http_listener.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -43,6 +44,17 @@ type HTTPListener struct { parser influx.InfluxParser acc telegraf.Accumulator pool *pool + + BytesRecv selfstat.Stat + RequestsServed selfstat.Stat + WritesServed selfstat.Stat + QueriesServed selfstat.Stat + PingsServed selfstat.Stat + RequestsRecv selfstat.Stat + WritesRecv selfstat.Stat + QueriesRecv selfstat.Stat + PingsRecv selfstat.Stat + NotFoundsServed selfstat.Stat } const sampleConfig = ` @@ -81,6 +93,18 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error { h.mu.Lock() defer h.mu.Unlock() + tags := map[string]string{"input": "http_listener"} + h.BytesRecv = selfstat.New("inputs", "bytes_received", tags) + h.RequestsServed = selfstat.New("inputs", "requests_served", tags) + h.WritesServed = selfstat.New("inputs", "writes_served", tags) + h.QueriesServed = selfstat.New("inputs", 
"queries_served", tags) + h.PingsServed = selfstat.New("inputs", "pings_served", tags) + h.RequestsRecv = selfstat.New("inputs", "requests_received", tags) + h.WritesRecv = selfstat.New("inputs", "writes_received", tags) + h.QueriesRecv = selfstat.New("inputs", "queries_received", tags) + h.PingsRecv = selfstat.New("inputs", "pings_received", tags) + h.NotFoundsServed = selfstat.New("inputs", "not_founds_served", tags) + if h.MaxBodySize == 0 { h.MaxBodySize = DEFAULT_MAX_BODY_SIZE } @@ -141,10 +165,16 @@ func (h *HTTPListener) httpListen() error { } func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { + h.RequestsRecv.Incr(1) + defer h.RequestsServed.Incr(1) switch req.URL.Path { case "/write": + h.WritesRecv.Incr(1) + defer h.WritesServed.Incr(1) h.serveWrite(res, req) case "/query": + h.QueriesRecv.Incr(1) + defer h.QueriesServed.Incr(1) // Deliver a dummy response to the query endpoint, as some InfluxDB // clients test endpoint availability with a query res.Header().Set("Content-Type", "application/json") @@ -152,9 +182,12 @@ func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { res.WriteHeader(http.StatusOK) res.Write([]byte("{\"results\":[]}")) case "/ping": + h.PingsRecv.Incr(1) + defer h.PingsServed.Incr(1) // respond to ping requests res.WriteHeader(http.StatusNoContent) default: + defer h.NotFoundsServed.Incr(1) // Don't know how to respond to calls to other endpoints http.NotFound(res, req) } @@ -195,6 +228,7 @@ func (h *HTTPListener) serveWrite(res http.ResponseWriter, req *http.Request) { badRequest(res) return } + h.BytesRecv.Incr(int64(n)) if err == io.EOF { if return400 { diff --git a/plugins/inputs/self/self.go b/plugins/inputs/self/self.go new file mode 100644 index 0000000000000..d34ab253b938f --- /dev/null +++ b/plugins/inputs/self/self.go @@ -0,0 +1,32 @@ +package self + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + 
"github.com/influxdata/telegraf/selfstat" +) + +type Self struct { +} + +var sampleConfig = ` +` + +func (s *Self) Description() string { + return "Collect statistics about itself" +} + +func (s *Self) SampleConfig() string { + return sampleConfig +} + +func (s *Self) Gather(acc telegraf.Accumulator) error { + acc.AddMetrics(selfstat.Metrics()) + return nil +} + +func init() { + inputs.Add("self", func() telegraf.Input { + return &Self{} + }) +} diff --git a/selfstat/selfstat.go b/selfstat/selfstat.go new file mode 100644 index 0000000000000..d41319c7db0ca --- /dev/null +++ b/selfstat/selfstat.go @@ -0,0 +1,143 @@ +package selfstat + +import ( + "hash/fnv" + "log" + "sync" + "sync/atomic" + "time" + + "github.com/influxdata/telegraf" +) + +var registry = &rgstry{stats: make(map[uint64][]Stat)} + +type Stat interface { + Name() string + FieldName() string + Metadata() map[string]string + Key() uint64 + Incr(v int64) + Set(v int64) + Get() int64 + + register() +} + +func New(measurement, field string, metadata map[string]string) Stat { + s := &stat{ + measurement: measurement, + field: field, + metadata: metadata, + } + s.register() + return s +} + +func Metrics() []telegraf.Metric { + registry.mu.Lock() + now := time.Now() + metrics := make([]telegraf.Metric, len(registry.stats)) + i := 0 + for _, stats := range registry.stats { + if len(stats) > 0 { + tags := stats[0].Metadata() + name := stats[0].Name() + fields := map[string]interface{}{} + for _, stat := range stats { + fields[stat.FieldName()] = stat.Get() + } + metric, err := telegraf.NewMetric("telegraf_"+name, tags, fields, now) + if err != nil { + log.Printf("E! 
Error creating selfstat metric: %s", err) + continue + } + metrics[i] = metric + i++ + } + } + registry.mu.Unlock() + return metrics +} + +type intStat struct { + stat + v int64 +} + +type floatStat struct { + stat + v float64 +} + +type stat struct { + measurement string + field string + metadata map[string]string + key uint64 + v int64 + registered bool +} + +func (s *stat) Incr(v int64) { + atomic.AddInt64(&s.v, v) +} + +func (s *stat) Set(v int64) { + atomic.StoreInt64(&s.v, v) +} + +func (s *stat) Get() int64 { + return atomic.LoadInt64(&s.v) +} + +func (s *stat) Name() string { + return s.measurement +} + +func (s *stat) FieldName() string { + return s.field +} + +// Metadata returns a copy of the stat's metadata. +// NOTE this allocates a new map every time it is called. +func (s *stat) Metadata() map[string]string { + m := make(map[string]string, len(s.metadata)) + for k, v := range s.metadata { + m[k] = v + } + return m +} + +func (s *stat) register() { + // create hash key: + h := fnv.New64a() + h.Write([]byte(s.measurement)) + for k, v := range s.metadata { + h.Write([]byte(k + v)) + } + s.key = h.Sum64() + // Add the stat to the registry: + registry.register(s) +} + +func (s *stat) Key() uint64 { + return s.key +} + +type rgstry struct { + stats map[uint64][]Stat + mu sync.Mutex +} + +func (r *rgstry) register(s Stat) { + r.mu.Lock() + if stats, ok := r.stats[s.Key()]; ok { + stats = append(stats, s) + r.stats[s.Key()] = stats + } else { + // creating a new unique metric + r.stats[s.Key()] = []Stat{s} + } + r.mu.Unlock() +} diff --git a/testutil/accumulator.go b/testutil/accumulator.go index 99f9e300683b5..1ca02fd4022d9 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/assert" ) @@ -110,6 +112,12 @@ func (a *Accumulator) AddGauge( a.AddFields(measurement, fields, tags, timestamp...) 
} +func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) { + for _, m := range metrics { + a.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } +} + // AddError appends the given error to Accumulator.Errors. func (a *Accumulator) AddError(err error) { if err == nil {