From 727dcbb808f09b3c960f4d28c6d323a27adbae3a Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 16 Oct 2015 16:13:32 -0600 Subject: [PATCH] Utilizing new client and overhauling Accumulator interface Fixes #280 Fixes #281 --- accumulator.go | 126 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 120 insertions(+), 6 deletions(-) diff --git a/accumulator.go b/accumulator.go index f7fb7c1e421d0..5e3fe1a21626d 100644 --- a/accumulator.go +++ b/accumulator.go @@ -7,15 +7,129 @@ import ( "sync" "time" - "github.com/influxdb/influxdb/client" + oldclient "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/client/v2" ) +type Accumulator interface { + Add(measurement string, fields map[string]interface{}, + tags map[string]string, t ...time.Time) + + BatchPoints() client.BatchPoints + + SetDefaultTags(tags map[string]string) + AddDefaultTag(key, value string) + + Prefix() string + SetPrefix(prefix string) + + Debug() bool + SetDebug(enabled bool) +} + +func NewAccumulator( + precision string, + database string, + config *ConfiguredPlugin, +) (Accumulator, error) { + var err error + acc := accumulator{} + bpconfig := client.BatchPointsConfig{ + Precision: precision, + Database: database, + } + if acc.batch, err = client.NewBatchPoints(bpconfig); err != nil { + return nil, err + } + acc.config = config + return &acc, nil +} + +type accumulator struct { + sync.Mutex + + batch client.BatchPoints + + defaultTags map[string]string + + debug bool + + config *ConfiguredPlugin + + prefix string +} + +func (ac *accumulator) Add( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + ac.Lock() + defer ac.Unlock() + + var timestamp time.Time + if len(t) > 0 { + timestamp = t[0] + } else { + timestamp = time.Now() + } + + if ac.config != nil { + if !ac.config.ShouldPass(measurement, tags) { + return + } + } + + for k, v := range ac.defaultTags { + if _, ok := tags[k]; !ok { + tags[k] = v + } + } + + pt := client.NewPoint(measurement, tags, fields, timestamp) + if ac.debug { + fmt.Println(pt.PrecisionString(ac.batch.Precision())) + } + ac.batch.AddPoint(pt) +} + +func (ac *accumulator) BatchPoints() client.BatchPoints { + ac.Lock() + defer ac.Unlock() + return ac.batch +} + +func (ac *accumulator) SetDefaultTags(tags map[string]string) { + ac.defaultTags = tags +} + +func (ac *accumulator) AddDefaultTag(key, value string) { + ac.defaultTags[key] = value +} + +func (ac *accumulator) Prefix() string { + return ac.prefix +} + +func (ac *accumulator) SetPrefix(prefix string) { + ac.prefix = prefix +} + +func (ac *accumulator) Debug() bool { + return ac.debug +} + +func (ac *accumulator) SetDebug(enabled bool) { + ac.debug = enabled +} + // BatchPoints is used to send a batch of data in a single write from telegraf // to influx type BatchPoints struct { sync.Mutex - client.BatchPoints + oldclient.BatchPoints Debug bool @@ -39,9 +153,9 @@ func (bp *BatchPoints) deepcopy() *BatchPoints { bpc.Tags[k] = v } - var pts []client.Point + var pts []oldclient.Point for _, pt := range bp.Points { - var ptc client.Point + var ptc oldclient.Point ptc.Measurement = pt.Measurement ptc.Time = pt.Time @@ -116,7 +230,7 @@ func (bp *BatchPoints) AddFieldsWithTime( // fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) // } - // bp.Points = append(bp.Points, client.Point{ + // bp.Points = append(bp.Points, oldclient.Point{ // Measurement: measurement, // Tags: tags, // Fields: fields, @@ -171,7 +285,7 @@ func (bp *BatchPoints) AddFields( fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " ")) } - bp.Points = append(bp.Points, client.Point{ + bp.Points = append(bp.Points, oldclient.Point{ Measurement: measurement, Tags: tags, Fields: fields,