Skip to content

Commit

Permalink
Add Histogram support aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Alrahahleh committed Jun 11, 2016
1 parent 06cb5a0 commit 3a9c6ba
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 7 deletions.
1 change: 1 addition & 0 deletions Godeps
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9
github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc
github.com/VividCortex/gohistogram da38b6e56f2f7dc1999a037141441e50d6213f5d
github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687
github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857
github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4
Expand Down
62 changes: 58 additions & 4 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package agent

import (
"fmt"
"github.com/VividCortex/gohistogram"
"log"
"os"
"runtime"
"strconv"
"sync"
"time"

Expand All @@ -16,13 +18,17 @@ import (

// Agent runs telegraf and collects data based on the given config
type Agent struct {
Config *config.Config
Config *config.Config
// fieldMap holds the streaming histograms being accumulated, indexed first
// by metric name and then by field name.
fieldMap map[string]map[string]*gohistogram.NumericHistogram
// metricTags holds, per metric name, the tag set of the most recently
// aggregated metric; it is attached to the emitted histogram metric.
metricTags map[string]map[string]string
}

// NewAgent returns an Agent struct based off the given Config
func NewAgent(config *config.Config) (*Agent, error) {
a := &Agent{
Config: config,
Config: config,
fieldMap: make(map[string]map[string]*gohistogram.NumericHistogram),
metricTags: make(map[string]map[string]string),
}

if !a.Config.Agent.OmitHostname {
Expand Down Expand Up @@ -235,6 +241,7 @@ func (a *Agent) flush() {
for _, o := range a.Config.Outputs {
go func(output *internal_models.RunningOutput) {
defer wg.Done()
a.AddHistMetricToOutput(output)
err := output.Write()
if err != nil {
log.Printf("Error writing to output [%s]: %s\n",
Expand Down Expand Up @@ -264,13 +271,60 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er
internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
a.flush()
case m := <-metricC:
for _, o := range a.Config.Outputs {
o.AddMetric(m)
if _, ok := a.Config.Agent.Histogram[m.Name()]; ok {
a.AddMetric(m)
} else {
for _, o := range a.Config.Outputs {
o.AddMetric(m)
}
}
}
}
}

// AddMetric folds the float64 fields of metric into the agent's streaming
// histograms, one histogram per (metric name, field name) pair.
//
// Fields that are not float64 are skipped with a log message. Aggregation
// state for a metric name is only created once at least one field has
// actually been sampled, so a metric without any float64 field never leaves
// behind an empty entry that would later be flushed as a metric with no
// fields. The tags of the most recent metric replace any previously stored
// tags for that name.
func (a *Agent) AddMetric(metric telegraf.Metric) {
	name := metric.Name()
	hists := a.fieldMap[name]

	for key, val := range metric.Fields() {
		v, ok := val.(float64)
		if !ok {
			log.Printf("When stats enabled all the fields should be of type float64 [field name %s]", key)
			continue
		}

		// Create the per-metric map lazily, on the first usable sample.
		if hists == nil {
			hists = make(map[string]*gohistogram.NumericHistogram)
			a.fieldMap[name] = hists
		}

		hist := hists[key]
		if hist == nil {
			hist = gohistogram.NewHistogram(a.Config.Agent.HistogramBuckSize)
			hists[key] = hist
		}
		hist.Add(v)
	}

	// Only remember tags for metrics that have histogram state to flush.
	if hists != nil {
		a.metricTags[name] = metric.Tags()
	}
}

// AddHistMetricToOutput turns every accumulated histogram into a summary
// metric, adds it to output, and clears the accumulated state for that
// metric name.
//
// For each sampled field the emitted metric carries:
//   - <field>_p<N> for every quantile configured for the metric name, where
//     N is the quantile times 100 formatted with no decimals;
//   - <field>_variance, <field>_mean and <field>_count.
//
// If the summary metric cannot be constructed the error is logged and the
// metric is dropped instead of being passed on as a nil value.
//
// NOTE(review): flush calls this from one goroutine per output while this
// method reads and deletes from a.fieldMap/a.metricTags without locking, and
// the state is removed after being handed to a single output, so with more
// than one output the access is unsynchronised and only one output receives
// each histogram. Confirm whether that is intended.
func (a *Agent) AddHistMetricToOutput(output *internal_models.RunningOutput) {
	for name, fields := range a.fieldMap {
		mFields := make(map[string]interface{})
		for key, val := range fields {
			for _, perc := range a.Config.Agent.Histogram[name] {
				p := strconv.FormatFloat(perc*100, 'f', 0, 64)
				mFields[key+"_p"+p] = val.Quantile(perc)
			}
			mFields[key+"_variance"] = val.Variance()
			mFields[key+"_mean"] = val.Mean()
			mFields[key+"_count"] = val.Count()
		}

		m, err := telegraf.NewMetric(name, a.metricTags[name], mFields, time.Now().UTC())

		// Reset the window for this metric whether or not it could be built;
		// deleting the current key while ranging over a map is safe in Go.
		delete(a.fieldMap, name)
		delete(a.metricTags, name)

		if err != nil {
			log.Printf("Error creating histogram metric [%s]: %s\n", name, err)
			continue
		}
		output.AddMetric(m)
	}
}

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup
Expand Down
34 changes: 31 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ func NewConfig() *Config {
c := &Config{
// Agent defaults:
Agent: &AgentConfig{
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
FlushInterval: internal.Duration{Duration: 10 * time.Second},
Interval: internal.Duration{Duration: 10 * time.Second},
RoundInterval: true,
FlushInterval: internal.Duration{Duration: 10 * time.Second},
HistogramBuckSize: 20,
},

Tags: make(map[string]string),
Expand Down Expand Up @@ -121,6 +122,15 @@ type AgentConfig struct {
Quiet bool
Hostname string
OmitHostname bool

// Supported Histogram method
// (metric_name) = list of percentiles (0.95, 0.39)
// Note: if Histogram is enabled for a metric, all of its fields will be
// sampled.
Histogram map[string][]float64

// Histogram bucket size
HistogramBuckSize int
}

// Inputs returns a list of strings of the configured inputs.
Expand Down Expand Up @@ -217,6 +227,24 @@ var header = `# Telegraf Configuration
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
omit_hostname = false
## Supported Histogram method
## (metric_name) = list of percentiles (0.95, 0.39)
## Note: if Histogram is enabled for a metric, all of its fields will
## be sampled.
## Generated values are approximations; please refer to
## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm
## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
##[agent.histogram]
## (metric_name) = (percentile list, e.g. [0.95, 0.50])
## Empty array will generate count, mean and variance
## (metric_name) = []
##Histogram bucket size
##A larger bin size yields more accurate approximations at the
##cost of increased memory utilization and performance
##histogram_buck_size = 20
###############################################################################
Expand Down

0 comments on commit 3a9c6ba

Please sign in to comment.