Skip to content

Commit

Permalink
Implement the difference function
Browse files Browse the repository at this point in the history
The difference function is implemented very similar to how derivative is
implemented. It is an aggregate function that acts over the entire
aggregate. This function will also have the same problems that
derivative has with getting values from the previous interval or point.
This will be fixed separately as part of #5943.

Fixes #1825.
  • Loading branch information
jsternberg committed Mar 24, 2016
1 parent 5e8e849 commit 9c22c7e
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 11 deletions.
213 changes: 213 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1948,6 +1948,219 @@ cpu value=20 1278010021000000000
}
}

// Ensure the server can handle various group by time difference queries.
func TestServer_Query_SelectGroupByTimeDifference(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
t.Fatal(err)
}

test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000
cpu value=15 1278010021000000000
cpu value=20 1278010022000000000
cpu value=25 1278010023000000000
`)},
}

test.addQueries([]*Query{
&Query{
name: "calculate difference of count",
command: `SELECT difference(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of mean",
command: `SELECT difference(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of median",
command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of sum",
command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",20]]}]}]}`,
},
&Query{
name: "calculate difference of first",
command: `SELECT difference(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of last",
command: `SELECT difference(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of min",
command: `SELECT difference(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of max",
command: `SELECT difference(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
&Query{
name: "calculate difference of percentile",
command: `SELECT difference(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",10]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

// Ensure the server can handle various group by time difference queries.
func TestServer_Query_SelectGroupByTimeDifferenceWithFill(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
t.Fatal(err)
}

test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000
cpu value=20 1278010021000000000
`)},
}

test.addQueries([]*Query{
&Query{
name: "calculate difference of count with fill 0",
command: `SELECT difference(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-2]]}]}]}`,
},
&Query{
name: "calculate difference of count with fill previous",
command: `SELECT difference(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of mean with fill 0",
command: `SELECT difference(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-15]]}]}]}`,
},
&Query{
name: "calculate difference of mean with fill previous",
command: `SELECT difference(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of median with fill 0",
command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-15]]}]}]}`,
},
&Query{
name: "calculate difference of median with fill previous",
command: `SELECT difference(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of sum with fill 0",
command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-30]]}]}]}`,
},
&Query{
name: "calculate difference of sum with fill previous",
command: `SELECT difference(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of first with fill 0",
command: `SELECT difference(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-10]]}]}]}`,
},
&Query{
name: "calculate difference of first with fill previous",
command: `SELECT difference(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of last with fill 0",
command: `SELECT difference(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-20]]}]}]}`,
},
&Query{
name: "calculate difference of last with fill previous",
command: `SELECT difference(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of min with fill 0",
command: `SELECT difference(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-10]]}]}]}`,
},
&Query{
name: "calculate difference of min with fill previous",
command: `SELECT difference(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of max with fill 0",
command: `SELECT difference(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-20]]}]}]}`,
},
&Query{
name: "calculate difference of max with fill previous",
command: `SELECT difference(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
&Query{
name: "calculate difference of percentile with fill 0",
command: `SELECT difference(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",-10]]}]}]}`,
},
&Query{
name: "calculate difference of percentile with fill previous",
command: `SELECT difference(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","difference"],"values":[["2010-07-01T18:47:02Z",0]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

// mergeMany ensures that when merging many series together and some of them have a different number
// of points than others in a group by interval the results are correct
func TestServer_Query_MergeMany(t *testing.T) {
Expand Down
13 changes: 10 additions & 3 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,12 +1455,19 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
for _, f := range s.Fields {
for _, expr := range walkFunctionCalls(f.Expr) {
switch expr.Name {
case "derivative", "non_negative_derivative":
case "derivative", "non_negative_derivative", "difference":
if err := s.validSelectWithAggregate(); err != nil {
return err
}
if min, max, got := 1, 2, len(expr.Args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", expr.Name, min, max, got)
switch expr.Name {
case "derivative", "non_negative_derivative":
if min, max, got := 1, 2, len(expr.Args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", expr.Name, min, max, got)
}
case "difference":
if len(expr.Args) != 1 {
return fmt.Errorf("invalid number of arguments for difference, expected 1, got 0")
}
}
// Validate that if they have grouping by time, they need a sub-call like min/max, etc.
groupByInterval, err := s.GroupByInterval()
Expand Down
66 changes: 66 additions & 0 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,10 @@ func NewIntegerPercentileReduceSliceFunc(percentile float64) IntegerReduceSliceF

// newDerivativeIterator returns an iterator for operating on a derivative() call.
func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) {
// Derivatives do not use GROUP BY intervals or time constraints, so clear these options.
opt.Interval = Interval{}
opt.StartTime, opt.EndTime = MinTime, MaxTime

switch input := input.(type) {
case FloatIterator:
floatDerivativeReduceSlice := NewFloatDerivativeReduceSliceFunc(interval, isNonNegative)
Expand Down Expand Up @@ -1112,3 +1116,65 @@ func NewIntegerDerivativeReduceSliceFunc(interval Interval, isNonNegative bool)
return output
}
}

// newDifferenceIterator returns an iterator for operating on a difference() call.
func newDifferenceIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
// Differences do not use GROUP BY intervals or time constraints, so clear these options.
opt.Interval = Interval{}
opt.StartTime, opt.EndTime = MinTime, MaxTime

switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(FloatDifferenceReduceSlice)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(IntegerDifferenceReduceSlice)
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
default:
return nil, fmt.Errorf("unsupported difference iterator type: %T", input)
}
}

func FloatDifferenceReduceSlice(a []FloatPoint) []FloatPoint {
if len(a) < 2 {
return []FloatPoint{}
}
prev := a[0]

output := make([]FloatPoint, 0, len(a)-1)
for i := 1; i < len(a); i++ {
p := &a[i]

// Calculate the difference of successive points.
value := p.Value - prev.Value
prev = *p

output = append(output, FloatPoint{Time: p.Time, Value: value})
}
return output
}

func IntegerDifferenceReduceSlice(a []IntegerPoint) []IntegerPoint {
if len(a) < 2 {
return []IntegerPoint{}
}
prev := a[0]

output := make([]IntegerPoint, 0, len(a)-1)
for i := 1; i < len(a); i++ {
p := &a[i]

// Calculate the difference of successive points.
value := p.Value - prev.Value
prev = *p

output = append(output, IntegerPoint{Time: p.Time, Value: value})
}
return output
}
58 changes: 58 additions & 0 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,56 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// difference
{
s: `SELECT difference(field1) FROM myseries;`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "difference", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
},
},

{
s: fmt.Sprintf(`SELECT difference(max(field1)) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{
Expr: &influxql.Call{
Name: "difference",
Args: []influxql.Expr{
&influxql.Call{
Name: "max",
Args: []influxql.Expr{
&influxql.VarRef{Val: "field1"},
},
},
},
},
},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
Dimensions: []*influxql.Dimension{
{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{Val: time.Minute},
},
},
},
},
Condition: &influxql.BinaryExpr{
Op: influxql.GT,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.TimeLiteral{Val: now.UTC()},
},
},
},

// SELECT statement (lowercase)
{
s: `select my_field from myseries`,
Expand Down Expand Up @@ -1730,6 +1780,14 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SELECT non_negative_derivative(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`},
{s: `SELECT non_negative_derivative(mean(value), 1h) FROM myseries where time < now() and time > now() - 1d`, err: `non_negative_derivative aggregate requires a GROUP BY interval`},
{s: `SELECT non_negative_derivative(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`},
{s: `SELECT difference(), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`},
{s: `SELECT difference() from myseries`, err: `invalid number of arguments for difference, expected 1, got 0`},
{s: `SELECT difference(value) FROM myseries group by time(1h)`, err: `aggregate function required inside the call to difference`},
{s: `SELECT difference(top(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for top, expected at least 2, got 1`},
{s: `SELECT difference(bottom(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for bottom, expected at least 2, got 1`},
{s: `SELECT difference(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`},
{s: `SELECT difference(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`},
{s: `SELECT difference(mean(value)) FROM myseries where time < now() and time > now() - 1d`, err: `difference aggregate requires a GROUP BY interval`},
{s: `SELECT field1 from myseries WHERE host =~ 'asd' LIMIT 1`, err: `found asd, expected regex at line 1, char 42`},
{s: `SELECT value > 2 FROM cpu`, err: `invalid operator > in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
{s: `SELECT value = 2 FROM cpu`, err: `invalid operator = in SELECT clause at line 1, char 8; operator is intended for WHERE clause`},
Expand Down
Loading

0 comments on commit 9c22c7e

Please sign in to comment.