Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional output plugin for writing to OpenTSDB using telnet mode #182

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ found by running `telegraf -sample-config`
* influxdb
* kafka
* datadog
* opentsdb

## Contributing

Expand Down
1 change: 1 addition & 0 deletions outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ import (
_ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb"
_ "github.com/influxdb/telegraf/outputs/kafka"
_ "github.com/influxdb/telegraf/outputs/opentsdb"
)
78 changes: 78 additions & 0 deletions outputs/opentsdb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# OpenTSDB Output Plugin

This plugin writes to a OpenTSDB instance using the "telnet" mode

## Transfer "Protocol" in the telnet mode

The expected input from OpenTSDB is specified in the following way:

```
put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
```

The telegraf output plugin adds an optional prefix to the metric keys so
that a subamount can be selected.

```
put <[prefix.]metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
```

### Example

```
put nine.telegraf.system_load1 1441910356 0.430000 dc=homeoffice host=irimame scope=green
put nine.telegraf.system_load5 1441910356 0.580000 dc=homeoffice host=irimame scope=green
put nine.telegraf.system_load15 1441910356 0.730000 dc=homeoffice host=irimame scope=green
put nine.telegraf.system_uptime 1441910356 3655970.000000 dc=homeoffice host=irimame scope=green
put nine.telegraf.system_uptime_format 1441910356 dc=homeoffice host=irimame scope=green
put nine.telegraf.mem_total 1441910356 4145426432 dc=homeoffice host=irimame scope=green
...
put nine.telegraf.io_write_bytes 1441910366 0 dc=homeoffice host=irimame name=vda2 scope=green
put nine.telegraf.io_read_time 1441910366 0 dc=homeoffice host=irimame name=vda2 scope=green
put nine.telegraf.io_write_time 1441910366 0 dc=homeoffice host=irimame name=vda2 scope=green
put nine.telegraf.io_io_time 1441910366 0 dc=homeoffice host=irimame name=vda2 scope=green
put nine.telegraf.ping_packets_transmitted 1441910366 dc=homeoffice host=irimame scope=green url=www.google.com
put nine.telegraf.ping_packets_received 1441910366 dc=homeoffice host=irimame scope=green url=www.google.com
put nine.telegraf.ping_percent_packet_loss 1441910366 0.000000 dc=homeoffice host=irimame scope=green url=www.google.com
put nine.telegraf.ping_average_response_ms 1441910366 24.006000 dc=homeoffice host=irimame scope=green url=www.google.com
...
```

##

The OpenTSDB interface can be simulated with this reader:

```
// opentsdb_telnet_mode_mock.go
package main

import (
"io"
"log"
"net"
"os"
)

func main() {
l, err := net.Listen("tcp", "localhost:4242")
if err != nil {
log.Fatal(err)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
go func(c net.Conn) {
defer c.Close()
io.Copy(os.Stdout, c)
}(conn)
}
}

```

## Allowed values for metrics

OpenTSDB allows `integers` and `floats` as input values
161 changes: 161 additions & 0 deletions outputs/opentsdb/opentsdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package opentsdb

import (
"fmt"
"net"
"sort"
"strconv"
"strings"
"time"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
)

type OpenTSDB struct {
Prefix string

Host string
Port int

Debug bool
}

var sampleConfig = `
# prefix for metrics keys
prefix = "my.specific.prefix."

## Telnet Mode ##
# DNS name of the OpenTSDB server in telnet mode
host = "opentsdb.example.com"

# Port of the OpenTSDB server in telnet mode
port = 4242

# Debug true - Prints OpenTSDB communication
debug = false
`

type MetricLine struct {
Metric string
Timestamp int64
Value string
Tags string
}

func (o *OpenTSDB) Connect() error {
// Test Connection to OpenTSDB Server
uri := fmt.Sprintf("%s:%d", o.Host, o.Port)
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
if err != nil {
return fmt.Errorf("OpenTSDB: TCP address cannot be resolved")
}
connection, err := net.DialTCP("tcp", nil, tcpAddr)
defer connection.Close()
if err != nil {
return fmt.Errorf("OpenTSDB: Telnet connect fail")
}
return nil
}

func (o *OpenTSDB) Write(bp client.BatchPoints) error {
if len(bp.Points) == 0 {
return nil
}
var timeNow = time.Now()
// Send Data with telnet / socket communication
uri := fmt.Sprintf("%s:%d", o.Host, o.Port)
tcpAddr, _ := net.ResolveTCPAddr("tcp", uri)
connection, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
return fmt.Errorf("OpenTSDB: Telnet connect fail")
}
for _, pt := range bp.Points {
metric := &MetricLine{
Metric: fmt.Sprintf("%s%s", o.Prefix, pt.Measurement),
Timestamp: timeNow.Unix(),
}
metricValue, buildError := buildValue(bp, pt)
if buildError != nil {
fmt.Printf("OpenTSDB: %s\n", buildError.Error())
continue
}
metric.Value = metricValue

tagsSlice := buildTags(bp.Tags, pt.Tags)
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))

messageLine := fmt.Sprintf("put %s %v %s %s\n", metric.Metric, metric.Timestamp, metric.Value, metric.Tags)
if o.Debug {
fmt.Print(messageLine)
}
_, err := connection.Write([]byte(messageLine))
if err != nil {
fmt.Errorf("OpenTSDB: Telnet writing error %s", err.Error())
}
}
defer connection.Close()

return nil
}

func buildTags(bpTags map[string]string, ptTags map[string]string) []string {
tags := make([]string, (len(bpTags) + len(ptTags)))
index := 0
for k, v := range bpTags {
tags[index] = fmt.Sprintf("%s=%s", k, v)
index += 1
}
for k, v := range ptTags {
tags[index] = fmt.Sprintf("%s=%s", k, v)
index += 1
}
sort.Strings(tags)
return tags
}

func buildValue(bp client.BatchPoints, pt client.Point) (string, error) {
var retv string
var v = pt.Fields["value"]
switch p := v.(type) {
case int64:
retv = IntToString(int64(p))
case uint64:
retv = UIntToString(uint64(p))
case float64:
retv = FloatToString(float64(p))
default:
return retv, fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v)
}
return retv, nil
}

func IntToString(input_num int64) string {
return strconv.FormatInt(input_num, 10)
}

func UIntToString(input_num uint64) string {
return strconv.FormatUint(input_num, 10)
}

func FloatToString(input_num float64) string {
return strconv.FormatFloat(input_num, 'f', 6, 64)
}

func (o *OpenTSDB) SampleConfig() string {
return sampleConfig
}

func (o *OpenTSDB) Description() string {
return "Configuration for OpenTSDB server to send metrics to"
}

func (o *OpenTSDB) Close() error {
return nil
}

func init() {
outputs.Add("opentsdb", func() outputs.Output {
return &OpenTSDB{}
})
}
95 changes: 95 additions & 0 deletions outputs/opentsdb/opentsdb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package opentsdb

import (
"reflect"
"testing"
"time"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func TestBuildTagsTelnet(t *testing.T) {
var tagtests = []struct {
bpIn map[string]string
ptIn map[string]string
outTags []string
}{
{
map[string]string{"one": "two"},
map[string]string{"three": "four"},
[]string{"one=two", "three=four"},
},
{
map[string]string{"aaa": "bbb"},
map[string]string{},
[]string{"aaa=bbb"},
},
{
map[string]string{"one": "two"},
map[string]string{"aaa": "bbb"},
[]string{"aaa=bbb", "one=two"},
},
{
map[string]string{},
map[string]string{},
[]string{},
},
}
for _, tt := range tagtests {
tags := buildTags(tt.bpIn, tt.ptIn)
if !reflect.DeepEqual(tags, tt.outTags) {
t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags)
}
}
}
func TestWrite(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

o := &OpenTSDB{
Host: testutil.GetLocalHost(),
Port: 24242,
Prefix: "prefix.test.",
}

// Verify that we can connect to the OpenTSDB instance
err := o.Connect()
require.NoError(t, err)

// Verify that we can successfully write data to OpenTSDB
err = o.Write(testutil.MockBatchPoints())
require.NoError(t, err)

// Verify postive and negative test cases of writing data
var bp client.BatchPoints
bp.Time = time.Now()
bp.Tags = map[string]string{"testkey": "testvalue"}
bp.Points = []client.Point{
{
Measurement: "justametric.float",
Fields: map[string]interface{}{"value": float64(1.0)},
},
{
Measurement: "justametric.int",
Fields: map[string]interface{}{"value": int64(123456789)},
},
{
Measurement: "justametric.uint",
Fields: map[string]interface{}{"value": uint64(123456789012345)},
},
{
Measurement: "justametric.string",
Fields: map[string]interface{}{"value": "Lorem Ipsum"},
},
{
Measurement: "justametric.anotherfloat",
Fields: map[string]interface{}{"value": float64(42.0)},
},
}
err = o.Write(bp)
require.NoError(t, err)

}
6 changes: 6 additions & 0 deletions scripts/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@ kafka:
environment:
ADVERTISED_HOST:
ADVERTISED_PORT: 9092

opentsdb:
image: lancope/opentsdb
ports:
- "24242:4242"