From 58cca8cc3a003ce9e6be6a3ce2e6037499721605 Mon Sep 17 00:00:00 2001
From: Cameron Sparr
Date: Wed, 21 Oct 2015 10:57:51 -0600
Subject: [PATCH] Add support for retrying output writes, using independent threads

Fixes #285
---
 agent.go  | 70 +++++++++++++++++++++++++++++++++++++++++--------------
 config.go |  2 ++
 2 files changed, 54 insertions(+), 18 deletions(-)

diff --git a/agent.go b/agent.go
index aa146ca3d6f3d..d13c171f86544 100644
--- a/agent.go
+++ b/agent.go
@@ -34,6 +34,9 @@ type Agent struct {
 	// Interval at which to flush data
 	FlushInterval Duration
 
+	// FlushRetries is the number of times to retry each data flush
+	FlushRetries int
+
 	// TODO(cam): Remove UTC and Precision parameters, they are no longer
 	// valid for the agent config. Leaving them here for now for backwards-
 	// compatability
@@ -61,6 +64,7 @@ func NewAgent(config *Config) (*Agent, error) {
 		Config:        config,
 		Interval:      Duration{10 * time.Second},
 		FlushInterval: Duration{10 * time.Second},
+		FlushRetries:  2,
 		UTC:           true,
 		Precision:     "s",
 	}
@@ -293,28 +297,56 @@ func (a *Agent) Test() error {
 	return nil
 }
 
-func (a *Agent) flush(points []*client.Point) {
-	var wg sync.WaitGroup
-
+// writeOutput writes a list of points to a single output, with retries
+func (a *Agent) writeOutput(
+	points []*client.Point,
+	ro *runningOutput,
+	shutdown chan struct{},
+) {
+	retry := 0
+	retries := a.FlushRetries
 	start := time.Now()
-	counter := 0
-	for _, o := range a.outputs {
-		wg.Add(1)
-		counter++
 
-		go func(ro *runningOutput) {
-			defer wg.Done()
-			// Log all output errors:
-			if err := ro.output.Write(points); err != nil {
-				log.Printf("Error in output [%s]: %s", ro.name, err.Error())
+	for {
+		err := ro.output.Write(points)
+
+		select {
+		case <-shutdown:
+			return
+		default:
+			if err == nil {
+				// Write successful
+				elapsed := time.Since(start)
+				log.Printf("Flushed %d metrics to output %s in %s\n",
+					len(points), ro.name, elapsed)
+				return
+			} else if retry >= retries {
+				// No more retries
+				msg := "FATAL: Write to output [%s] failed %d times, dropping" +
+					" %d metrics\n"
+				log.Printf(msg, ro.name, retries+1, len(points))
+				return
+			} else if err != nil {
+				// Sleep for a retry
+				log.Printf("Error in output [%s]: %s, retrying in %s",
+					ro.name, err.Error(), a.FlushInterval.Duration)
+				time.Sleep(a.FlushInterval.Duration)
 			}
-		}(o)
+		}
+
+		retry++
 	}
+}
 
-	wg.Wait()
-	elapsed := time.Since(start)
-	log.Printf("Flushed %d metrics to %d output sinks in %s\n",
-		len(points), counter, elapsed)
+// flush writes a list of points to all configured outputs
+func (a *Agent) flush(points []*client.Point, shutdown chan struct{}) {
+	if len(points) == 0 {
+		return
+	}
+
+	for _, o := range a.outputs {
+		go a.writeOutput(points, o, shutdown)
+	}
 }
 
 // flusher monitors the points input channel and flushes on the minimum interval
@@ -327,9 +359,11 @@ func (a *Agent) flusher(shutdown chan struct{}, pointChan chan *client.Point) er
 	for {
 		select {
 		case <-shutdown:
+			log.Println("Hang on, flushing any cached points before shutdown")
+			a.flush(points, shutdown)
 			return nil
 		case <-ticker.C:
-			a.flush(points)
+			a.flush(points, shutdown)
 			points = make([]*client.Point, 0)
 		case pt := <-pointChan:
 			points = append(points, pt)
diff --git a/config.go b/config.go
index 13f62ce55b249..ac0fba4257da4 100644
--- a/config.go
+++ b/config.go
@@ -357,6 +357,8 @@ var header = `# Telegraf configuration
   interval = "10s"
   # Default data flushing interval for all outputs
   flush_interval = "10s"
+  # Number of times to retry each data flush
+  flush_retries = 2
   # run telegraf in debug mode
   debug = false
   # Override default hostname, if empty use os.Hostname()