diff --git a/Godeps b/Godeps index 2ac95a90455eb..0550ddcc85cdd 100644 --- a/Godeps +++ b/Godeps @@ -1,5 +1,6 @@ github.com/Shopify/sarama 8aadb476e66ca998f2f6bb3c993e9a2daa3666b9 github.com/Sirupsen/logrus 219c8cb75c258c552e999735be6df753ffc7afdc +github.com/VividCortex/gohistogram da38b6e56f2f7dc1999a037141441e50d6213f5d github.com/amir/raidman 53c1b967405155bfc8758557863bf2e14f814687 github.com/aws/aws-sdk-go 13a12060f716145019378a10e2806c174356b857 github.com/beorn7/perks 3ac7bf7a47d159a033b107610db8a1b6575507a4 diff --git a/agent/agent.go b/agent/agent.go index 1423ef773eeee..a9e8bc4be2464 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2,9 +2,11 @@ package agent import ( "fmt" + "github.com/VividCortex/gohistogram" "log" "os" "runtime" + "strconv" "sync" "time" @@ -16,13 +18,17 @@ import ( // Agent runs telegraf and collects data based on the given config type Agent struct { - Config *config.Config + Config *config.Config + fieldMap map[string]map[string]*gohistogram.NumericHistogram + metricTags map[string]map[string]string } // NewAgent returns an Agent struct based off the given Config func NewAgent(config *config.Config) (*Agent, error) { a := &Agent{ - Config: config, + Config: config, + fieldMap: make(map[string]map[string]*gohistogram.NumericHistogram), + metricTags: make(map[string]map[string]string), } if !a.Config.Agent.OmitHostname { @@ -235,6 +241,7 @@ func (a *Agent) flush() { for _, o := range a.Config.Outputs { go func(output *internal_models.RunningOutput) { defer wg.Done() + a.AddHistMetricToOutput(output) err := output.Write() if err != nil { log.Printf("Error writing to output [%s]: %s\n", @@ -264,13 +271,60 @@ func (a *Agent) flusher(shutdown chan struct{}, metricC chan telegraf.Metric) er internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() case m := <-metricC: - for _, o := range a.Config.Outputs { - o.AddMetric(m) + if _, ok := a.Config.Agent.Histogram[m.Name()]; ok { + a.AddMetric(m) + } else { + for _, o 
:= range a.Config.Outputs { + o.AddMetric(m) + } } } } } +func (a *Agent) AddMetric(metric telegraf.Metric) { + name := metric.Name() + if a.fieldMap[name] == nil { + a.fieldMap[name] = make(map[string]*gohistogram.NumericHistogram) + } + if a.metricTags[name] == nil { + a.metricTags[name] = make(map[string]string) + } + a.metricTags[name] = metric.Tags() + for key, val := range metric.Fields() { + switch v := val.(type) { + case float64: + if a.fieldMap[name][key] == nil { + a.fieldMap[name][key] = gohistogram.NewHistogram(a.Config.Agent.HistogramBuckSize) + } + hist := a.fieldMap[name][key] + hist.Add(v) + default: + log.Printf("When stats enabled all the fields should be of type float64 [field name %s]", key) + } + } +} + +func (a *Agent) AddHistMetricToOutput(output *internal_models.RunningOutput) { + for name, fields := range a.fieldMap { + mFields := make(map[string]interface{}) + for key, val := range fields { + for _, perc := range a.Config.Agent.Histogram[name] { + p := strconv.FormatFloat(perc*100, 'f', 0, 64) + mFields[key+"_p"+p] = val.Quantile(perc) + } + mFields[key+"_variance"] = val.Variance() + mFields[key+"_mean"] = val.Mean() + mFields[key+"_count"] = val.Count() + } + m, _ := telegraf.NewMetric(name, a.metricTags[name], mFields, time.Now().UTC()) + output.AddMetric(m) + + delete(a.fieldMap, name) + delete(a.metricTags, name) + } +} + // Run runs the agent daemon, gathering every Interval func (a *Agent) Run(shutdown chan struct{}) error { var wg sync.WaitGroup diff --git a/internal/config/config.go b/internal/config/config.go index fdc9a8753e283..aec1970f726d2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,9 +55,10 @@ func NewConfig() *Config { c := &Config{ // Agent defaults: Agent: &AgentConfig{ - Interval: internal.Duration{Duration: 10 * time.Second}, - RoundInterval: true, - FlushInterval: internal.Duration{Duration: 10 * time.Second}, + Interval: internal.Duration{Duration: 10 * time.Second}, + RoundInterval: 
true, + FlushInterval: internal.Duration{Duration: 10 * time.Second}, + HistogramBuckSize: 20, }, Tags: make(map[string]string), @@ -121,6 +122,15 @@ type AgentConfig struct { Quiet bool Hostname string OmitHostname bool + + // Supported Histogram method + // (metric_name) = list of percentiles (0.95, 0.39) + // Note: if Histogram is enabled for a metric, all of its fields will be + // sampled. + Histogram map[string][]float64 + + // Histogram bucket size + HistogramBuckSize int } // Inputs returns a list of strings of the configured inputs. @@ -217,6 +227,24 @@ var header = `# Telegraf Configuration hostname = "" ## If set to true, do no set the "host" tag in the telegraf agent. omit_hostname = false + ## Supported Histogram method + ## (metric_name) = list of percentiles (0.95, 0.39) + ## Note: if Histogram is enabled for a metric, all of its fields will + ## be sampled. + ## The values generated are approximations; please refer to + ## Ben-Haim & Yom-Tov's A Streaming Parallel Decision Tree Algorithm + ## http://jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf + ##[agent.histogram] + ## (metric_name) = (percentile list, e.g. [0.95, 0.50]) + ## Empty array will generate count, mean and variance + ## (metric_name) = [] + + + ##Histogram bucket size + ##A larger bin size yields more accurate approximations at the + ##cost of increased memory utilization and performance + ##histogram_buck_size = 20 + ###############################################################################