Skip to content

Commit

Permalink
Followup to issue #77, create configured database name from toml file
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Aug 12, 2015
1 parent 53969ae commit e690423
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 51 deletions.
62 changes: 24 additions & 38 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package telegraf
import (
"fmt"
"log"
"net/url"
"os"
"sort"
"strings"
Expand Down Expand Up @@ -71,14 +70,15 @@ func NewAgent(config *Config) (*Agent, error) {
// Connect connects to the agent's config URL
func (a *Agent) Connect() error {
for _, o := range a.outputs {
err := o.output.Connect(a.Hostname)
err := o.output.Connect()
if err != nil {
return err
}
}
return nil
}

// LoadOutputs loads the agent's outputs
func (a *Agent) LoadOutputs() ([]string, error) {
var names []string

Expand All @@ -99,15 +99,7 @@ func (a *Agent) LoadOutputs() ([]string, error) {
names = append(names, name)
}

_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
})

if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
}

a.conn = c
sort.Strings(names)

return names, nil
}
Expand All @@ -128,8 +120,6 @@ func (a *Agent) LoadPlugins(pluginsFilter string) ([]string, error) {
return nil, fmt.Errorf("Undefined but requested plugin: %s", name)
}

plugin := creator()

isPluginEnabled := false
if len(filters) > 0 {
for _, runeValue := range filters {
Expand Down Expand Up @@ -190,60 +180,56 @@ func (a *Agent) crankParallel() error {

close(points)

var acc BatchPoints
acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
var bp BatchPoints
bp.Time = time.Now()
bp.Tags = a.Config.Tags

for sub := range points {
acc.Points = append(acc.Points, sub.Points...)
bp.Points = append(bp.Points, sub.Points...)
}

_, err := a.conn.Write(acc.BatchPoints)
return err
return a.flush(&bp)
}

func (a *Agent) crank() error {
var acc BatchPoints
var bp BatchPoints

acc.Debug = a.Debug
bp.Debug = a.Debug

for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
err := plugin.plugin.Gather(&bp)
if err != nil {
return err
}
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
bp.Time = time.Now()
bp.Tags = a.Config.Tags

return a.flush(&acc)
return a.flush(&bp)
}

func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval)

for {
var acc BatchPoints
var bp BatchPoints

acc.Debug = a.Debug
bp.Debug = a.Debug

acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
err := plugin.plugin.Gather(&bp)
if err != nil {
return err
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
bp.Tags = a.Config.Tags
bp.Time = time.Now()

err = a.flush(&acc)
err = a.flush(&bp)
if err != nil {
return err
}
Expand Down
22 changes: 9 additions & 13 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
// will be logging to, as well as all the plugins that the user has
// specified
type Config struct {
Tags map[string]string

agent *ast.Table
plugins map[string]*ast.Table
outputs map[string]*ast.Table
Expand All @@ -43,10 +45,6 @@ type Config struct {
func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins
}
type TagFilter struct {
Name string
Filter []string
}

// Outputs returns the configured outputs as a map of name -> output toml
func (c *Config) Outputs() map[string]*ast.Table {
Expand All @@ -66,9 +64,6 @@ type ConfiguredPlugin struct {

Drop []string
Pass []string
TagDrop []TagFilter

TagPass []TagFilter

TagDrop []TagFilter
TagPass []TagFilter
Expand Down Expand Up @@ -131,7 +126,8 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]strin
func (c *Config) ApplyOutput(name string, v interface{}) error {
if c.outputs[name] != nil {
return toml.UnmarshalTable(c.outputs[name], v)
}
}
return nil
}

// ApplyAgent loads the toml config into the given interface
Expand Down Expand Up @@ -246,15 +242,15 @@ func (c *Config) OutputsDeclared() []string {
}

func declared(endpoints map[string]*ast.Table) []string {
var plugins []string
var names []string

for name := range c.plugins {
plugins = append(plugins, name)
for name := range endpoints {
names = append(names, name)
}

sort.Strings(plugins)
sort.Strings(names)

return plugins
return names
}

// DefaultConfig returns an empty default configuration
Expand Down
5 changes: 5 additions & 0 deletions outputs/all/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package all

import (
_ "github.com/influxdb/telegraf/outputs/influxdb"
)
67 changes: 67 additions & 0 deletions outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package influxdb

import (
"fmt"
"log"
"net/url"
"strings"

"github.com/influxdb/influxdb/client"
t "github.com/influxdb/telegraf"
"github.com/influxdb/telegraf/outputs"
)

type InfluxDB struct {
URL string
Username string
Password string
Database string
UserAgent string
Timeout t.Duration

conn *client.Client
}

func (i *InfluxDB) Connect() error {
u, err := url.Parse(i.URL)
if err != nil {
return err
}

c, err := client.NewClient(client.Config{
URL: *u,
Username: i.Username,
Password: i.Password,
UserAgent: i.UserAgent,
Timeout: i.Timeout.Duration,
})

if err != nil {
return err
}

_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
})

if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
}

i.conn = c
return nil
}

func (i *InfluxDB) Write(bp client.BatchPoints) error {
bp.Database = i.Database
if _, err := i.conn.Write(bp); err != nil {
return err
}
return nil
}

func init() {
outputs.Add("influxdb", func() outputs.Output {
return &InfluxDB{}
})
}
18 changes: 18 additions & 0 deletions outputs/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package outputs

import (
"github.com/influxdb/influxdb/client"
)

type Output interface {
Connect() error
Write(client.BatchPoints) error
}

type Creator func() Output

var Outputs = map[string]Creator{}

func Add(name string, creator Creator) {
Outputs[name] = creator
}

0 comments on commit e690423

Please sign in to comment.