Utilizing new client and overhauling Accumulator interface #292

Merged: 1 commit, Oct 20, 2015
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,13 @@

### Release Notes
- The -test flag will now only output 2 collections for plugins that need it
- There is a new agent configuration option: `flush_interval`. This option tells
Telegraf how often to flush data to InfluxDB and other output sinks. For example,
users can set `interval = "2s"` and `flush_interval = "60s"` for Telegraf to
collect data every 2 seconds and flush every 60 seconds (see the sample below).
- `precision` and `utc` are no longer valid agent config values. `precision` has
moved to the `influxdb` output config, where it will continue to default to "s".
- debug and test output will now print the raw line-protocol string
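
To illustrate the new option, a minimal agent section might look like the sketch below. The `[agent]` table name mirrors the sample config of this era, and the values are purely illustrative rather than recommended defaults.

```toml
[agent]
  # how often plugins are polled for new data
  interval = "2s"
  # how often buffered data is flushed to InfluxDB and other output sinks
  flush_interval = "60s"
```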

### Features
- [#205](https://github.com/influxdb/telegraf/issues/205): Include per-db redis keyspace info
@@ -18,13 +25,17 @@ of metrics collected and from how many plugins.
- [#262](https://github.com/influxdb/telegraf/pull/262): zookeeper plugin, thanks @jrxFive!
- [#237](https://github.com/influxdb/telegraf/pull/237): statsd service plugin, thanks @sparrc
- [#273](https://github.com/influxdb/telegraf/pull/273): puppet agent plugin, thanks @jrxFive!
- [#280](https://github.com/influxdb/telegraf/issues/280): Use InfluxDB client v2.
- [#281](https://github.com/influxdb/telegraf/issues/281): Eliminate need to deep copy Batch Points.

### Bugfixes
- [#228](https://github.com/influxdb/telegraf/pull/228): New version of package will replace old one. Thanks @ekini!
- [#232](https://github.com/influxdb/telegraf/pull/232): Fix bashism run during deb package installation. Thanks @yankcrime!
- [#261](https://github.com/influxdb/telegraf/issues/260): RabbitMQ panics if wrong credentials given. Thanks @ekini!
- [#245](https://github.com/influxdb/telegraf/issues/245): Document Exec plugin example. Thanks @ekini!
- [#264](https://github.com/influxdb/telegraf/issues/264): logrotate config file fixes. Thanks @linsomniac!
- [#290](https://github.com/influxdb/telegraf/issues/290): Fix some plugins sending their values as strings.
- [#289](https://github.com/influxdb/telegraf/issues/289): Fix accumulator panic on nil tags.
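
The nil-tags panic in #289 is closed by a guard inside the new accumulator, visible in the accumulator.go portion of this diff. As a standalone sketch, with `safeTags` being a hypothetical helper name:

```go
// safeTags mirrors the guard in the new accumulator.AddFields: writing to a
// nil map panics in Go, so a map is allocated whenever a plugin passes nil.
func safeTags(tags map[string]string) map[string]string {
	if tags == nil {
		tags = make(map[string]string)
	}
	return tags
}
```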

## v0.1.9 [2015-09-22]

21 changes: 12 additions & 9 deletions CONTRIBUTING.md
@@ -36,11 +36,14 @@ type Plugin interface {
}

 type Accumulator interface {
-	Add(measurement string, value interface{}, tags map[string]string)
-	AddFieldsWithTime(measurement string,
-		values map[string]interface{},
+	Add(measurement string,
+		value interface{},
 		tags map[string]string,
-		timestamp time.Time)
+		timestamp ...time.Time)
+	AddFields(measurement string,
+		fields map[string]interface{},
+		tags map[string]string,
+		timestamp ...time.Time)
 }
```
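
The practical change for plugin authors is that the timestamp parameter is now variadic: omit it and the accumulator stamps the point itself. A small sketch of calls against the new interface; the measurement, tag values, and import path are assumptions, not taken from the repository:

```go
import (
	"time"

	"github.com/influxdb/telegraf/plugins" // assumed import path for the plugins package
)

// gatherExample is a hypothetical plugin snippet using the new Accumulator.
func gatherExample(acc plugins.Accumulator) error {
	tags := map[string]string{"host": "localhost"}

	// No timestamp given: the accumulator applies time.Now() internally.
	acc.Add("connections", 12, tags)

	// Explicit timestamp via the new variadic parameter.
	acc.Add("connections", 12, tags, time.Now())
	return nil
}
```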

@@ -81,8 +84,8 @@ func Gather(acc plugins.Accumulator) error {
"pid": fmt.Sprintf("%d", process.Pid),
}

acc.Add("cpu", process.CPUTime, tags)
acc.Add("memory", process.MemoryBytes, tags)
acc.Add("cpu", process.CPUTime, tags, time.Now())
acc.Add("memory", process.MemoryBytes, tags, time.Now())
}
}
```
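
Because `AddFields` is now part of the interface, the same process data could also be emitted as one measurement with several fields instead of one `Add` call per value. A sketch under the same assumptions as the example above, with the `process` measurement name invented for illustration:

```go
// Hypothetical variant of the loop body above: a single point per process,
// carrying both values as fields and sharing one timestamp.
now := time.Now()
fields := map[string]interface{}{
	"cpu":    process.CPUTime,
	"memory": process.MemoryBytes,
}
acc.AddFields("process", fields, tags, now)
```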
@@ -179,7 +182,7 @@ type Output interface {
Close() error
Description() string
SampleConfig() string
-	Write(client.BatchPoints) error
+	Write(points []*client.Point) error
}
```

@@ -214,8 +217,8 @@ func (s *Simple) Close() error {
return nil
}

-func (s *Simple) Write(bp client.BatchPoints) error {
-	for _, pt := range bp {
+func (s *Simple) Write(points []*client.Point) error {
+	for _, pt := range points {
// write `pt` to the output sink here
}
return nil
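
Since `Write` now takes a plain `[]*client.Point`, an output such as `Simple` can be driven directly with hand-built points, using the same `client.NewPoint` constructor that the new accumulator uses (at this version of the v2 client it returns a `*client.Point` with no error). A hedged sketch, assuming the `time` and `client/v2` imports used elsewhere in this document; the helper name and values are made up:

```go
// writeOnePoint is a hypothetical helper that feeds one hand-built point to
// the Simple output; no BatchPoints deep copy is involved anymore.
func writeOnePoint(s *Simple) error {
	pt := client.NewPoint("cpu",
		map[string]string{"host": "localhost"},
		map[string]interface{}{"value": 0.64},
		time.Now())
	return s.Write([]*client.Point{pt})
}
```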
7 changes: 5 additions & 2 deletions Makefile
@@ -37,8 +37,11 @@ ifeq ($(UNAME), Linux)
ADVERTISED_HOST=localhost docker-compose --file scripts/docker-compose.yml up -d
endif

-test: prepare docker-compose
-	$(GOBIN)/godep go test ./...
+test: test-cleanup prepare docker-compose
+	# Sleeping for kafka leadership election, TSDB setup, etc.
+	sleep 30
+	# Setup SUCCESS, running tests
+	godep go test ./...

test-short: prepare
$(GOBIN)/godep go test -short ./...
3 changes: 2 additions & 1 deletion README.md
@@ -134,6 +134,7 @@ measurements at a 10s interval and will collect totalcpu & percpu data.
[outputs.influxdb]
url = "http://192.168.59.103:8086" # required.
database = "telegraf" # required.
precision = "s"

# PLUGINS
[cpu]
@@ -196,7 +197,7 @@ Telegraf currently has support for collecting metrics from
* disk
* swap

-## Service Plugins
+## Supported Service Plugins

Telegraf can collect metrics via the following services

210 changes: 80 additions & 130 deletions accumulator.go
@@ -2,188 +2,138 @@ package telegraf

import (
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/client/v2"
)

// BatchPoints is used to send a batch of data in a single write from telegraf
// to influx
type BatchPoints struct {
sync.Mutex

client.BatchPoints
type Accumulator interface {
Add(measurement string, value interface{},
tags map[string]string, t ...time.Time)
AddFields(measurement string, fields map[string]interface{},
tags map[string]string, t ...time.Time)

Debug bool
SetDefaultTags(tags map[string]string)
AddDefaultTag(key, value string)

Prefix string
Prefix() string
SetPrefix(prefix string)

Config *ConfiguredPlugin
Debug() bool
SetDebug(enabled bool)
}

// deepcopy returns a deep copy of the BatchPoints object. This is primarily so
// we can do multithreaded output flushing (see Agent.flush)
func (bp *BatchPoints) deepcopy() *BatchPoints {
bp.Lock()
defer bp.Unlock()

var bpc BatchPoints
bpc.Time = bp.Time
bpc.Precision = bp.Precision

bpc.Tags = make(map[string]string)
for k, v := range bp.Tags {
bpc.Tags[k] = v
}
func NewAccumulator(
plugin *ConfiguredPlugin,
points chan *client.Point,
) Accumulator {
acc := accumulator{}
acc.points = points
acc.plugin = plugin
return &acc
}

var pts []client.Point
for _, pt := range bp.Points {
var ptc client.Point
type accumulator struct {
sync.Mutex

ptc.Measurement = pt.Measurement
ptc.Time = pt.Time
ptc.Precision = pt.Precision
ptc.Raw = pt.Raw
points chan *client.Point

ptc.Tags = make(map[string]string)
ptc.Fields = make(map[string]interface{})
defaultTags map[string]string

for k, v := range pt.Tags {
ptc.Tags[k] = v
}
debug bool

for k, v := range pt.Fields {
ptc.Fields[k] = v
}
pts = append(pts, ptc)
}
plugin *ConfiguredPlugin

bpc.Points = pts
return &bpc
prefix string
}

// Add adds a measurement
func (bp *BatchPoints) Add(
func (ac *accumulator) Add(
measurement string,
val interface{},
value interface{},
tags map[string]string,
t ...time.Time,
) {
fields := make(map[string]interface{})
fields["value"] = val
bp.AddFields(measurement, fields, tags)
fields["value"] = value
ac.AddFields(measurement, fields, tags, t...)
}

// AddFieldsWithTime adds a measurement with a provided timestamp
func (bp *BatchPoints) AddFieldsWithTime(
func (ac *accumulator) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
timestamp time.Time,
t ...time.Time,
) {
// TODO this function should add the fields with the timestamp, but that will
// need to wait for the InfluxDB point precision/unit to be fixed
bp.AddFields(measurement, fields, tags)
// bp.Lock()
// defer bp.Unlock()

// measurement = bp.Prefix + measurement

// if bp.Config != nil {
// if !bp.Config.ShouldPass(measurement, tags) {
// return
// }
// }

// if bp.Debug {
// var tg []string

// for k, v := range tags {
// tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
// }

// var vals []string

// for k, v := range fields {
// vals = append(vals, fmt.Sprintf("%s=%v", k, v))
// }

// sort.Strings(tg)
// sort.Strings(vals)

// fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
// }

// bp.Points = append(bp.Points, client.Point{
// Measurement: measurement,
// Tags: tags,
// Fields: fields,
// Time: timestamp,
// })
}

// AddFields will eventually replace the Add function, once we move to having a
// single plugin as a single measurement with multiple fields
func (bp *BatchPoints) AddFields(
measurement string,
fields map[string]interface{},
tags map[string]string,
) {
bp.Lock()
defer bp.Unlock()
if tags == nil {
tags = make(map[string]string)
}

// InfluxDB does not support writing uint64
// InfluxDB client/points does not support writing uint64
// TODO fix when it does
// https://github.com/influxdb/influxdb/pull/4508
for k, v := range fields {
switch val := v.(type) {
case uint64:
if val < uint64(9223372036854775808) {
fields[k] = int64(val)
} else {
fields[k] = int64(9223372036854775807)
}
}
}

measurement = bp.Prefix + measurement
var timestamp time.Time
if len(t) > 0 {
timestamp = t[0]
} else {
timestamp = time.Now()
}

if bp.Config != nil {
if !bp.Config.ShouldPass(measurement, tags) {
if ac.plugin != nil {
if !ac.plugin.ShouldPass(measurement, tags) {
return
}
}

// Apply BatchPoints tags to tags passed in, giving precedence to those
// passed in. This is so that plugins have the ability to override global
// tags.
for k, v := range bp.Tags {
_, ok := tags[k]
if !ok {
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
}

if bp.Debug {
var tg []string
if ac.prefix != "" {
measurement = ac.prefix + measurement
}

for k, v := range tags {
tg = append(tg, fmt.Sprintf("%s=\"%s\"", k, v))
}
pt := client.NewPoint(measurement, tags, fields, timestamp)
if ac.debug {
fmt.Println("> " + pt.String())
}
ac.points <- pt
}

var vals []string
func (ac *accumulator) SetDefaultTags(tags map[string]string) {
ac.defaultTags = tags
}

for k, v := range fields {
vals = append(vals, fmt.Sprintf("%s=%v", k, v))
}
func (ac *accumulator) AddDefaultTag(key, value string) {
ac.defaultTags[key] = value
}

sort.Strings(tg)
sort.Strings(vals)
func (ac *accumulator) Prefix() string {
return ac.prefix
}

fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
}
func (ac *accumulator) SetPrefix(prefix string) {
ac.prefix = prefix
}

func (ac *accumulator) Debug() bool {
return ac.debug
}

bp.Points = append(bp.Points, client.Point{
Measurement: measurement,
Tags: tags,
Fields: fields,
})
func (ac *accumulator) SetDebug(debug bool) {
ac.debug = debug
}
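
Taken together, the new accumulator is a thin wrapper that turns plugin calls into v2 client points on a channel. A hedged end-to-end sketch of how an agent-style caller might use it, assuming it sits in the same `telegraf` package as the code above and relies on the `fmt` and `client/v2` imports already present in the file; the channel size, measurement, and tag values are invented:

```go
// Hypothetical wiring: make the channel the accumulator writes to, let a
// plugin-style caller add one value, then read the resulting point back out.
func exampleAccumulatorUsage() {
	points := make(chan *client.Point, 1000)
	acc := NewAccumulator(nil, points) // nil *ConfiguredPlugin: no pass/drop filtering

	acc.SetDefaultTags(map[string]string{"host": "localhost"})
	acc.SetDebug(true) // prints the raw line-protocol string, as noted in the changelog

	// No timestamp supplied, so AddFields falls back to time.Now().
	acc.Add("uptime", int64(3600), map[string]string{"dc": "us-east-1"})

	pt := <-points
	fmt.Println(pt.String()) // e.g. uptime,dc=us-east-1,host=localhost value=3600i <timestamp>
}
```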