Skip to content

Commit

Permalink
Statsd listener plugin
Browse files Browse the repository at this point in the history
implement gauges, sets, counters
  • Loading branch information
sparrc committed Oct 6, 2015
1 parent 5112d07 commit 61d0e5c
Show file tree
Hide file tree
Showing 9 changed files with 584 additions and 30 deletions.
2 changes: 0 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ and submit new plugins.
### Plugin Guidelines

* A plugin must conform to the `plugins.Plugin` interface.
* Telegraf promises to run each plugin's Gather function serially. This means
developers don't have to worry about thread safety within these functions.
* Each generated metric automatically has the name of the plugin that generated
it prepended. This is to keep plugins honest.
* Plugins should call `plugins.Add` in their `init` function to register themselves.
Expand Down
14 changes: 14 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,20 @@ func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup

for _, plugin := range a.plugins {

// Start service of any ServicePlugins
switch p := plugin.plugin.(type) {
case plugins.ServicePlugin:
if err := p.Start(); err != nil {
log.Printf("Service for plugin %s failed to start, exiting\n%s\n",
plugin.name, err.Error())
return err
}
defer p.Stop()
}

// Special handling for plugins that have their own collection interval
// configured. Default intervals are handled below with crankParallel
if plugin.config.Interval != 0 {
wg.Add(1)
go func(plugin *runningPlugin) {
Expand Down
52 changes: 37 additions & 15 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,18 +377,25 @@ var header = `# Telegraf configuration
[outputs]
`

var header2 = `
var pluginHeader = `
###############################################################################
# PLUGINS #
###############################################################################
`

var servicePluginHeader = `
###############################################################################
# SERVICE PLUGINS #
###############################################################################
`

// PrintSampleConfig prints the sample config
func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
fmt.Printf(header)

// Print Outputs
// Filter outputs
var onames []string
for oname := range outputs.Outputs {
if len(outputFilters) == 0 || sliceContains(oname, outputFilters) {
Expand All @@ -397,6 +404,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
}
sort.Strings(onames)

// Print Outputs
for _, oname := range onames {
creator := outputs.Outputs[oname]
output := creator()
Expand All @@ -411,9 +419,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
}
}

fmt.Printf(header2)

// Print Plugins
// Filter plugins
var pnames []string
for pname := range plugins.Plugins {
if len(pluginFilters) == 0 || sliceContains(pname, pluginFilters) {
Expand All @@ -422,18 +428,36 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
}
sort.Strings(pnames)

// Print Plugins
fmt.Printf(pluginHeader)
servPlugins := make(map[string]plugins.ServicePlugin)
for _, pname := range pnames {
creator := plugins.Plugins[pname]
plugin := creator()

fmt.Printf("\n# %s\n[%s]", plugin.Description(), pname)

config := plugin.SampleConfig()
if config == "" {
fmt.Printf("\n # no configuration\n")
} else {
fmt.Printf(config)
switch p := plugin.(type) {
case plugins.ServicePlugin:
servPlugins[pname] = p
continue
}

printConfig(pname, plugin)
}

// Print Service Plugins
fmt.Printf(servicePluginHeader)
for name, plugin := range servPlugins {
printConfig(name, plugin)
}
}

func printConfig(name string, plugin plugins.Plugin) {
fmt.Printf("\n# %s\n[%s]", plugin.Description(), name)
config := plugin.SampleConfig()
if config == "" {
fmt.Printf("\n # no configuration\n")
} else {
fmt.Printf(config)
}
}

Expand All @@ -449,9 +473,7 @@ func sliceContains(name string, list []string) bool {
// PrintPluginConfig prints the config usage of a single plugin.
func PrintPluginConfig(name string) error {
if creator, ok := plugins.Plugins[name]; ok {
plugin := creator()
fmt.Printf("# %s\n[%s]", plugin.Description(), name)
fmt.Printf(plugin.SampleConfig())
printConfig(name, creator())
} else {
return errors.New(fmt.Sprintf("Plugin %s not found", name))
}
Expand Down
22 changes: 9 additions & 13 deletions outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,18 @@ type InfluxDB struct {

var sampleConfig = `
# The full HTTP endpoint URL for your InfluxDB instance
# Multiple urls can be specified for InfluxDB cluster support. Server to
# write to will be randomly chosen each interval.
urls = ["http://localhost:8086"] # required.
# The target database for metrics. This database must already exist
database = "telegraf" # required.
# Connection timeout (for the connection with InfluxDB), formatted as a string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# If not provided, will default to 0 (no timeout)
# Multiple urls can be specified for InfluxDB cluster support.
urls = ["http://localhost:8086"] # required
# The target database for metrics (telegraf will create it if not exists)
database = "telegraf" # required
# # Connection timeout (for the connection with InfluxDB), formatted as a string.
# # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# # If not provided, will default to 0 (no timeout)
# timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
# Set the user agent for the POSTs (can be useful for log differentiation)
# # Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
`

Expand Down
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ import (
_ "github.com/influxdb/telegraf/plugins/rabbitmq"
_ "github.com/influxdb/telegraf/plugins/redis"
_ "github.com/influxdb/telegraf/plugins/rethinkdb"
_ "github.com/influxdb/telegraf/plugins/statsd"
_ "github.com/influxdb/telegraf/plugins/system"
)
24 changes: 24 additions & 0 deletions plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,35 @@ type Accumulator interface {
}

type Plugin interface {
// SampleConfig returns the default configuration of the Plugin
SampleConfig() string

// Description returns a one-sentence description on the Plugin
Description() string

// Gather takes in an accumulator and adds the metrics that the Plugin
// gathers. This is called every "interval"
Gather(Accumulator) error
}

type ServicePlugin interface {
// SampleConfig returns the default configuration of the Plugin
SampleConfig() string

// Description returns a one-sentence description on the Plugin
Description() string

// Gather takes in an accumulator and adds the metrics that the Plugin
// gathers. This is called every "interval"
Gather(Accumulator) error

// Start starts the ServicePlugin's service, whatever that may be
Start() error

// Stop stops the services and closes any necessary channels and connections
Stop()
}

type Creator func() Plugin

var Plugins = map[string]Creator{}
Expand Down
79 changes: 79 additions & 0 deletions plugins/statsd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Telegraf Service Plugin: statsd

#### Plugin arguments:

- **service_address** string: Address to listen for statsd UDP packets on
- **delete_gauges** boolean: Delete gauges on every collection interval
- **delete_counters** boolean: Delete counters on every collection interval
- **delete_sets** boolean: Delete set counters on every collection interval
- **allowed_pending_messages** integer: Number of messages allowed to queue up
on the UDP listener before the next flush. NOTE: gauge, counter, and set
measurements are aggregated as they arrive, so this is not a straight counter of
the number of total messages that the listener can handle between flushes.

#### Statsd bucket -> InfluxDB Mapping

By default, statsd buckets are converted to measurement names with the rules:
- "." -> "_"
- "-" -> "__"

This plugin also accepts a list of config tables to describe a mapping of a statsd
bucket to an InfluxDB measurement name and tags.

Each mapping must specify a match glob pattern. It can optionally take a name
for the measurement and a map of bucket indices to tag names.

For example, the following configuration:

```
[[statsd.mappings]]
match = "users.current.*.*"
name = "current_users"
[statsd.mappings.tagmap]
unit = 0
server = 2
service = 3
[[statsd.mappings]]
match = "deploys.*.*"
name = "service_deploys"
[statsd.mappings.tagmap]
service_type = 1
service_name = 2
```

Will map statsd -> influx like so:
```
users.current.den001.myapp:32|g
=> [server="den001" service="myapp" unit="users"] statsd_current_users_gauge value=32
deploys.test.myservice:1|c
=> [service_name="myservice" service_type="test"] statsd_service_deploys_counter value=1
random.jumping-sheep:10|c
=> [] statsd_random_jumping__sheep_counter value=10
```

#### Description

The statsd plugin is a special type of plugin which runs a backgrounded statsd
listener service while telegraf is running.

The format of the statsd messages was based on the format described in the
original [etsy statsd](https://github.com/etsy/statsd/blob/master/docs/metric_types.md)
implementation. In short, the telegraf statsd listener will accept:

- Gauges
- `users.current.den001.myapp:32|g` <- standard
- `users.current.den001.myapp:+10|g` <- additive
- `users.current.den001.myapp:-10|g`
- Counters
- `deploys.test.myservice:1|c` <- increments by 1
- `deploys.test.myservice:101|c` <- increments by 101
- `deploys.test.myservice:1|c|@0.1` <- sample rate, increments by 10
- Sets
- `users.unique:101|s`
- `users.unique:101|s`
- `users.unique:102|s` <- would result in a count of 2 for `users.unique`
- Timers
- TODO
Loading

0 comments on commit 61d0e5c

Please sign in to comment.