-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #47 from jipperinbham/rethinkdb-plugin
add RethinkDB plugin
- Loading branch information
Showing
6 changed files
with
647 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package rethinkdb | ||
|
||
import ( | ||
"fmt" | ||
"net/url" | ||
"sync" | ||
|
||
"github.com/influxdb/telegraf/plugins" | ||
|
||
"gopkg.in/dancannon/gorethink.v1" | ||
) | ||
|
||
type RethinkDB struct { | ||
Servers []string | ||
} | ||
|
||
var sampleConfig = ` | ||
# An array of URI to gather stats about. Specify an ip or hostname | ||
# with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105, | ||
# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc. | ||
# | ||
# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port. | ||
servers = ["127.0.0.1:28015"]` | ||
|
||
func (r *RethinkDB) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (r *RethinkDB) Description() string { | ||
return "Read metrics from one or many RethinkDB servers" | ||
} | ||
|
||
var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}} | ||
|
||
// Reads stats from all configured servers accumulates stats. | ||
// Returns one of the errors encountered while gather stats (if any). | ||
func (r *RethinkDB) Gather(acc plugins.Accumulator) error { | ||
if len(r.Servers) == 0 { | ||
r.gatherServer(localhost, acc) | ||
return nil | ||
} | ||
|
||
var wg sync.WaitGroup | ||
|
||
var outerr error | ||
|
||
for _, serv := range r.Servers { | ||
u, err := url.Parse(serv) | ||
if err != nil { | ||
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err) | ||
} else if u.Scheme == "" { | ||
// fallback to simple string based address (i.e. "10.0.0.1:10000") | ||
u.Host = serv | ||
} | ||
wg.Add(1) | ||
go func(serv string) { | ||
defer wg.Done() | ||
outerr = r.gatherServer(&Server{Url: u}, acc) | ||
}(serv) | ||
} | ||
|
||
wg.Wait() | ||
|
||
return outerr | ||
} | ||
|
||
func (r *RethinkDB) gatherServer(server *Server, acc plugins.Accumulator) error { | ||
var err error | ||
connectOpts := gorethink.ConnectOpts{ | ||
Address: server.Url.Host, | ||
DiscoverHosts: false, | ||
} | ||
if server.Url.User != nil { | ||
pwd, set := server.Url.User.Password() | ||
if set && pwd != "" { | ||
connectOpts.AuthKey = pwd | ||
} | ||
} | ||
server.session, err = gorethink.Connect(connectOpts) | ||
if err != nil { | ||
return fmt.Errorf("Unable to connect to RethinkDB, %s\n", err.Error()) | ||
} | ||
defer server.session.Close() | ||
|
||
return server.gatherData(acc) | ||
} | ||
|
||
func init() { | ||
plugins.Add("rethinkdb", func() plugins.Plugin { | ||
return &RethinkDB{} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package rethinkdb | ||
|
||
import ( | ||
"reflect" | ||
"time" | ||
|
||
"github.com/influxdb/telegraf/plugins" | ||
) | ||
|
||
type serverStatus struct { | ||
Id string `gorethink:"id"` | ||
Network struct { | ||
Addresses []Address `gorethink:"canonical_addresses"` | ||
Hostname string `gorethink:"hostname"` | ||
DriverPort int `gorethink:"reql_port"` | ||
} `gorethink:"network"` | ||
Process struct { | ||
Version string `gorethink:"version"` | ||
RunningSince time.Time `gorethink:"time_started"` | ||
} `gorethink:"process"` | ||
} | ||
|
||
type Address struct { | ||
Host string `gorethink:"host"` | ||
Port int `gorethink:"port"` | ||
} | ||
|
||
type stats struct { | ||
Engine Engine `gorethink:"query_engine"` | ||
} | ||
|
||
type Engine struct { | ||
ClientConns int64 `gorethink:"client_connections,omitempty"` | ||
ClientActive int64 `gorethink:"clients_active,omitempty"` | ||
QueriesPerSec int64 `gorethink:"queries_per_sec,omitempty"` | ||
TotalQueries int64 `gorethink:"queries_total,omitempty"` | ||
ReadsPerSec int64 `gorethink:"read_docs_per_sec,omitempty"` | ||
TotalReads int64 `gorethink:"read_docs_total,omitempty"` | ||
WritesPerSec int64 `gorethink:"written_docs_per_sec,omitempty"` | ||
TotalWrites int64 `gorethink:"written_docs_total,omitempty"` | ||
} | ||
|
||
type tableStatus struct { | ||
Id string `gorethink:"id"` | ||
DB string `gorethink:"db"` | ||
Name string `gorethink:"name"` | ||
} | ||
|
||
type tableStats struct { | ||
Engine Engine `gorethink:"query_engine"` | ||
Storage Storage `gorethink:"storage_engine"` | ||
} | ||
|
||
type Storage struct { | ||
Cache Cache `gorethink:"cache"` | ||
Disk Disk `gorethink:"disk"` | ||
} | ||
|
||
type Cache struct { | ||
BytesInUse int64 `gorethink:"in_use_bytes"` | ||
} | ||
|
||
type Disk struct { | ||
ReadBytesPerSec int64 `gorethink:"read_bytes_per_sec"` | ||
ReadBytesTotal int64 `gorethink:"read_bytes_total"` | ||
WriteBytesPerSec int64 `gorethik:"written_bytes_per_sec"` | ||
WriteBytesTotal int64 `gorethink:"written_bytes_total"` | ||
SpaceUsage SpaceUsage `gorethink:"space_usage"` | ||
} | ||
|
||
type SpaceUsage struct { | ||
Data int64 `gorethink:"data_bytes"` | ||
Garbage int64 `gorethink:"garbage_bytes"` | ||
Metadata int64 `gorethink:"metadata_bytes"` | ||
Prealloc int64 `gorethink:"preallocated_bytes"` | ||
} | ||
|
||
var engineStats = map[string]string{ | ||
"active_clients": "ClientActive", | ||
"clients": "ClientConns", | ||
"queries_per_sec": "QueriesPerSec", | ||
"total_queries": "TotalQueries", | ||
"read_docs_per_sec": "ReadsPerSec", | ||
"total_reads": "TotalReads", | ||
"written_docs_per_sec": "WritesPerSec", | ||
"total_writes": "TotalWrites", | ||
} | ||
|
||
func (e *Engine) AddEngineStats(keys []string, acc plugins.Accumulator, tags map[string]string) { | ||
engine := reflect.ValueOf(e).Elem() | ||
for _, key := range keys { | ||
acc.Add( | ||
key, | ||
engine.FieldByName(engineStats[key]).Interface(), | ||
tags, | ||
) | ||
} | ||
} | ||
|
||
func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) { | ||
acc.Add("cache_bytes_in_use", s.Cache.BytesInUse, tags) | ||
acc.Add("disk_read_bytes_per_sec", s.Disk.ReadBytesPerSec, tags) | ||
acc.Add("disk_read_bytes_total", s.Disk.ReadBytesTotal, tags) | ||
acc.Add("disk_written_bytes_per_sec", s.Disk.WriteBytesPerSec, tags) | ||
acc.Add("disk_written_bytes_total", s.Disk.WriteBytesTotal, tags) | ||
acc.Add("disk_usage_data_bytes", s.Disk.SpaceUsage.Data, tags) | ||
acc.Add("disk_usage_garbage_bytes", s.Disk.SpaceUsage.Garbage, tags) | ||
acc.Add("disk_usage_metadata_bytes", s.Disk.SpaceUsage.Metadata, tags) | ||
acc.Add("disk_usage_preallocated_bytes", s.Disk.SpaceUsage.Prealloc, tags) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
package rethinkdb | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/influxdb/telegraf/testutil" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
var tags = make(map[string]string) | ||
|
||
func TestAddEngineStats(t *testing.T) { | ||
engine := &Engine{ | ||
ClientConns: 0, | ||
ClientActive: 0, | ||
QueriesPerSec: 0, | ||
TotalQueries: 0, | ||
ReadsPerSec: 0, | ||
TotalReads: 0, | ||
WritesPerSec: 0, | ||
TotalWrites: 0, | ||
} | ||
|
||
var acc testutil.Accumulator | ||
|
||
keys := []string{ | ||
"active_clients", | ||
"clients", | ||
"queries_per_sec", | ||
"total_queries", | ||
"read_docs_per_sec", | ||
"total_reads", | ||
"written_docs_per_sec", | ||
"total_writes", | ||
} | ||
engine.AddEngineStats(keys, &acc, tags) | ||
|
||
for _, metric := range keys { | ||
assert.True(t, acc.HasIntValue(metric)) | ||
} | ||
} | ||
|
||
func TestAddEngineStatsPartial(t *testing.T) { | ||
engine := &Engine{ | ||
ClientConns: 0, | ||
ClientActive: 0, | ||
QueriesPerSec: 0, | ||
ReadsPerSec: 0, | ||
WritesPerSec: 0, | ||
} | ||
|
||
var acc testutil.Accumulator | ||
|
||
keys := []string{ | ||
"active_clients", | ||
"clients", | ||
"queries_per_sec", | ||
"read_docs_per_sec", | ||
"written_docs_per_sec", | ||
} | ||
|
||
missing_keys := []string{ | ||
"total_queries", | ||
"total_reads", | ||
"total_writes", | ||
} | ||
engine.AddEngineStats(keys, &acc, tags) | ||
|
||
for _, metric := range missing_keys { | ||
assert.False(t, acc.HasIntValue(metric)) | ||
} | ||
} | ||
|
||
func TestAddStorageStats(t *testing.T) { | ||
storage := &Storage{ | ||
Cache: Cache{ | ||
BytesInUse: 0, | ||
}, | ||
Disk: Disk{ | ||
ReadBytesPerSec: 0, | ||
ReadBytesTotal: 0, | ||
WriteBytesPerSec: 0, | ||
WriteBytesTotal: 0, | ||
SpaceUsage: SpaceUsage{ | ||
Data: 0, | ||
Garbage: 0, | ||
Metadata: 0, | ||
Prealloc: 0, | ||
}, | ||
}, | ||
} | ||
|
||
var acc testutil.Accumulator | ||
|
||
keys := []string{ | ||
"cache_bytes_in_use", | ||
"disk_read_bytes_per_sec", | ||
"disk_read_bytes_total", | ||
"disk_written_bytes_per_sec", | ||
"disk_written_bytes_total", | ||
"disk_usage_data_bytes", | ||
"disk_usage_garbage_bytes", | ||
"disk_usage_metadata_bytes", | ||
"disk_usage_preallocated_bytes", | ||
} | ||
|
||
storage.AddStats(&acc, tags) | ||
|
||
for _, metric := range keys { | ||
assert.True(t, acc.HasIntValue(metric)) | ||
} | ||
} |
Oops, something went wrong.