Skip to content

Commit

Permalink
Merged in rrk/improve-test-coverage (pull request timescale#74)
Browse files Browse the repository at this point in the history
Improve test coverage for InfluxDB loader

Approved-by: Lee Hampton <leejhampton@gmail.com>
  • Loading branch information
RobAtticus committed Jul 27, 2018
2 parents f018743 + 4721413 commit ba19b8b
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 63 deletions.
74 changes: 24 additions & 50 deletions cmd/tsbs_generate_queries/databases/influx/devops.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,10 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t
selectClauses := d.getSelectClausesAggMetrics("max", metrics)
whereHosts := d.getHostWhereString(nHosts)

v := url.Values{}
v.Set("q", fmt.Sprintf("SELECT %s from cpu where %s and time >= '%s' and time < '%s' group by time(1m)", strings.Join(selectClauses, ", "), whereHosts, interval.StartString(), interval.EndString()))

humanLabel := fmt.Sprintf("Influx %d cpu metric(s), random %4d hosts, random %s by 1m", numMetrics, nHosts, timeRange)
q := qi.(*query.HTTP)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval.StartString()))
q.Method = []byte("GET")
q.Path = []byte(fmt.Sprintf("/query?%s", v.Encode()))
q.Body = nil
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
influxql := fmt.Sprintf("SELECT %s from cpu where %s and time >= '%s' and time < '%s' group by time(1m)", strings.Join(selectClauses, ", "), whereHosts, interval.StartString(), interval.EndString())
d.fillInQuery(qi, humanLabel, humanDesc, influxql)
}

// GroupByOrderByLimit benchmarks a query that has a time WHERE clause, that groups by a truncated date, orders by that date, and takes a limit:
Expand All @@ -83,19 +77,12 @@ func (d *Devops) GroupByTime(qi query.Query, nHosts, numMetrics int, timeRange t
// LIMIT $LIMIT
func (d *Devops) GroupByOrderByLimit(qi query.Query) {
interval := d.Interval.RandWindow(time.Hour)

where := fmt.Sprintf("WHERE time < '%s'", interval.EndString())

v := url.Values{}
v.Set("q", fmt.Sprintf(`SELECT max(usage_user) from cpu %s group by time(1m) limit 5`, where))

humanLabel := "Influx max cpu over last 5 min-intervals (random end)"
q := qi.(*query.HTTP)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval.StartString()))
q.Method = []byte("GET")
q.Path = []byte(fmt.Sprintf("/query?%s", v.Encode()))
q.Body = nil
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
influxql := fmt.Sprintf(`SELECT max(usage_user) from cpu %s group by time(1m) limit 5`, where)
d.fillInQuery(qi, humanLabel, humanDesc, influxql)
}

// GroupByTimeAndPrimaryTag selects the AVG of numMetrics metrics under 'cpu' per device per hour for a day,
Expand All @@ -110,16 +97,10 @@ func (d *Devops) GroupByTimeAndPrimaryTag(qi query.Query, numMetrics int) {
interval := d.Interval.RandWindow(devops.DoubleGroupByDuration)
selectClauses := d.getSelectClausesAggMetrics("mean", metrics)

v := url.Values{}
v.Set("q", fmt.Sprintf("SELECT %s from cpu where time >= '%s' and time < '%s' group by time(1h),hostname", strings.Join(selectClauses, ", "), interval.StartString(), interval.EndString()))

humanLabel := devops.GetDoubleGroupByLabel("Influx", numMetrics)
q := qi.(*query.HTTP)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval.StartString()))
q.Method = []byte("GET")
q.Path = []byte(fmt.Sprintf("/query?%s", v.Encode()))
q.Body = nil
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
influxql := fmt.Sprintf("SELECT %s from cpu where time >= '%s' and time < '%s' group by time(1h),hostname", strings.Join(selectClauses, ", "), interval.StartString(), interval.EndString())
d.fillInQuery(qi, humanLabel, humanDesc, influxql)
}

// MaxAllCPU selects the MAX of all metrics under 'cpu' per hour for nhosts hosts,
Expand All @@ -134,30 +115,18 @@ func (d *Devops) MaxAllCPU(qi query.Query, nHosts int) {
whereHosts := d.getHostWhereString(nHosts)
selectClauses := d.getSelectClausesAggMetrics("max", devops.GetAllCPUMetrics())

v := url.Values{}
v.Set("q", fmt.Sprintf("SELECT %s from cpu where %s and time >= '%s' and time < '%s' group by time(1m)", strings.Join(selectClauses, ","), whereHosts, interval.StartString(), interval.EndString()))

humanLabel := devops.GetMaxAllLabel("Influx", nHosts)
q := qi.(*query.HTTP)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval.StartString()))
q.Method = []byte("GET")
q.Path = []byte(fmt.Sprintf("/query?%s", v.Encode()))
q.Body = nil
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval.StartString())
influxql := fmt.Sprintf("SELECT %s from cpu where %s and time >= '%s' and time < '%s' group by time(1m)", strings.Join(selectClauses, ","), whereHosts, interval.StartString(), interval.EndString())
d.fillInQuery(qi, humanLabel, humanDesc, influxql)
}

// LastPointPerHost finds the last row for every host in the dataset
func (d *Devops) LastPointPerHost(qi query.Query) {
v := url.Values{}
v.Set("q", "SELECT * from cpu group by \"hostname\" order by time desc limit 1")

humanLabel := "Influx last row per host"
q := qi.(*query.HTTP)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(fmt.Sprintf("%s: cpu", humanLabel))
q.Method = []byte("GET")
q.Path = []byte(fmt.Sprintf("/query?%s", v.Encode()))
q.Body = nil
humanDesc := humanLabel + ": cpu"
influxql := "SELECT * from cpu group by \"hostname\" order by time desc limit 1"
d.fillInQuery(qi, humanLabel, humanDesc, influxql)
}

// HighCPUForHosts populates a query that gets CPU metrics when the CPU has high
Expand All @@ -177,13 +146,18 @@ func (d *Devops) HighCPUForHosts(qi query.Query, nHosts int) {
hostWhereClause = fmt.Sprintf("and %s", d.getHostWhereString(nHosts))
}

v := url.Values{}
v.Set("q", fmt.Sprintf("SELECT * from cpu where usage_user > 90.0 %s and time >= '%s' and time < '%s'", hostWhereClause, interval.StartString(), interval.EndString()))

humanLabel := devops.GetHighCPULabel("Influx", nHosts)
humanDesc := fmt.Sprintf("%s: %s", humanLabel, interval)
influxql := fmt.Sprintf("SELECT * from cpu where usage_user > 90.0 %s and time >= '%s' and time < '%s'", hostWhereClause, interval.StartString(), interval.EndString())
d.fillInQuery(qi, humanLabel, humanDesc, influxql)
}

func (d *Devops) fillInQuery(qi query.Query, humanLabel, humanDesc, influxql string) {
v := url.Values{}
v.Set("q", influxql)
q := qi.(*query.HTTP)
q.HumanLabel = []byte(humanLabel)
q.HumanDescription = []byte(fmt.Sprintf("%s: %s", humanLabel, interval))
q.HumanDescription = []byte(humanDesc)
q.Method = []byte("GET")
q.Path = []byte(fmt.Sprintf("/query?%s", v.Encode()))
q.Body = nil
Expand Down
116 changes: 116 additions & 0 deletions cmd/tsbs_generate_queries/databases/influx/devops_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package influx

import (
"net/url"
"strings"
"testing"
"time"

"bitbucket.org/440-labs/tsbs/query"
)

func TestDevopsGetHostWhereWithHostnames(t *testing.T) {
cases := []struct {
desc string
hostnames []string
want string
}{
{
desc: "single host",
hostnames: []string{"foo1"},
want: "(hostname = 'foo1')",
},
{
desc: "multi host (2)",
hostnames: []string{"foo1", "foo2"},
want: "(hostname = 'foo1' or hostname = 'foo2')",
},
{
desc: "multi host (3)",
hostnames: []string{"foo1", "foo2", "foo3"},
want: "(hostname = 'foo1' or hostname = 'foo2' or hostname = 'foo3')",
},
}

for _, c := range cases {
d := NewDevops(time.Now(), time.Now(), 10)

if got := d.getHostWhereWithHostnames(c.hostnames); got != c.want {
t.Errorf("%s: incorrect output: got %s want %s", c.desc, got, c.want)
}
}
}

func TestDevopsGetSelectClausesAggMetrics(t *testing.T) {
cases := []struct {
desc string
agg string
metrics []string
want string
}{
{
desc: "single metric - max",
agg: "max",
metrics: []string{"foo"},
want: "max(foo)",
},
{
desc: "multiple metric - max",
agg: "max",
metrics: []string{"foo", "bar"},
want: "max(foo),max(bar)",
},
{
desc: "multiple metric - avg",
agg: "avg",
metrics: []string{"foo", "bar"},
want: "avg(foo),avg(bar)",
},
}

for _, c := range cases {
d := NewDevops(time.Now(), time.Now(), 10)

if got := strings.Join(d.getSelectClausesAggMetrics(c.agg, c.metrics), ","); got != c.want {
t.Errorf("%s: incorrect output: got %s want %s", c.desc, got, c.want)
}
}
}

func TestDevopsFillInQuery(t *testing.T) {
humanLabel := "this is my label"
humanDesc := "and now my description"
influxql := "SELECT * from cpu where usage_user > 90.0 and time < '2017-01-01'"
d := NewDevops(time.Now(), time.Now(), 10)
qi := d.GenerateEmptyQuery()
q := qi.(*query.HTTP)
if len(q.HumanLabel) != 0 {
t.Errorf("empty query has non-zero length human label")
}
if len(q.HumanDescription) != 0 {
t.Errorf("empty query has non-zero length human desc")
}
if len(q.Method) != 0 {
t.Errorf("empty query has non-zero length method")
}
if len(q.Path) != 0 {
t.Errorf("empty query has non-zero length path")
}

d.fillInQuery(q, humanLabel, humanDesc, influxql)
if string(q.HumanLabel) != humanLabel {
t.Errorf("filled query mislabeled: got %s want %s", string(q.HumanLabel), humanLabel)
}
if string(q.HumanDescription) != humanDesc {
t.Errorf("filled query mis-described: got %s want %s", string(q.HumanDescription), humanDesc)
}
if string(q.Method) != "GET" {
t.Errorf("filled query has wrong method: got %s want GET", string(q.Method))
}
v := url.Values{}
v.Set("q", influxql)
encoded := v.Encode()
if string(q.Path) != "/query?"+encoded {
t.Errorf("filled query has wrong path: got %s want /query?%s", string(q.Path), encoded)
}
}
3 changes: 3 additions & 0 deletions cmd/tsbs_load_influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ var consistencyChoices = map[string]struct{}{
"all": struct{}{},
}

// allows for testing
var fatal = log.Fatalf

// Parse args:
func init() {
loader = load.GetBenchmarkRunner()
Expand Down
13 changes: 10 additions & 3 deletions cmd/tsbs_load_influx/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package main
import (
"bufio"
"bytes"
"log"
"strings"

"bitbucket.org/440-labs/tsbs/load"
)

const errNotThreeTuplesFmt = "parse error: line does not have 3 tuples, has %d"

var newLine = []byte("\n")

type decoder struct {
Expand All @@ -20,7 +21,8 @@ func (d *decoder) Decode(_ *bufio.Reader) *load.Point {
if !ok && d.scanner.Err() == nil { // nothing scanned & no error = EOF
return nil
} else if !ok {
log.Fatalf("scan error: %v", d.scanner.Err())
fatal("scan error: %v", d.scanner.Err())
return nil
}
return load.NewPoint(d.scanner.Bytes())
}
Expand All @@ -41,7 +43,12 @@ func (b *batch) Append(item *load.Point) {
b.rows++
// Each influx line is format "csv-tags csv-fields timestamp", so we split by space
// and then on the middle element, we split by comma to count number of fields added
b.metrics += uint64(len(strings.Split(strings.Split(thatStr, " ")[1], ",")))
args := strings.Split(thatStr, " ")
if len(args) != 3 {
fatal(errNotThreeTuplesFmt, len(args))
return
}
b.metrics += uint64(len(strings.Split(args[1], ",")))

b.buf.Write(that)
b.buf.Write(newLine)
Expand Down
105 changes: 105 additions & 0 deletions cmd/tsbs_load_influx/scan_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"bufio"
"bytes"
"fmt"
"sync"
"testing"

"bitbucket.org/440-labs/tsbs/load"
)

func TestBatch(t *testing.T) {
bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 4*1024*1024))
},
}
f := &factory{}
b := f.New().(*batch)
if b.Len() != 0 {
t.Errorf("batch not initialized with count 0")
}
p := &load.Point{
Data: []byte("tag1=tag1val,tag2=tag2val col1=0.0,col2=0.0 140"),
}
b.Append(p)
if b.Len() != 1 {
t.Errorf("batch count is not 1 after first append")
}
if b.rows != 1 {
t.Errorf("batch row count is not 1 after first append")
}
if b.metrics != 2 {
t.Errorf("batch metric count is not 2 after first append")
}

p = &load.Point{
Data: []byte("tag1=tag1val,tag2=tag2val col1=1.0,col2=1.0 190"),
}
b.Append(p)
if b.Len() != 2 {
t.Errorf("batch count is not 1 after first append")
}
if b.rows != 2 {
t.Errorf("batch row count is not 1 after first append")
}
if b.metrics != 4 {
t.Errorf("batch metric count is not 2 after first append")
}

p = &load.Point{
Data: []byte("bad_point"),
}
errMsg := ""
fatal = func(f string, args ...interface{}) {
errMsg = fmt.Sprintf(f, args...)
}
b.Append(p)
if errMsg == "" {
t.Errorf("batch append did not error with ill-formed point")
}
}

func TestDecode(t *testing.T) {
cases := []struct {
desc string
input string
result []byte
shouldFatal bool
}{
{
desc: "correct input",
input: "cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140\n",
result: []byte("cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140"),
},
{
desc: "correct input with extra",
input: "cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140\nextra_is_ignored",
result: []byte("cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140"),
},
}

for _, c := range cases {
br := bufio.NewReader(bytes.NewReader([]byte(c.input)))
decoder := &decoder{scanner: bufio.NewScanner(br)}
p := decoder.Decode(br)
data := p.Data.([]byte)
if !bytes.Equal(data, c.result) {
t.Errorf("%s: incorrect result: got\n%v\nwant\n%v", c.desc, data, c.result)
}
}
}

func TestDecodeEOF(t *testing.T) {
input := []byte("cpu,tag1=tag1text,tag2=tag2text col1=0.0,col2=0.0 140")
br := bufio.NewReader(bytes.NewReader([]byte(input)))
decoder := &decoder{scanner: bufio.NewScanner(br)}
_ = decoder.Decode(br)
// nothing left, should be EOF
p := decoder.Decode(br)
if p != nil {
t.Errorf("expected p to be nil, got %v", p)
}
}
Loading

0 comments on commit ba19b8b

Please sign in to comment.