From ea6d517f455ad8bb7db633dec372a9c72efff415 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Sun, 6 Aug 2017 01:16:02 +0200 Subject: [PATCH 01/39] cratedb: basic scaffolding --- plugins/outputs/all/all.go | 1 + plugins/outputs/cratedb/cratedb.go | 96 +++++++++++++++++++++ plugins/outputs/cratedb/cratedb_test.go | 38 ++++++++ plugins/outputs/cratedb/docker-compose.yaml | 7 ++ 4 files changed, 142 insertions(+) create mode 100644 plugins/outputs/cratedb/cratedb.go create mode 100644 plugins/outputs/cratedb/cratedb_test.go create mode 100644 plugins/outputs/cratedb/docker-compose.yaml diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 089a5690977e5..972acebb2d9e7 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -4,6 +4,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/amon" _ "github.com/influxdata/telegraf/plugins/outputs/amqp" _ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch" + _ "github.com/influxdata/telegraf/plugins/outputs/cratedb" _ "github.com/influxdata/telegraf/plugins/outputs/datadog" _ "github.com/influxdata/telegraf/plugins/outputs/discard" _ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go new file mode 100644 index 0000000000000..7178da2850877 --- /dev/null +++ b/plugins/outputs/cratedb/cratedb.go @@ -0,0 +1,96 @@ +package cratedb + +import ( + "context" + "database/sql" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" + _ "github.com/lib/pq" +) + +type CrateDB struct { + URL string + Timeout internal.Duration + Table string + TableCreate bool + DB *sql.DB +} + +var sampleConfig = ` + # A lib/pq connection string. + # See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters + url = "postgres://user:password@localhost/?sslmode=disable. + # The timouet for writing metrics. + timeout = "5s" +` + +func (c *CrateDB) Connect() error { + db, err := sql.Open("postgres", c.URL) + if err != nil { + return err + } else if c.TableCreate { + // Insert + sql := ` +CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( + "timestamp" TIMESTAMP, + "name" STRING, + "tags_hash" STRING, + "tags" OBJECT(DYNAMIC), + "value" DOUBLE, + PRIMARY KEY ("timestamp", "tags_hash") +); +` + ctx, _ := context.WithTimeout(context.Background(), c.Timeout.Duration) + if _, err := db.ExecContext(ctx, sql); err != nil { + return err + } + } + c.DB = db + return nil +} + +func (c *CrateDB) Write(metrics []telegraf.Metric) error { + sql := ` +INSERT INTO ` + c.Table + ` ("name", "timestamp", "tags", "tags_hash") +VALUES ($1, $2, $3, $4); +` + ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) + defer cancel() + + //stmt, err := c.DB.PrepareContext(ctx, sql) + //if err != nil { + //return err + //} + //defer stmt.Close() + + //_ = stmt + for _, m := range metrics { + if _, err := c.DB.ExecContext(ctx, sql, m.Name(), m.Time(), 1, 2); err != nil { + return err + } + } + return nil +} + +func (c *CrateDB) SampleConfig() string { + return sampleConfig +} + +func (c *CrateDB) Description() string { + return "Configuration for CrateDB to send metrics to." +} + +func (c *CrateDB) Close() error { + return c.DB.Close() +} + +func init() { + outputs.Add("cratedb", func() telegraf.Output { + return &CrateDB{ + Timeout: internal.Duration{Duration: time.Second * 5}, + } + }) +} diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go new file mode 100644 index 0000000000000..ea29073e2f770 --- /dev/null +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -0,0 +1,38 @@ +package cratedb + +import ( + "os" + "testing" + "time" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWrite(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + c := &CrateDB{ + URL: testURL(), + Table: "test", + Timeout: internal.Duration{Duration: time.Second * 5}, + TableCreate: true, + } + + require.NoError(t, c.Connect()) + require.NoError(t, c.Write(testutil.MockMetrics())) + require.NoError(t, c.Close()) + +} + +func testURL() string { + url := os.Getenv("CRATE_URL") + if url == "" { + // @TODO use telegraf helper func for hostname + return "postgres://localhost:6543/test?sslmode=disable" + } + return url +} diff --git a/plugins/outputs/cratedb/docker-compose.yaml b/plugins/outputs/cratedb/docker-compose.yaml new file mode 100644 index 0000000000000..19462e3e525de --- /dev/null +++ b/plugins/outputs/cratedb/docker-compose.yaml @@ -0,0 +1,7 @@ +version: "3" +services: + cratedb: + image: crate + command: crate -Cnetwork.host=0.0.0.0 -Ctransport.host=localhost + ports: + - "6543:5432" From bbb45eba26dda2205d71e4702d93f99c015e4627 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 8 Aug 2017 16:58:56 +0200 Subject: [PATCH 02/39] WIP: mostly worked on escaping stuff --- plugins/outputs/cratedb/cratedb.go | 82 +++++++++++++++++++++++-- plugins/outputs/cratedb/cratedb_test.go | 42 +++++++++++++ 2 files changed, 118 insertions(+), 6 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 7178da2850877..a5a4f7d57fd3f 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -3,6 +3,8 @@ package cratedb import ( "context" "database/sql" + "fmt" + "strings" "time" "github.com/influxdata/telegraf" @@ -57,24 +59,92 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error { INSERT INTO ` + c.Table + ` ("name", "timestamp", "tags", "tags_hash") VALUES ($1, $2, $3, $4); ` + ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) defer cancel() - //stmt, err := c.DB.PrepareContext(ctx, sql) - //if err != nil { - //return err - //} - //defer stmt.Close() + stmt, err := c.DB.PrepareContext(ctx, sql) + if err != nil { + return err + } + defer stmt.Close() //_ = stmt for _, m := range metrics { - if _, err := c.DB.ExecContext(ctx, sql, m.Name(), m.Time(), 1, 2); err != nil { + if _, err := stmt.ExecContext(ctx, m.Name(), m.Time(), 1, 2); err != nil { return err } } return nil } +func insertSQL(table string, metrics []telegraf.Metric) (string, error) { + rows := make([]string, len(metrics)) + for i, m := range metrics { + cols := []interface{}{ + m.Name(), + m.Time(), + m.Tags(), + } + escapedCols := make([]string, len(cols)) + for i, col := range cols { + escaped, err := escapeValue(col) + if err != nil { + return "", err + } + escapedCols[i] = escaped + } + rows[i] = `(` + strings.Join(escapedCols, ",") + `)` + } + sql := `INSERT INTO ` + table + ` ("name", "timestamp", "tags", "tags_hash", "value") +VALUES +` + strings.Join(rows, " ,\n") + `;` + return sql, nil +} + +// escapeValue returns a string version of val that is suitable for being used +// inside of a VALUES expression or similar. Unsupported types return an error. +// +// Warning: This is not ideal from a security perspective, but unfortunately +// CrateDB does not support enough of the PostgreSQL wire protocol to allow +// using lib/pq with $1, $2 placeholders. Security conscious users of this +// plugin should probably refrain from using it in combination with untrusted +// inputs. +func escapeValue(val interface{}) (string, error) { + switch t := val.(type) { + case string: + return escapeString(t, `'`), nil + case time.Time: + // see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp + return escapeValue(t.Format("2006-01-02T15:04:05.999-0700")) + case map[string]string: + // There is a decent chance that the implementation below doesn't catch all + // edge cases, but it's hard to tell since the format seems to be a bit + // underspecified. Anyway, luckily we only have to deal with a + // map[string]string here, giving a higher chance that the code below is + // correct. + // See https://crate.io/docs/crate/reference/sql/data_types.html#object + pairs := make([]string, 0, len(t)) + for k, v := range t { + val, err := escapeValue(v) + if err != nil { + return "", err + } + pairs = append(pairs, escapeString(k, `"`)+" = "+val) + } + return `{` + strings.Join(pairs, ", ") + `}`, nil + default: + // This might be panic worthy under normal circumstances, but it's probably + // better to not shut down the entire telegraf process because of one + // misbehaving plugin. + return "", fmt.Errorf("unexpected type: %#v", t) + } +} + +func escapeString(s string, quote string) string { + return quote + strings.Replace(s, quote, quote+quote, -1) + quote +} + func (c *CrateDB) SampleConfig() string { return sampleConfig } diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index ea29073e2f770..449e7fad32fd6 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -5,12 +5,14 @@ import ( "testing" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) func TestConnectAndWrite(t *testing.T) { + return if testing.Short() { t.Skip("Skipping integration test in short mode") } @@ -25,7 +27,47 @@ func TestConnectAndWrite(t *testing.T) { require.NoError(t, c.Connect()) require.NoError(t, c.Write(testutil.MockMetrics())) require.NoError(t, c.Close()) +} + +func Test_insertSQL(t *testing.T) { + return + tests := []struct { + Metrics []telegraf.Metric + Want string + }{ + { + Metrics: testutil.MockMetrics(), + Want: "INSERT ...", + }, + } + for _, test := range tests { + if got, err := insertSQL("my_table", test.Metrics); err != nil { + t.Error(err) + } else if got != test.Want { + t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want) + } + } +} + +func Test_escapeValue(t *testing.T) { + tests := []struct { + Val interface{} + Want string + }{ + {`foo`, `'foo'`}, + {`foo'bar 'yeah`, `'foo''bar ''yeah'`}, + {time.Date(2017, 8, 7, 16, 44, 52, 123*1000*1000, time.FixedZone("Dreamland", 5400)), `'2017-08-07T16:44:52.123+0130'`}, + {map[string]string{"foo": "bar"}, `{"foo" = 'bar'}`}, + } + + for _, test := range tests { + if got, err := escapeValue(test.Val); err != nil { + t.Errorf("val: %#v: %s", test.Val, err) + } else if got != test.Want { + t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want) + } + } } func testURL() string { From 4f468339c2aa384cd05ba4319a9f507cc809fcb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 10 Aug 2017 19:38:31 +0200 Subject: [PATCH 03/39] Get basic INSERT to work! :) --- plugins/outputs/cratedb/cratedb.go | 104 ++++++++++++++---------- plugins/outputs/cratedb/cratedb_test.go | 49 +++++++++-- 2 files changed, 106 insertions(+), 47 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index a5a4f7d57fd3f..f204d73d20f29 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "sort" "strings" "time" @@ -34,15 +35,13 @@ func (c *CrateDB) Connect() error { if err != nil { return err } else if c.TableCreate { - // Insert + // TODO(fg) PRIMARY KEY based on hash value sql := ` CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( - "timestamp" TIMESTAMP, - "name" STRING, - "tags_hash" STRING, - "tags" OBJECT(DYNAMIC), - "value" DOUBLE, - PRIMARY KEY ("timestamp", "tags_hash") + "timestamp" TIMESTAMP NOT NULL, + "name" STRING, + "tags" OBJECT(DYNAMIC), + "fields" OBJECT(DYNAMIC) ); ` ctx, _ := context.WithTimeout(context.Background(), c.Timeout.Duration) @@ -55,25 +54,15 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( } func (c *CrateDB) Write(metrics []telegraf.Metric) error { - sql := ` -INSERT INTO ` + c.Table + ` ("name", "timestamp", "tags", "tags_hash") -VALUES ($1, $2, $3, $4); -` - + // TODO(fg) test timeouts ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) defer cancel() - stmt, err := c.DB.PrepareContext(ctx, sql) - if err != nil { + if sql, err := insertSQL(c.Table, metrics); err != nil { + return err + } else if _, err := c.DB.ExecContext(ctx, sql); err != nil { + fmt.Printf("%s\n", sql) return err - } - defer stmt.Close() - - //_ = stmt - for _, m := range metrics { - if _, err := stmt.ExecContext(ctx, m.Name(), m.Time(), 1, 2); err != nil { - return err - } } return nil } @@ -82,10 +71,12 @@ func insertSQL(table string, metrics []telegraf.Metric) (string, error) { rows := make([]string, len(metrics)) for i, m := range metrics { cols := []interface{}{ - m.Name(), m.Time(), + m.Name(), m.Tags(), + m.Fields(), } + escapedCols := make([]string, len(cols)) for i, col := range cols { escaped, err := escapeValue(col) @@ -94,11 +85,11 @@ func insertSQL(table string, metrics []telegraf.Metric) (string, error) { } escapedCols[i] = escaped } - rows[i] = `(` + strings.Join(escapedCols, ",") + `)` + rows[i] = `(` + strings.Join(escapedCols, ", ") + `)` } - sql := `INSERT INTO ` + table + ` ("name", "timestamp", "tags", "tags_hash", "value") + sql := `INSERT INTO ` + table + ` ("timestamp", "name", "tags", "fields") VALUES -` + strings.Join(rows, " ,\n") + `;` +` + strings.Join(rows, " ,\n") + `;` return sql, nil } @@ -114,33 +105,62 @@ func escapeValue(val interface{}) (string, error) { switch t := val.(type) { case string: return escapeString(t, `'`), nil + case int, int32, int64, float32, float64: + return fmt.Sprint(t), nil case time.Time: // see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp return escapeValue(t.Format("2006-01-02T15:04:05.999-0700")) case map[string]string: - // There is a decent chance that the implementation below doesn't catch all - // edge cases, but it's hard to tell since the format seems to be a bit - // underspecified. Anyway, luckily we only have to deal with a - // map[string]string here, giving a higher chance that the code below is - // correct. - // See https://crate.io/docs/crate/reference/sql/data_types.html#object - pairs := make([]string, 0, len(t)) - for k, v := range t { - val, err := escapeValue(v) - if err != nil { - return "", err - } - pairs = append(pairs, escapeString(k, `"`)+" = "+val) - } - return `{` + strings.Join(pairs, ", ") + `}`, nil + return escapeObject(convertMap(t)) + case map[string]interface{}: + return escapeObject(t) default: // This might be panic worthy under normal circumstances, but it's probably // better to not shut down the entire telegraf process because of one // misbehaving plugin. - return "", fmt.Errorf("unexpected type: %#v", t) + return "", fmt.Errorf("unexpected type: %T: %#v", t, t) + } +} + +// convertMap converts m from map[string]string to map[string]interface{} by +// copying it. Generics, oh generics where art thou? +func convertMap(m map[string]string) map[string]interface{} { + c := make(map[string]interface{}, len(m)) + for k, v := range m { + c[k] = v + } + return c +} + +func escapeObject(m map[string]interface{}) (string, error) { + // There is a decent chance that the implementation below doesn't catch all + // edge cases, but it's hard to tell since the format seems to be a bit + // underspecified. + // See https://crate.io/docs/crate/reference/sql/data_types.html#object + + // We find all keys and sort them first because iterating a map in go is + // randomized and we need consistent output for our unit tests. + keys := make([]string, 0, len(m)) + for k, _ := range m { + keys = append(keys, k) + } + sort.Strings(keys) + + // Now we build our key = val pairs + pairs := make([]string, 0, len(m)) + for _, k := range keys { + // escape the value of our key k (potentially recursive) + val, err := escapeValue(m[k]) + if err != nil { + return "", err + } + pairs = append(pairs, escapeString(k, `"`)+" = "+val) } + return `{` + strings.Join(pairs, ", ") + `}`, nil } +// escapeString wraps s in the given quote string and replaces all occurences +// of it inside of s with a double quote. func escapeString(s string, quote string) string { return quote + strings.Replace(s, quote, quote+quote, -1) + quote } diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 449e7fad32fd6..2b81170d77bb4 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -1,7 +1,9 @@ package cratedb import ( + "database/sql" "os" + "strings" "testing" "time" @@ -12,14 +14,28 @@ import ( ) func TestConnectAndWrite(t *testing.T) { - return if testing.Short() { t.Skip("Skipping integration test in short mode") } + url := testURL() + table := "test" + + // dropSQL drops our table before each test. This simplifies changing the + // schema during development :). + dropSQL := "DROP TABLE IF EXISTS " + escapeString(table, `"`) + db, err := sql.Open("postgres", url) + if err != nil { + t.Fatal(err) + } else if _, err := db.Exec(dropSQL); err != nil { + t.Fatal(err) + } else if err := db.Close(); err != nil { + t.Fatal(err) + } + c := &CrateDB{ - URL: testURL(), - Table: "test", + URL: url, + Table: table, Timeout: internal.Duration{Duration: time.Second * 5}, TableCreate: true, } @@ -30,14 +46,17 @@ func TestConnectAndWrite(t *testing.T) { } func Test_insertSQL(t *testing.T) { - return tests := []struct { Metrics []telegraf.Metric Want string }{ { Metrics: testutil.MockMetrics(), - Want: "INSERT ...", + Want: strings.TrimSpace(` +INSERT INTO my_table ("timestamp", "name", "tags", "fields") +VALUES +('2009-11-11T00:00:00+0100', 'test1', {"tag1" = 'value1'}, {"value" = 1}); +`), }, } @@ -55,10 +74,30 @@ func Test_escapeValue(t *testing.T) { Val interface{} Want string }{ + // string {`foo`, `'foo'`}, {`foo'bar 'yeah`, `'foo''bar ''yeah'`}, + // int types + {123, `123`}, // int + {int64(123), `123`}, + {int32(123), `123`}, + // float types + {123.456, `123.456`}, + {float32(123.456), `123.456`}, // floating point SNAFU + {float64(123.456), `123.456`}, + // time.Time {time.Date(2017, 8, 7, 16, 44, 52, 123*1000*1000, time.FixedZone("Dreamland", 5400)), `'2017-08-07T16:44:52.123+0130'`}, + // map[string]string + {map[string]string{}, `{}`}, + {map[string]string(nil), `{}`}, {map[string]string{"foo": "bar"}, `{"foo" = 'bar'}`}, + {map[string]string{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`}, + // map[string]interface{} + {map[string]interface{}{}, `{}`}, + {map[string]interface{}(nil), `{}`}, + {map[string]interface{}{"foo": "bar"}, `{"foo" = 'bar'}`}, + {map[string]interface{}{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`}, + {map[string]interface{}{"foo": map[string]interface{}{"one": "more"}}, `{"foo" = {"one" = 'more'}}`}, } for _, test := range tests { From 4eaa15800904048dc487f411c830c8b30404389b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 17:58:03 +0200 Subject: [PATCH 04/39] Add hash_id field and PRIMARY KEY --- plugins/outputs/cratedb/cratedb.go | 11 +++++++---- plugins/outputs/cratedb/cratedb_test.go | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index f204d73d20f29..ab70953929f9a 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -35,13 +35,14 @@ func (c *CrateDB) Connect() error { if err != nil { return err } else if c.TableCreate { - // TODO(fg) PRIMARY KEY based on hash value sql := ` CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( + "hash_id" LONG, "timestamp" TIMESTAMP NOT NULL, "name" STRING, "tags" OBJECT(DYNAMIC), - "fields" OBJECT(DYNAMIC) + "fields" OBJECT(DYNAMIC), + PRIMARY KEY ("timestamp", "hash_id") ); ` ctx, _ := context.WithTimeout(context.Background(), c.Timeout.Duration) @@ -70,7 +71,9 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error { func insertSQL(table string, metrics []telegraf.Metric) (string, error) { rows := make([]string, len(metrics)) for i, m := range metrics { + m.HashID() cols := []interface{}{ + m.HashID(), m.Time(), m.Name(), m.Tags(), @@ -87,7 +90,7 @@ func insertSQL(table string, metrics []telegraf.Metric) (string, error) { } rows[i] = `(` + strings.Join(escapedCols, ", ") + `)` } - sql := `INSERT INTO ` + table + ` ("timestamp", "name", "tags", "fields") + sql := `INSERT INTO ` + table + ` ("hash_id", "timestamp", "name", "tags", "fields") VALUES ` + strings.Join(rows, " ,\n") + `;` return sql, nil @@ -105,7 +108,7 @@ func escapeValue(val interface{}) (string, error) { switch t := val.(type) { case string: return escapeString(t, `'`), nil - case int, int32, int64, float32, float64: + case int, uint, int32, uint32, int64, uint64, float32, float64: return fmt.Sprint(t), nil case time.Time: // see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 2b81170d77bb4..bfee57dc3eaf3 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -53,9 +53,9 @@ func Test_insertSQL(t *testing.T) { { Metrics: testutil.MockMetrics(), Want: strings.TrimSpace(` -INSERT INTO my_table ("timestamp", "name", "tags", "fields") +INSERT INTO my_table ("hash_id", "timestamp", "name", "tags", "fields") VALUES -('2009-11-11T00:00:00+0100', 'test1', {"tag1" = 'value1'}, {"value" = 1}); +(1845393540509842047, '2009-11-11T00:00:00+0100', 'test1', {"tag1" = 'value1'}, {"value" = 1}); `), }, } From 6321c16d46e541781bdf412221e0be86b2064d39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 18:21:56 +0200 Subject: [PATCH 05/39] smoke test for making sure row was written --- plugins/outputs/cratedb/cratedb_test.go | 36 +++++++++++++++++++------ 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index bfee57dc3eaf3..133baf79e8157 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -25,13 +25,10 @@ func TestConnectAndWrite(t *testing.T) { // schema during development :). dropSQL := "DROP TABLE IF EXISTS " + escapeString(table, `"`) db, err := sql.Open("postgres", url) - if err != nil { - t.Fatal(err) - } else if _, err := db.Exec(dropSQL); err != nil { - t.Fatal(err) - } else if err := db.Close(); err != nil { - t.Fatal(err) - } + require.NoError(t, err) + _, err = db.Exec(dropSQL) + require.NoError(t, err) + defer db.Close() c := &CrateDB{ URL: url, @@ -40,8 +37,31 @@ func TestConnectAndWrite(t *testing.T) { TableCreate: true, } + metrics := testutil.MockMetrics() require.NoError(t, c.Connect()) - require.NoError(t, c.Write(testutil.MockMetrics())) + require.NoError(t, c.Write(metrics)) + + // The code below verifies that the metrics were written. We have to select + // the rows using their primary keys in order to take advantage of + // read-after-write consistency in CrateDB. + for _, m := range metrics { + hashID, err := escapeValue(m.HashID()) + require.NoError(t, err) + timestamp, err := escapeValue(m.Time()) + require.NoError(t, err) + + var id uint64 + row := db.QueryRow( + "SELECT hash_id FROM " + escapeString(table, `"`) + " " + + "WHERE hash_id = " + hashID + " " + + "AND timestamp = " + timestamp, + ) + require.NoError(t, row.Scan(&id)) + // We could check the whole row, but this is meant to be more of a smoke + // test, so just checking the HashID seems fine. + require.Equal(t, id, m.HashID()) + } + require.NoError(t, c.Close()) } From e1a97d043746af2d65e1ce3305d4e6da2d90c113 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 18:50:36 +0200 Subject: [PATCH 06/39] fix: HashID is uint64 but CrateDB wants int64 --- plugins/outputs/cratedb/cratedb.go | 16 +++++++++++++--- plugins/outputs/cratedb/cratedb_test.go | 6 +++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index ab70953929f9a..4ff9cc0a86c3e 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -71,9 +71,16 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error { func insertSQL(table string, metrics []telegraf.Metric) (string, error) { rows := make([]string, len(metrics)) for i, m := range metrics { - m.HashID() + // Note: We have to convert HashID from uint64 to int64 below because + // CrateDB only supports a signed 64 bit LONG type which would give us + // problems, e.g.: + // + // CREATE TABLE my_long (val LONG); + // INSERT INTO my_long(val) VALUES (14305102049502225714); + // -> ERROR: SQLParseException: For input string: "14305102049502225714" + cols := []interface{}{ - m.HashID(), + int64(m.HashID()), m.Time(), m.Name(), m.Tags(), @@ -108,7 +115,10 @@ func escapeValue(val interface{}) (string, error) { switch t := val.(type) { case string: return escapeString(t, `'`), nil - case int, uint, int32, uint32, int64, uint64, float32, float64: + // We don't handle uint, uint32 and uint64 here because CrateDB doesn't + // seem to support unsigned types. But it seems like input plugins don't + // produce those types, so it's hopefully ok. + case int, int32, int64, float32, float64: return fmt.Sprint(t), nil case time.Time: // see https://crate.io/docs/crate/reference/sql/data_types.html#timestamp diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 133baf79e8157..dcb16d3cb2da7 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -45,12 +45,12 @@ func TestConnectAndWrite(t *testing.T) { // the rows using their primary keys in order to take advantage of // read-after-write consistency in CrateDB. for _, m := range metrics { - hashID, err := escapeValue(m.HashID()) + hashID, err := escapeValue(int64(m.HashID())) require.NoError(t, err) timestamp, err := escapeValue(m.Time()) require.NoError(t, err) - var id uint64 + var id int64 row := db.QueryRow( "SELECT hash_id FROM " + escapeString(table, `"`) + " " + "WHERE hash_id = " + hashID + " " + @@ -59,7 +59,7 @@ func TestConnectAndWrite(t *testing.T) { require.NoError(t, row.Scan(&id)) // We could check the whole row, but this is meant to be more of a smoke // test, so just checking the HashID seems fine. - require.Equal(t, id, m.HashID()) + require.Equal(t, id, int64(m.HashID())) } require.NoError(t, c.Close()) From 91e10af513f08d7baa1741a432d5ad4e6a6a2c5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 18:52:09 +0200 Subject: [PATCH 07/39] remove comment --- plugins/outputs/cratedb/cratedb.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 4ff9cc0a86c3e..7b75df2036cb6 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -55,7 +55,6 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( } func (c *CrateDB) Write(metrics []telegraf.Metric) error { - // TODO(fg) test timeouts ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) defer cancel() From 4f72f48792252338f7ba5f4fd4d571a1803cd481 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 18:53:08 +0200 Subject: [PATCH 08/39] no need to cancel our contexts --- plugins/outputs/cratedb/cratedb.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 7b75df2036cb6..155c3c6472bdf 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -55,9 +55,7 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( } func (c *CrateDB) Write(metrics []telegraf.Metric) error { - ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) - defer cancel() - + ctx, _ := context.WithTimeout(context.Background(), c.Timeout.Duration) if sql, err := insertSQL(c.Table, metrics); err != nil { return err } else if _, err := c.DB.ExecContext(ctx, sql); err != nil { From f4d159f44d95354637924d62693376083c2f0add Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 19:05:08 +0200 Subject: [PATCH 09/39] Finish sample config --- etc/telegraf.conf | 13 +++++++++++++ plugins/outputs/cratedb/cratedb.go | 10 +++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 4831f934b3688..4190870d99674 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -201,6 +201,19 @@ # namespace = "InfluxData/Telegraf" +# # Configuration for CrateDB to send metrics to. +# [[outputs.cratedb]] +# # A lib/pq connection string. +# # See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters +# url = "postgres://user:password@localhost/schema?sslmode=disable" +# # Timeout for all CrateDB queries. +# timeout = "5s" +# # Name of the table to store metrics in. +# table = "metrics" +# # If true, and the metrics table does not exist, create it automatically. +# table_create = true + + # # Configuration for DataDog API to send metrics to. # [[outputs.datadog]] # ## Datadog API key diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 155c3c6472bdf..76e36f3ae934d 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -18,16 +18,20 @@ type CrateDB struct { URL string Timeout internal.Duration Table string - TableCreate bool + TableCreate bool `toml:"table_create"` DB *sql.DB } var sampleConfig = ` # A lib/pq connection string. # See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters - url = "postgres://user:password@localhost/?sslmode=disable. - # The timouet for writing metrics. + url = "postgres://user:password@localhost/schema?sslmode=disable" + # Timeout for all CrateDB queries. timeout = "5s" + # Name of the table to store metrics in. + table = "metrics" + # If true, and the metrics table does not exist, create it automatically. + table_create = true ` func (c *CrateDB) Connect() error { From 1393f7e6495b7377212f2252b80914fdd704f323 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 19:12:40 +0200 Subject: [PATCH 10/39] Whitespace --- plugins/outputs/cratedb/cratedb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 76e36f3ae934d..552587bd92201 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -46,7 +46,7 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( "name" STRING, "tags" OBJECT(DYNAMIC), "fields" OBJECT(DYNAMIC), - PRIMARY KEY ("timestamp", "hash_id") + PRIMARY KEY ("timestamp", "hash_id") ); ` ctx, _ := context.WithTimeout(context.Background(), c.Timeout.Duration) From 50a76650f9fe6910f6274a9b7114954502546f90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 19:12:51 +0200 Subject: [PATCH 11/39] Initial docs --- plugins/outputs/cratedb/README.md | 38 +++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 plugins/outputs/cratedb/README.md diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md new file mode 100644 index 0000000000000..cdc355590b9d6 --- /dev/null +++ b/plugins/outputs/cratedb/README.md @@ -0,0 +1,38 @@ +# CrateDB Output Plugin for Telegraf + +This plugin writes to [CrateDB](https://crate.io/) via its [PostgreSQL protocol](https://crate.io/docs/crate/reference/protocols/postgres.html). + +## Table Schema + +The plugin requires a a table with the following schema. + + +```sql +CREATE TABLE my_metrics ( + "hash_id" LONG, + "timestamp" TIMESTAMP NOT NULL, + "name" STRING, + "tags" OBJECT(DYNAMIC), + "fields" OBJECT(DYNAMIC), + PRIMARY KEY ("timestamp", "hash_id") +); +``` + +The plugin can also create this table for you automatically via the +`table_create` config option, see below. + +## Configuration + +```toml +# Configuration for CrateDB to send metrics to. +[[outputs.cratedb]] + # A lib/pq connection string. + # See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters + url = "postgres://user:password@localhost/schema?sslmode=disable" + # Timeout for all CrateDB queries. + timeout = "5s" + # Name of the table to store metrics in. + table = "metrics" + # If true, and the metrics table does not exist, create it automatically. + table_create = true +``` From 3e146c6bd94cdc7ff689c9e08c5534bc5efb2b29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 19:21:43 +0200 Subject: [PATCH 12/39] use testutil.GetLocalHost() --- plugins/outputs/cratedb/cratedb_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index dcb16d3cb2da7..3a16f2b0031f2 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -132,8 +132,7 @@ func Test_escapeValue(t *testing.T) { func testURL() string { url := os.Getenv("CRATE_URL") if url == "" { - // @TODO use telegraf helper func for hostname - return "postgres://localhost:6543/test?sslmode=disable" + return "postgres://" + testutil.GetLocalHost() + ":6543/test?sslmode=disable" } return url } From ab1a638a3664ff7f986f4caeff9429bf103e423a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 19:22:21 +0200 Subject: [PATCH 13/39] docs: wording --- plugins/outputs/cratedb/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md index cdc355590b9d6..5756fcd227f61 100644 --- a/plugins/outputs/cratedb/README.md +++ b/plugins/outputs/cratedb/README.md @@ -18,8 +18,8 @@ CREATE TABLE my_metrics ( ); ``` -The plugin can also create this table for you automatically via the -`table_create` config option, see below. +The plugin can create this table for you automatically via the `table_create` +config option, see below. ## Configuration From 83d09810d710d1c41821370ced544521016c3fd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 22 Aug 2017 19:31:46 +0200 Subject: [PATCH 14/39] add link from main README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 178d2a4d3bacf..28b14e70c3a04 100644 --- a/README.md +++ b/README.md @@ -262,6 +262,7 @@ formats may be used with input plugins supporting the `data_format` option: * [amqp](./plugins/outputs/amqp) (rabbitmq) * [aws kinesis](./plugins/outputs/kinesis) * [aws cloudwatch](./plugins/outputs/cloudwatch) +* [cratedb](./plugins/outputs/cratedb) * [datadog](./plugins/outputs/datadog) * [discard](./plugins/outputs/discard) * [elasticsearch](./plugins/outputs/elasticsearch) From 5ccca8bb7a481c28e4a679ee3a339dd3ecec4378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Sat, 2 Sep 2017 19:21:21 +0200 Subject: [PATCH 15/39] hash_id INDEX OFF and remove NOT NULL --- plugins/outputs/cratedb/README.md | 4 ++-- plugins/outputs/cratedb/cratedb.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md index 5756fcd227f61..ed35f3df115e1 100644 --- a/plugins/outputs/cratedb/README.md +++ b/plugins/outputs/cratedb/README.md @@ -9,8 +9,8 @@ The plugin requires a a table with the following schema. ```sql CREATE TABLE my_metrics ( - "hash_id" LONG, - "timestamp" TIMESTAMP NOT NULL, + "hash_id" LONG INDEX OFF, + "timestamp" TIMESTAMP, "name" STRING, "tags" OBJECT(DYNAMIC), "fields" OBJECT(DYNAMIC), diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 552587bd92201..85cc6d55555d3 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -41,8 +41,8 @@ func (c *CrateDB) Connect() error { } else if c.TableCreate { sql := ` CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( - "hash_id" LONG, - "timestamp" TIMESTAMP NOT NULL, + "hash_id" LONG INDEX OFF, + "timestamp" TIMESTAMP, "name" STRING, "tags" OBJECT(DYNAMIC), "fields" OBJECT(DYNAMIC), From 6dd8f59ee8b6ee0bdb4d77ebf6bdac0d32bd41ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 8 Sep 2017 15:53:03 +0200 Subject: [PATCH 16/39] Godeps: add lib/pq --- Godeps | 1 + 1 file changed, 1 insertion(+) diff --git a/Godeps b/Godeps index 48f9138e8a874..69f0ebf8c131d 100644 --- a/Godeps +++ b/Godeps @@ -37,6 +37,7 @@ github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 +github.com/lib/pq 8837942c3e09574accbc5f150e2c5e057189cace github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34 github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 From 11b0e3940573ada3af9d8dd438513f4f2e2e50a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 8 Sep 2017 16:08:27 +0200 Subject: [PATCH 17/39] govet: call cancel() func Not really needed here, but I want circleci to be happy :) https://circleci.com/gh/influxdata/telegraf/8149?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link --- plugins/outputs/cratedb/cratedb.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 85cc6d55555d3..9651f720ac2ba 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -49,7 +49,8 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( PRIMARY KEY ("timestamp", "hash_id") ); ` - ctx, _ := context.WithTimeout(context.Background(), c.Timeout.Duration) + ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) + defer cancel() if _, err := db.ExecContext(ctx, sql); err != nil { return err } @@ -59,7 +60,8 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( } func (c *CrateDB) Write(metrics []telegraf.Metric) error { - ctx, _ := context.WithTimeout(context.Background(), c.Timeout.Duration) + ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) + defer cancel() if sql, err := insertSQL(c.Table, metrics); err != nil { return err } else if _, err := c.DB.ExecContext(ctx, sql); err != nil { From dc71427d4d159fef66c512f9c76e4e314ee53d3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 8 Sep 2017 16:31:31 +0200 Subject: [PATCH 18/39] make: add cratedb for docker tests See https://circleci.com/gh/influxdata/telegraf/8150?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link --- Makefile | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index bf4c8dfec0bce..81b172324e9af 100644 --- a/Makefile +++ b/Makefile @@ -106,12 +106,13 @@ docker-run-circle: -e SLAPD_CONFIG_ROOTPW="secret" \ -p "389:389" -p "636:636" \ -d cobaugh/openldap-alpine + docker run --name cratedb -p "6543:5432" -d crate crate -Cnetwork.host=0.0.0.0 -Ctransport.host=localhost docker-kill: -docker kill aerospike elasticsearch kafka memcached mqtt mysql nats nsq \ - openldap postgres rabbitmq redis riemann zookeeper + openldap postgres rabbitmq redis riemann zookeeper cratedb -docker rm aerospike elasticsearch kafka memcached mqtt mysql nats nsq \ - openldap postgres rabbitmq redis riemann zookeeper + openldap postgres rabbitmq redis riemann zookeeper cratedb .PHONY: deps telegraf telegraf.exe install test test-windows lint test-all \ package clean docker-run docker-run-circle docker-kill From 71516e9e70989949e8e84bc236ba57b51cdcd496 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 8 Sep 2017 16:57:35 +0200 Subject: [PATCH 19/39] make: disable crate enterprise edition Allows connecting without setting up auth stuff. Probably also a better idea for this in general. --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 81b172324e9af..08bb90e6bbd30 100644 --- a/Makefile +++ b/Makefile @@ -106,7 +106,7 @@ docker-run-circle: -e SLAPD_CONFIG_ROOTPW="secret" \ -p "389:389" -p "636:636" \ -d cobaugh/openldap-alpine - docker run --name cratedb -p "6543:5432" -d crate crate -Cnetwork.host=0.0.0.0 -Ctransport.host=localhost + docker run --name cratedb -p "6543:5432" -d crate crate -Cnetwork.host=0.0.0.0 -Ctransport.host=localhost -Clicense.enterprise=false docker-kill: -docker kill aerospike elasticsearch kafka memcached mqtt mysql nats nsq \ From 86f700214869ea7705372e4a785232856a4782cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 8 Sep 2017 17:01:44 +0200 Subject: [PATCH 20/39] remove debugging statement --- plugins/outputs/cratedb/cratedb.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 9651f720ac2ba..4b82580876f9e 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -65,7 +65,6 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error { if sql, err := insertSQL(c.Table, metrics); err != nil { return err } else if _, err := c.DB.ExecContext(ctx, sql); err != nil { - fmt.Printf("%s\n", sql) return err } return nil From 6b4f4f8de021fd4394af42307bd80b8bd27b612a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 8 Sep 2017 17:19:16 +0200 Subject: [PATCH 21/39] fix: test needs fixed time.Location Otherwise it passes on my machine, but not on circle-ci. See https://circleci.com/gh/influxdata/telegraf/8152?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link --- plugins/outputs/cratedb/cratedb.go | 6 +++--- plugins/outputs/cratedb/cratedb_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 4b82580876f9e..90eca7d82a87d 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -62,7 +62,7 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( func (c *CrateDB) Write(metrics []telegraf.Metric) error { ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) defer cancel() - if sql, err := insertSQL(c.Table, metrics); err != nil { + if sql, err := insertSQL(c.Table, metrics, time.Local); err != nil { return err } else if _, err := c.DB.ExecContext(ctx, sql); err != nil { return err @@ -70,7 +70,7 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error { return nil } -func insertSQL(table string, metrics []telegraf.Metric) (string, error) { +func insertSQL(table string, metrics []telegraf.Metric, loc *time.Location) (string, error) { rows := make([]string, len(metrics)) for i, m := range metrics { // Note: We have to convert HashID from uint64 to int64 below because @@ -83,7 +83,7 @@ func insertSQL(table string, metrics []telegraf.Metric) (string, error) { cols := []interface{}{ int64(m.HashID()), - m.Time(), + m.Time().In(loc), m.Name(), m.Tags(), m.Fields(), diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 3a16f2b0031f2..902ef2cc3e130 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -75,13 +75,13 @@ func Test_insertSQL(t *testing.T) { Want: strings.TrimSpace(` INSERT INTO my_table ("hash_id", "timestamp", "name", "tags", "fields") VALUES -(1845393540509842047, '2009-11-11T00:00:00+0100', 'test1', {"tag1" = 'value1'}, {"value" = 1}); +(1845393540509842047, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1}); `), }, } for _, test := range tests { - if got, err := insertSQL("my_table", test.Metrics); err != nil { + if got, err := insertSQL("my_table", test.Metrics, time.UTC); err != nil { t.Error(err) } else if got != test.Want { t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want) From 90c909d3021995c9dbb22b76a2f2f628332b052c Mon Sep 17 00:00:00 2001 From: Thomas Vogel Date: Thu, 14 Sep 2017 14:31:30 +0200 Subject: [PATCH 22/39] add partitioning by day for crate.io output plugin --- plugins/outputs/cratedb/README.md | 4 ++-- plugins/outputs/cratedb/cratedb.go | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md index ed35f3df115e1..b41d83951969e 100644 --- a/plugins/outputs/cratedb/README.md +++ b/plugins/outputs/cratedb/README.md @@ -14,8 +14,8 @@ CREATE TABLE my_metrics ( "name" STRING, "tags" OBJECT(DYNAMIC), "fields" OBJECT(DYNAMIC), - PRIMARY KEY ("timestamp", "hash_id") -); + PRIMARY KEY ("timestamp", "hash_id","day") +)PARTITIONED BY("day"); ``` The plugin can create this table for you automatically via the `table_create` diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 90eca7d82a87d..ddfe4a6b112ae 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -46,8 +46,9 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( "name" STRING, "tags" OBJECT(DYNAMIC), "fields" OBJECT(DYNAMIC), - PRIMARY KEY ("timestamp", "hash_id") -); + "day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"), + PRIMARY KEY ("timestamp", "hash_id","day") +)PARTITIONED BY("day"); ` ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) defer cancel() From a2a651b263a4b694b555fa7e0ab5fda75666125f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 27 Sep 2017 17:22:36 +0200 Subject: [PATCH 23/39] fix local dev/test setup --- plugins/outputs/cratedb/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/outputs/cratedb/docker-compose.yaml b/plugins/outputs/cratedb/docker-compose.yaml index 19462e3e525de..610372be70e84 100644 --- a/plugins/outputs/cratedb/docker-compose.yaml +++ b/plugins/outputs/cratedb/docker-compose.yaml @@ -2,6 +2,6 @@ version: "3" services: cratedb: image: crate - command: crate -Cnetwork.host=0.0.0.0 -Ctransport.host=localhost + command: crate -Cnetwork.host=0.0.0.0 -Ctransport.host=localhost -Clicense.enterprise=false ports: - "6543:5432" From 7fd030829aa701236fc40587e42af95b1adcc748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 27 Sep 2017 17:22:40 +0200 Subject: [PATCH 24/39] Whitespace --- plugins/outputs/cratedb/README.md | 4 ++-- plugins/outputs/cratedb/cratedb.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md index b41d83951969e..3fde4cb327207 100644 --- a/plugins/outputs/cratedb/README.md +++ b/plugins/outputs/cratedb/README.md @@ -14,8 +14,8 @@ CREATE TABLE my_metrics ( "name" STRING, "tags" OBJECT(DYNAMIC), "fields" OBJECT(DYNAMIC), - PRIMARY KEY ("timestamp", "hash_id","day") -)PARTITIONED BY("day"); + PRIMARY KEY ("timestamp", "hash_id","day") +) PARTITIONED BY("day"); ``` The plugin can create this table for you automatically via the `table_create` diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index ddfe4a6b112ae..f96a427f650d9 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -46,9 +46,9 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( "name" STRING, "tags" OBJECT(DYNAMIC), "fields" OBJECT(DYNAMIC), - "day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"), + "day" TIMESTAMP GENERATED ALWAYS AS date_trunc('day', "timestamp"), PRIMARY KEY ("timestamp", "hash_id","day") -)PARTITIONED BY("day"); +) PARTITIONED BY("day"); ` ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) defer cancel() From a5598675484c5b1d6e33acbbafc39726aef97fbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Mon, 16 Oct 2017 13:40:48 +0200 Subject: [PATCH 25/39] use jackc/pgx --- plugins/outputs/cratedb/cratedb.go | 4 ++-- plugins/outputs/cratedb/cratedb_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index f96a427f650d9..f6ebce239b3ab 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" - _ "github.com/lib/pq" + _ "github.com/jackc/pgx/stdlib" ) type CrateDB struct { @@ -35,7 +35,7 @@ var sampleConfig = ` ` func (c *CrateDB) Connect() error { - db, err := sql.Open("postgres", c.URL) + db, err := sql.Open("pgx", c.URL) if err != nil { return err } else if c.TableCreate { diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 902ef2cc3e130..4849778b2d257 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -24,7 +24,7 @@ func TestConnectAndWrite(t *testing.T) { // dropSQL drops our table before each test. This simplifies changing the // schema during development :). dropSQL := "DROP TABLE IF EXISTS " + escapeString(table, `"`) - db, err := sql.Open("postgres", url) + db, err := sql.Open("pgx", url) require.NoError(t, err) _, err = db.Exec(dropSQL) require.NoError(t, err) From 1c6f0e7e465e71bde220d0e13231b3ecb549a20a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Mon, 16 Oct 2017 13:44:27 +0200 Subject: [PATCH 26/39] Makefile: cleanup cratedb integration - Add commands for regular integration tests - Don't go over 80 chars per line --- Makefile | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 0b09a332a4f21..54f84d92c04b7 100644 --- a/Makefile +++ b/Makefile @@ -86,6 +86,12 @@ docker-run: -e SLAPD_CONFIG_ROOTPW="secret" \ -p "389:389" -p "636:636" \ -d cobaugh/openldap-alpine + docker run \--name cratedb \ + -p "6543:5432" \ + -d crate crate \ + -Cnetwork.host=0.0.0.0 \ + -Ctransport.host=localhost \ + -Clicense.enterprise=false # Run docker containers necessary for integration tests; skipping services provided # by CircleCI @@ -110,7 +116,12 @@ docker-run-circle: -e SLAPD_CONFIG_ROOTPW="secret" \ -p "389:389" -p "636:636" \ -d cobaugh/openldap-alpine - docker run --name cratedb -p "6543:5432" -d crate crate -Cnetwork.host=0.0.0.0 -Ctransport.host=localhost -Clicense.enterprise=false + docker run \--name cratedb \ + -p "6543:5432" \ + -d crate crate \ + -Cnetwork.host=0.0.0.0 \ + -Ctransport.host=localhost \ + -Clicense.enterprise=false docker-kill: -docker kill aerospike elasticsearch kafka memcached mqtt mysql nats nsq \ From 7d8ed8b7dee8528fbb04179e92171eeeb0649b7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Mon, 16 Oct 2017 13:47:03 +0200 Subject: [PATCH 27/39] Revert "Godeps: add lib/pq" This reverts commit 6dd8f59ee8b6ee0bdb4d77ebf6bdac0d32bd41ec. --- Godeps | 1 - 1 file changed, 1 deletion(-) diff --git a/Godeps b/Godeps index 3496ede6d97fe..0802675ba7b38 100644 --- a/Godeps +++ b/Godeps @@ -37,7 +37,6 @@ github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142 -github.com/lib/pq 8837942c3e09574accbc5f150e2c5e057189cace github.com/matttproud/golang_protobuf_extensions c12348ce28de40eed0136aa2b644d0ee0650e56c github.com/Microsoft/go-winio ce2922f643c8fd76b46cadc7f404a06282678b34 github.com/miekg/dns 99f84ae56e75126dd77e5de4fae2ea034a468ca1 From 0615712ae7b82cf7d5cc2a4e67191a169ecda4a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Mon, 16 Oct 2017 13:48:30 +0200 Subject: [PATCH 28/39] Godeps: upgrade jackc/pgx See: - https://github.com/influxdata/telegraf/pull/3210#issuecomment-329616630 - https://github.com/jackc/pgx/issues/320 - https://github.com/jackc/pgx/pull/336 --- Godeps | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Godeps b/Godeps index 0802675ba7b38..88e4ce5f720b3 100644 --- a/Godeps +++ b/Godeps @@ -32,7 +32,7 @@ github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f github.com/influxdata/tail a395bf99fe07c233f41fba0735fa2b13b58588ea github.com/influxdata/toml 5d1d907f22ead1cd47adde17ceec5bda9cacaf8f github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec -github.com/jackc/pgx b84338d7d62598f75859b2b146d830b22f1b9ec8 +github.com/jackc/pgx 63f58fd32edb5684b9e9f4cfaac847c6b42b3917 github.com/jmespath/go-jmespath bd40a432e4c76585ef6b72d3fd96fb9b6dc7b68d github.com/kardianos/osext c2c54e542fb797ad986b31721e1baedf214ca413 github.com/kardianos/service 6d3a0ee7d3425d9d835debc51a0ca1ffa28f4893 From 34e53d93fff4ea0745ec51046fc53142c6251c90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Mon, 23 Oct 2017 16:14:47 +0200 Subject: [PATCH 29/39] fix postgres test after upgrading pgx pgx has made some changes when it comes to scanning row values into interface{} values. Those required some small adjustments to the types expected by the tests. see https://github.com/jackc/pgx/issues/343 --- plugins/inputs/postgresql/postgresql_test.go | 7 +++---- .../postgresql_extensible_test.go | 14 ++++++-------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/plugins/inputs/postgresql/postgresql_test.go b/plugins/inputs/postgresql/postgresql_test.go index 03c09936dbf57..410b9b4214c8d 100644 --- a/plugins/inputs/postgresql/postgresql_test.go +++ b/plugins/inputs/postgresql/postgresql_test.go @@ -51,12 +51,12 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "checkpoints_req", "checkpoints_timed", "maxwritten_clean", - } - - int32Metrics := []string{ + "datid", "numbackends", } + int32Metrics := []string{} + floatMetrics := []string{ "blk_read_time", "blk_write_time", @@ -66,7 +66,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { stringMetrics := []string{ "datname", - "datid", } metricsCounted := 0 diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go index 466cdfd98c9c1..4545a247800d3 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -53,12 +53,12 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "temp_files", "temp_bytes", "deadlocks", - } - - int32Metrics := []string{ "numbackends", + "datid", } + int32Metrics := []string{} + floatMetrics := []string{ "blk_read_time", "blk_write_time", @@ -66,7 +66,6 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { stringMetrics := []string{ "datname", - "datid", } metricsCounted := 0 @@ -175,12 +174,12 @@ func TestPostgresqlFieldOutput(t *testing.T) { "temp_files", "temp_bytes", "deadlocks", - } - - int32Metrics := []string{ "numbackends", + "datid", } + int32Metrics := []string{} + floatMetrics := []string{ "blk_read_time", "blk_write_time", @@ -188,7 +187,6 @@ func TestPostgresqlFieldOutput(t *testing.T) { stringMetrics := []string{ "datname", - "datid", } for _, field := range intMetrics { From 3b22f2eefaf9553dbc653fce135893f37d213707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 2 Nov 2017 19:44:57 +0100 Subject: [PATCH 30/39] make: remove backslash typo --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 54f84d92c04b7..d331f7681a088 100644 --- a/Makefile +++ b/Makefile @@ -86,7 +86,7 @@ docker-run: -e SLAPD_CONFIG_ROOTPW="secret" \ -p "389:389" -p "636:636" \ -d cobaugh/openldap-alpine - docker run \--name cratedb \ + docker run --name cratedb \ -p "6543:5432" \ -d crate crate \ -Cnetwork.host=0.0.0.0 \ @@ -116,7 +116,7 @@ docker-run-circle: -e SLAPD_CONFIG_ROOTPW="secret" \ -p "389:389" -p "636:636" \ -d cobaugh/openldap-alpine - docker run \--name cratedb \ + docker run --name cratedb \ -p "6543:5432" \ -d crate crate \ -Cnetwork.host=0.0.0.0 \ From 6c2df9e9c765cea1a8631e12dcd828b40f58b467 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 2 Nov 2017 19:45:31 +0100 Subject: [PATCH 31/39] README: remove typo --- plugins/outputs/cratedb/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md index 3fde4cb327207..b2187947480e4 100644 --- a/plugins/outputs/cratedb/README.md +++ b/plugins/outputs/cratedb/README.md @@ -4,7 +4,7 @@ This plugin writes to [CrateDB](https://crate.io/) via its [PostgreSQL protocol] ## Table Schema -The plugin requires a a table with the following schema. +The plugin requires a table with the following schema. ```sql From a54054f906f0835731e6757b277e2e54852fe788 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 2 Nov 2017 19:48:52 +0100 Subject: [PATCH 32/39] remove docker-compose.yaml I just used this for local testing. --- plugins/outputs/cratedb/docker-compose.yaml | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 plugins/outputs/cratedb/docker-compose.yaml diff --git a/plugins/outputs/cratedb/docker-compose.yaml b/plugins/outputs/cratedb/docker-compose.yaml deleted file mode 100644 index 610372be70e84..0000000000000 --- a/plugins/outputs/cratedb/docker-compose.yaml +++ /dev/null @@ -1,7 +0,0 @@ -version: "3" -services: - cratedb: - image: crate - command: crate -Cnetwork.host=0.0.0.0 -Ctransport.host=localhost -Clicense.enterprise=false - ports: - - "6543:5432" From e2460329bb4820af9e9d009e5e4495fb392316b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 2 Nov 2017 19:52:59 +0100 Subject: [PATCH 33/39] Send timestamps in UTC instead of local time CrateDB converts to UTC anyway, and this simplifies the code a bit. --- plugins/outputs/cratedb/cratedb.go | 6 +++--- plugins/outputs/cratedb/cratedb_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index f6ebce239b3ab..186ca01933cce 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -63,7 +63,7 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( func (c *CrateDB) Write(metrics []telegraf.Metric) error { ctx, cancel := context.WithTimeout(context.Background(), c.Timeout.Duration) defer cancel() - if sql, err := insertSQL(c.Table, metrics, time.Local); err != nil { + if sql, err := insertSQL(c.Table, metrics); err != nil { return err } else if _, err := c.DB.ExecContext(ctx, sql); err != nil { return err @@ -71,7 +71,7 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error { return nil } -func insertSQL(table string, metrics []telegraf.Metric, loc *time.Location) (string, error) { +func insertSQL(table string, metrics []telegraf.Metric) (string, error) { rows := make([]string, len(metrics)) for i, m := range metrics { // Note: We have to convert HashID from uint64 to int64 below because @@ -84,7 +84,7 @@ func insertSQL(table string, metrics []telegraf.Metric, loc *time.Location) (str cols := []interface{}{ int64(m.HashID()), - m.Time().In(loc), + m.Time().UTC(), m.Name(), m.Tags(), m.Fields(), diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 4849778b2d257..b53c426e47428 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -81,7 +81,7 @@ VALUES } for _, test := range tests { - if got, err := insertSQL("my_table", test.Metrics, time.UTC); err != nil { + if got, err := insertSQL("my_table", test.Metrics); err != nil { t.Error(err) } else if got != test.Want { t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want) From c43432203e0fe64d2b274c43d02c58d0bfe248c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 2 Nov 2017 19:57:22 +0100 Subject: [PATCH 34/39] escaping add additional test case --- plugins/outputs/cratedb/cratedb_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index b53c426e47428..1657b52d89bbc 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -118,6 +118,7 @@ func Test_escapeValue(t *testing.T) { {map[string]interface{}{"foo": "bar"}, `{"foo" = 'bar'}`}, {map[string]interface{}{"foo": "bar", "one": "more"}, `{"foo" = 'bar', "one" = 'more'}`}, {map[string]interface{}{"foo": map[string]interface{}{"one": "more"}}, `{"foo" = {"one" = 'more'}}`}, + {map[string]interface{}{`fo"o`: `b'ar`, `ab'c`: `xy"z`, `on"""e`: `mo'''re`}, `{"ab'c" = 'xy"z', "fo""o" = 'b''ar', "on""""""e" = 'mo''''''re'}`}, } for _, test := range tests { From 8b85f0bbbe7eb4cfbcc26e22d94a639eb7b15a8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 2 Nov 2017 20:12:37 +0100 Subject: [PATCH 35/39] escaping: add smoke test --- plugins/outputs/cratedb/cratedb_test.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 1657b52d89bbc..5ca918e5fab55 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -121,12 +121,24 @@ func Test_escapeValue(t *testing.T) { {map[string]interface{}{`fo"o`: `b'ar`, `ab'c`: `xy"z`, `on"""e`: `mo'''re`}, `{"ab'c" = 'xy"z', "fo""o" = 'b''ar', "on""""""e" = 'mo''''''re'}`}, } + url := testURL() + db, err := sql.Open("pgx", url) + require.NoError(t, err) + defer db.Close() + for _, test := range tests { - if got, err := escapeValue(test.Val); err != nil { + got, err := escapeValue(test.Val) + if err != nil { t.Errorf("val: %#v: %s", test.Val, err) } else if got != test.Want { t.Errorf("got:\n%s\n\nwant:\n%s", got, test.Want) } + + // This is a smoke test that will blow up if our escaping causing a SQL + // syntax error, which may allow for an attack. + var reply interface{} + row := db.QueryRow("SELECT " + got) + require.NoError(t, row.Scan(&reply)) } } From 626f6532dbd2b5f7395e49d69d2ed8b9ee02b30d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 2 Nov 2017 20:42:42 +0100 Subject: [PATCH 36/39] Implement new hashID func See https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201 --- plugins/outputs/cratedb/cratedb.go | 42 ++++++++++++--- plugins/outputs/cratedb/cratedb_test.go | 72 +++++++++++++++++++++++-- 2 files changed, 102 insertions(+), 12 deletions(-) diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 186ca01933cce..785da2a44014b 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -2,7 +2,9 @@ package cratedb import ( "context" + "crypto/sha512" "database/sql" + "encoding/binary" "fmt" "sort" "strings" @@ -74,16 +76,9 @@ func (c *CrateDB) Write(metrics []telegraf.Metric) error { func insertSQL(table string, metrics []telegraf.Metric) (string, error) { rows := make([]string, len(metrics)) for i, m := range metrics { - // Note: We have to convert HashID from uint64 to int64 below because - // CrateDB only supports a signed 64 bit LONG type which would give us - // problems, e.g.: - // - // CREATE TABLE my_long (val LONG); - // INSERT INTO my_long(val) VALUES (14305102049502225714); - // -> ERROR: SQLParseException: For input string: "14305102049502225714" cols := []interface{}{ - int64(m.HashID()), + hashID(m), m.Time().UTC(), m.Name(), m.Tags(), @@ -181,6 +176,37 @@ func escapeString(s string, quote string) string { return quote + strings.Replace(s, quote, quote+quote, -1) + quote } +// hashID returns a cryptographic hash int64 hash that includes the metric name +// and tags. It's used instead of m.HashID() because it's not considered stable +// and because a cryptogtaphic hash makes more sense for the use case of +// deduplication. +// [1] https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201 +func hashID(m telegraf.Metric) int64 { + h := sha512.New() + h.Write([]byte(m.Name())) + tags := m.Tags() + tmp := make([]string, len(tags)) + i := 0 + for k, v := range tags { + tmp[i] = k + v + i++ + } + sort.Strings(tmp) + + for _, s := range tmp { + h.Write([]byte(s)) + } + sum := h.Sum(nil) + + // Note: We have to convert from uint64 to int64 below because CrateDB only + // supports a signed 64 bit LONG type: + // + // CREATE TABLE my_long (val LONG); + // INSERT INTO my_long(val) VALUES (14305102049502225714); + // -> ERROR: SQLParseException: For input string: "14305102049502225714" + return int64(binary.LittleEndian.Uint64(sum)) +} + func (c *CrateDB) SampleConfig() string { return sampleConfig } diff --git a/plugins/outputs/cratedb/cratedb_test.go b/plugins/outputs/cratedb/cratedb_test.go index 5ca918e5fab55..5900988349b27 100644 --- a/plugins/outputs/cratedb/cratedb_test.go +++ b/plugins/outputs/cratedb/cratedb_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -45,7 +46,7 @@ func TestConnectAndWrite(t *testing.T) { // the rows using their primary keys in order to take advantage of // read-after-write consistency in CrateDB. for _, m := range metrics { - hashID, err := escapeValue(int64(m.HashID())) + hashIDVal, err := escapeValue(hashID(m)) require.NoError(t, err) timestamp, err := escapeValue(m.Time()) require.NoError(t, err) @@ -53,13 +54,13 @@ func TestConnectAndWrite(t *testing.T) { var id int64 row := db.QueryRow( "SELECT hash_id FROM " + escapeString(table, `"`) + " " + - "WHERE hash_id = " + hashID + " " + + "WHERE hash_id = " + hashIDVal + " " + "AND timestamp = " + timestamp, ) require.NoError(t, row.Scan(&id)) // We could check the whole row, but this is meant to be more of a smoke // test, so just checking the HashID seems fine. - require.Equal(t, id, int64(m.HashID())) + require.Equal(t, id, hashID(m)) } require.NoError(t, c.Close()) @@ -75,7 +76,7 @@ func Test_insertSQL(t *testing.T) { Want: strings.TrimSpace(` INSERT INTO my_table ("hash_id", "timestamp", "name", "tags", "fields") VALUES -(1845393540509842047, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1}); +(-4023501406646044814, '2009-11-10T23:00:00+0000', 'test1', {"tag1" = 'value1'}, {"value" = 1}); `), }, } @@ -142,6 +143,69 @@ func Test_escapeValue(t *testing.T) { } } +func Test_hashID(t *testing.T) { + tests := []struct { + Name string + Tags map[string]string + Fields map[string]interface{} + Want int64 + }{ + { + Name: "metric1", + Tags: map[string]string{"tag1": "val1", "tag2": "val2"}, + Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, + Want: 8973971082006474188, + }, + + // This metric has a different tag order (in a perhaps non-ideal attempt to + // trigger different pseudo-random map iteration)) and fields (none) + // compared to the previous metric, but should still get the same hash. + { + Name: "metric1", + Tags: map[string]string{"tag2": "val2", "tag1": "val1"}, + Fields: map[string]interface{}{"field3": "val3"}, + Want: 8973971082006474188, + }, + + // Different metric name -> different hash + { + Name: "metric2", + Tags: map[string]string{"tag1": "val1", "tag2": "val2"}, + Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, + Want: 306487682448261783, + }, + + // Different tag val -> different hash + { + Name: "metric1", + Tags: map[string]string{"tag1": "new-val", "tag2": "val2"}, + Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, + Want: 1938713695181062970, + }, + + // Different tag key -> different hash + { + Name: "metric1", + Tags: map[string]string{"new-key": "val1", "tag2": "val2"}, + Fields: map[string]interface{}{"field1": "val1", "field2": "val2"}, + Want: 7678889081527706328, + }, + } + + for i, test := range tests { + m, err := metric.New( + test.Name, + test.Tags, + test.Fields, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + require.NoError(t, err) + if got := hashID(m); got != test.Want { + t.Errorf("test #%d: got=%d want=%d", i, got, test.Want) + } + } +} + func testURL() string { url := os.Getenv("CRATE_URL") if url == "" { From 221165d3a4b3f79b2cfc2c4ed907dd310f957753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 2 Nov 2017 20:49:38 +0100 Subject: [PATCH 37/39] remove references to lib/pq --- etc/telegraf.conf | 4 ++-- plugins/outputs/cratedb/README.md | 4 ++-- plugins/outputs/cratedb/cratedb.go | 8 +++++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index f1e21c7f9f54f..f747aa59020c2 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -201,8 +201,8 @@ # # Configuration for CrateDB to send metrics to. # [[outputs.cratedb]] -# # A lib/pq connection string. -# # See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters +# # A github.com/jackc/pgx connection string. +# # See https://godoc.org/github.com/jackc/pgx#ParseDSN # url = "postgres://user:password@localhost/schema?sslmode=disable" # # Timeout for all CrateDB queries. # timeout = "5s" diff --git a/plugins/outputs/cratedb/README.md b/plugins/outputs/cratedb/README.md index b2187947480e4..a8a01fdfe99d7 100644 --- a/plugins/outputs/cratedb/README.md +++ b/plugins/outputs/cratedb/README.md @@ -26,8 +26,8 @@ config option, see below. ```toml # Configuration for CrateDB to send metrics to. [[outputs.cratedb]] - # A lib/pq connection string. - # See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters + # A github.com/jackc/pgx connection string. + # See https://godoc.org/github.com/jackc/pgx#ParseDSN url = "postgres://user:password@localhost/schema?sslmode=disable" # Timeout for all CrateDB queries. timeout = "5s" diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index 785da2a44014b..5a5987c772a7c 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -25,8 +25,8 @@ type CrateDB struct { } var sampleConfig = ` - # A lib/pq connection string. - # See http://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters + # A github.com/jackc/pgx connection string. + # See https://godoc.org/github.com/jackc/pgx#ParseDSN url = "postgres://user:password@localhost/schema?sslmode=disable" # Timeout for all CrateDB queries. timeout = "5s" @@ -106,9 +106,11 @@ VALUES // // Warning: This is not ideal from a security perspective, but unfortunately // CrateDB does not support enough of the PostgreSQL wire protocol to allow -// using lib/pq with $1, $2 placeholders. Security conscious users of this +// using pgx with $1, $2 placeholders [1]. Security conscious users of this // plugin should probably refrain from using it in combination with untrusted // inputs. +// +// [1] https://github.com/influxdata/telegraf/pull/3210#issuecomment-339273371 func escapeValue(val interface{}) (string, error) { switch t := val.(type) { case string: From 8379cb2890a7b9e79a7f539dfb34a2c93d6f14f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 9 Nov 2017 13:19:38 +0100 Subject: [PATCH 38/39] empty commit (trigger ci) From b0d6723cdaf4a2f77b0fb3964ed54dc1d161b246 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 9 Nov 2017 13:33:39 +0100 Subject: [PATCH 39/39] trigger ci The test below failed on the previous run, but it seems unrelated to this PR. --- FAIL: TestRunTimeout (0.10s) :1: Error Trace: internal_test.go:57 Error: Should be true FAIL FAIL github.com/influxdata/telegraf/internal 0.267s