Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pass/drop/tagpass/tagdrop for outputs #401

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5)
path = [ "/opt", "/home" ]
```

Below is how to configure `pass` and `drop` parameters (added in 0.1.5)

```
# Drop all metrics for guest CPU usage
[[plugins.cpu]]
drop = [ "cpu_usage_guest" ]

# Only store inode related metrics for disks
[[plugins.disk]]
pass = [ "disk_inodes" ]
```


Additional plugins (or outputs) of the same type can be specified,
just define another instance in the config file:

Expand Down Expand Up @@ -224,6 +237,27 @@ Telegraf also supports specifying multiple output sinks to send data to,
configuring each output sink is different, but examples can be
found by running `telegraf -sample-config`.

Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop)

```
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "telegraf"
# Drop all measurements that start with "aerospike"
drop = ["aerospike"]

# Send to a different database
[[outputs.influxdb]]
urls = [ "http://localhost:8086" ]
database = "mydb"
precision = "s"

# Only store measurements where the tag "mytag" matches the value "B"
[outputs.influxdb.tagpass]
mytag = ["B"]
```


## Supported Outputs

* influxdb
Expand Down
2 changes: 1 addition & 1 deletion accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (ac *accumulator) AddFields(
}

if ac.pluginConfig != nil {
if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) {
if !ac.pluginConfig.Filter.ShouldPass(measurement) || !ac.pluginConfig.Filter.ShouldTagsPass(tags) {
return
}
}
Expand Down
5 changes: 3 additions & 2 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,13 @@ func (a *Agent) writeOutput(
start := time.Now()

for {
err := ro.Output.Write(points)
filtered := ro.FilterPoints(points)
err := ro.Output.Write(filtered)
if err == nil {
// Write successful
elapsed := time.Since(start)
log.Printf("Flushed %d metrics to output %s in %s\n",
len(points), ro.Name, elapsed)
len(filtered), ro.Name, elapsed)
return
}

Expand Down
155 changes: 110 additions & 45 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

"github.com/naoina/toml"
"github.com/naoina/toml/ast"

"github.com/influxdb/influxdb/client/v2"
)

// Config specifies the URL/user/password for the database that telegraf
Expand Down Expand Up @@ -88,6 +90,7 @@ type TagFilter struct {
type RunningOutput struct {
Name string
Output outputs.Output
Config *OutputConfig
}

type RunningPlugin struct {
Expand All @@ -96,34 +99,61 @@ type RunningPlugin struct {
Config *PluginConfig
}

// PluginConfig containing a name, interval, and drop/pass prefix lists
// Also lists the tags to filter
type PluginConfig struct {
Name string

// Filter containing drop/pass and tagdrop/tagpass rules
type Filter struct {
Drop []string
Pass []string

TagDrop []TagFilter
TagPass []TagFilter

IsActive bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea 👍

}

// PluginConfig containing a name, interval, and filter
type PluginConfig struct {
Name string
Filter Filter
Interval time.Duration
}

// OutputConfig containing name and filter
type OutputConfig struct {
Name string
Filter Filter
}

// Filter returns filtered slice of client.Points based on whether filters
// are active for this RunningOutput.
func (ro *RunningOutput) FilterPoints(points []*client.Point) []*client.Point {
if !ro.Config.Filter.IsActive {
return points
}

var filteredPoints []*client.Point
for i := range points {
if !ro.Config.Filter.ShouldPass(points[i].Name()) || !ro.Config.Filter.ShouldTagsPass(points[i].Tags()) {
continue
}
filteredPoints = append(filteredPoints, points[i])
}
return filteredPoints
}

// ShouldPass returns true if the metric should pass, false if should drop
// based on the drop/pass plugin parameters
func (cp *PluginConfig) ShouldPass(measurement string) bool {
if cp.Pass != nil {
for _, pat := range cp.Pass {
// based on the drop/pass filter parameters
func (f Filter) ShouldPass(measurement string) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason you made these non-pointer receivers? (Filter instead of *Filter)

if f.Pass != nil {
for _, pat := range f.Pass {
if strings.HasPrefix(measurement, pat) {
return true
}
}
return false
}

if cp.Drop != nil {
for _, pat := range cp.Drop {
if f.Drop != nil {
for _, pat := range f.Drop {
if strings.HasPrefix(measurement, pat) {
return false
}
Expand All @@ -135,10 +165,10 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool {
}

// ShouldTagsPass returns true if the metric should pass, false if should drop
// based on the tagdrop/tagpass plugin parameters
func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
if cp.TagPass != nil {
for _, pat := range cp.TagPass {
// based on the tagdrop/tagpass filter parameters
func (f Filter) ShouldTagsPass(tags map[string]string) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-pointer receiver ditto

if f.TagPass != nil {
for _, pat := range f.TagPass {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
Expand All @@ -150,8 +180,8 @@ func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool {
return false
}

if cp.TagDrop != nil {
for _, pat := range cp.TagDrop {
if f.TagDrop != nil {
for _, pat := range f.TagDrop {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
Expand Down Expand Up @@ -469,15 +499,21 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
if !ok {
return fmt.Errorf("Undefined but requested output: %s", name)
}
o := creator()
output := creator()

if err := toml.UnmarshalTable(table, o); err != nil {
outputConfig, err := buildOutput(name, table)
if err != nil {
return err
}

if err := toml.UnmarshalTable(table, output); err != nil {
return err
}

ro := &RunningOutput{
Name: name,
Output: o,
Output: output,
Config: outputConfig,
}
c.Outputs = append(c.Outputs, ro)
return nil
Expand All @@ -493,10 +529,15 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
}
plugin := creator()

pluginConfig, err := applyPlugin(name, table, plugin)
pluginConfig, err := buildPlugin(name, table)
if err != nil {
return err
}

if err := toml.UnmarshalTable(table, plugin); err != nil {
return err
}

rp := &RunningPlugin{
Name: name,
Plugin: plugin,
Expand All @@ -506,18 +547,19 @@ func (c *Config) addPlugin(name string, table *ast.Table) error {
return nil
}

// applyPlugin takes defined plugin names and applies them to the given
// interface, returning a PluginConfig object in the end that can
// be inserted into a runningPlugin by the agent.
func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) {
cp := &PluginConfig{Name: name}
// buildFilter builds a Filter (tagpass/tagdrop/pass/drop) to
// be inserted into the OutputConfig/PluginConfig to be used for prefix
// filtering on tags and measurements
func buildFilter(tbl *ast.Table) Filter {
f := Filter{}

if node, ok := tbl.Fields["pass"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Pass = append(cp.Pass, str.Value)
f.Pass = append(f.Pass, str.Value)
f.IsActive = true
}
}
}
Expand All @@ -529,26 +571,14 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
cp.Drop = append(cp.Drop, str.Value)
f.Drop = append(f.Drop, str.Value)
f.IsActive = true
}
}
}
}
}

if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

cp.Interval = dur
}
}
}

if node, ok := tbl.Fields["tagpass"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
Expand All @@ -561,7 +591,8 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
}
}
}
cp.TagPass = append(cp.TagPass, *tagfilter)
f.TagPass = append(f.TagPass, *tagfilter)
f.IsActive = true
}
}
}
Expand All @@ -579,16 +610,50 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig,
}
}
}
cp.TagDrop = append(cp.TagDrop, *tagfilter)
f.TagDrop = append(f.TagDrop, *tagfilter)
f.IsActive = true
}
}
}
}

delete(tbl.Fields, "drop")
delete(tbl.Fields, "pass")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tagdrop")
delete(tbl.Fields, "tagpass")
return cp, toml.UnmarshalTable(tbl, p)
return f
}

// buildPlugin parses plugin specific items from the ast.Table, builds the filter and returns a
// PluginConfig to be inserted into RunningPlugin
func buildPlugin(name string, tbl *ast.Table) (*PluginConfig, error) {
cp := &PluginConfig{Name: name}
if node, ok := tbl.Fields["interval"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
dur, err := time.ParseDuration(str.Value)
if err != nil {
return nil, err
}

cp.Interval = dur
}
}
}
delete(tbl.Fields, "interval")
cp.Filter = buildFilter(tbl)
return cp, nil

}

// buildOutput parses output specific items from the ast.Table, builds the filter and returns an
// OutputConfig to be inserted into RunningPlugin
// Note: error exists in the return for future calls that might require error
func buildOutput(name string, tbl *ast.Table) (*OutputConfig, error) {
oc := &OutputConfig{
Name: name,
Filter: buildFilter(tbl),
}
return oc, nil

}
Loading