Skip to content

Commit

Permalink
Implement derivatives across intervals for aggregate queries
Browse files Browse the repository at this point in the history
For aggregate queries, derivatives will now alter the start time to one
interval behind and will use that interval to find the derivative of the
first point instead of giving no value for that interval. Null values
will still be discarded so if the interval before the one you are
querying is null, then it will be discarded like if it were in the
middle of the query. You can use `fill(0)` to fill in these values.

This does not apply to raw queries yet.

Also modified the derivative and difference aggregates to use the stream
iterator instead of the reduce slice iterator for space efficiency.

Fixes #3247. Contributes to #5943.
  • Loading branch information
jsternberg committed Apr 13, 2016
1 parent 36ca49c commit ddeedc7
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 234 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [#6237](https://github.com/influxdata/influxdb/issues/6237): Enable continuous integration testing on Windows platform via AppVeyor. Thanks @mvadu
- [#6263](https://github.com/influxdata/influxdb/pull/6263): Reduce UDP Service allocation size.
- [#6228](https://github.com/influxdata/influxdb/pull/6228): Support for multiple listeners for collectd and OpenTSDB inputs.
- [#3247](https://github.com/influxdata/influxdb/issues/3247): Implement derivatives across intervals for aggregate queries.

### Bugfixes

Expand Down
80 changes: 40 additions & 40 deletions cmd/influxd/run/server_test.go

Large diffs are not rendered by default.

150 changes: 8 additions & 142 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,178 +941,44 @@ func NewIntegerPercentileReduceSliceFunc(percentile float64) IntegerReduceSliceF

// newDerivativeIterator returns an iterator for operating on a derivative() call.
func newDerivativeIterator(input Iterator, opt IteratorOptions, interval Interval, isNonNegative bool) (Iterator, error) {
// Derivatives do not use GROUP BY intervals or time constraints, so clear these options.
opt.Interval = Interval{}
opt.StartTime, opt.EndTime = MinTime, MaxTime

switch input := input.(type) {
case FloatIterator:
floatDerivativeReduceSlice := NewFloatDerivativeReduceSliceFunc(interval, isNonNegative)
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(floatDerivativeReduceSlice)
fn := NewFloatDerivativeReducer(interval, isNonNegative, opt.Ascending)
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return newFloatStreamFloatIterator(input, createFn, opt), nil
case IntegerIterator:
integerDerivativeReduceSlice := NewIntegerDerivativeReduceSliceFunc(interval, isNonNegative)
createFn := func() (IntegerPointAggregator, FloatPointEmitter) {
fn := NewIntegerSliceFuncFloatReducer(integerDerivativeReduceSlice)
fn := NewIntegerDerivativeReducer(interval, isNonNegative, opt.Ascending)
return fn, fn
}
return &integerReduceFloatIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return newIntegerStreamFloatIterator(input, createFn, opt), nil
default:
return nil, fmt.Errorf("unsupported derivative iterator type: %T", input)
}
}

// NewFloatDerivativeReduceSliceFunc returns the derivative value within a window.
func NewFloatDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) FloatReduceSliceFunc {
prev := FloatPoint{Nil: true}

return func(a []FloatPoint) []FloatPoint {
if len(a) == 0 {
return a
} else if len(a) == 1 {
return []FloatPoint{{Time: a[0].Time, Nil: true}}
}

if prev.Nil {
prev = a[0]
}

output := make([]FloatPoint, 0, len(a)-1)
for i := 1; i < len(a); i++ {
p := &a[i]

// Calculate the derivative of successive points by dividing the
// difference of each value by the elapsed time normalized to the interval.
diff := p.Value - prev.Value
elapsed := p.Time - prev.Time

value := 0.0
if elapsed > 0 {
value = diff / (float64(elapsed) / float64(interval.Duration))
}

prev = *p

// Drop negative values for non-negative derivatives.
if isNonNegative && diff < 0 {
continue
}

output = append(output, FloatPoint{Time: p.Time, Value: value})
}
return output
}
}

// NewIntegerDerivativeReduceSliceFunc returns the derivative value within a window.
func NewIntegerDerivativeReduceSliceFunc(interval Interval, isNonNegative bool) IntegerReduceFloatSliceFunc {
prev := IntegerPoint{Nil: true}

return func(a []IntegerPoint) []FloatPoint {
if len(a) == 0 {
return []FloatPoint{}
} else if len(a) == 1 {
return []FloatPoint{{Time: a[0].Time, Nil: true}}
}

if prev.Nil {
prev = a[0]
}

output := make([]FloatPoint, 0, len(a)-1)
for i := 1; i < len(a); i++ {
p := &a[i]

// Calculate the derivative of successive points by dividing the
// difference of each value by the elapsed time normalized to the interval.
diff := float64(p.Value - prev.Value)
elapsed := p.Time - prev.Time

value := 0.0
if elapsed > 0 {
value = diff / (float64(elapsed) / float64(interval.Duration))
}

prev = *p

// Drop negative values for non-negative derivatives.
if isNonNegative && diff < 0 {
continue
}

output = append(output, FloatPoint{Time: p.Time, Value: value})
}
return output
}
}

// newDifferenceIterator returns an iterator for operating on a difference() call.
func newDifferenceIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
// Differences do not use GROUP BY intervals or time constraints, so clear these options.
opt.Interval = Interval{}
opt.StartTime, opt.EndTime = MinTime, MaxTime

switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(FloatDifferenceReduceSlice)
fn := NewFloatDifferenceReducer()
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
return newFloatStreamFloatIterator(input, createFn, opt), nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(IntegerDifferenceReduceSlice)
fn := NewIntegerDifferenceReducer()
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
return newIntegerStreamIntegerIterator(input, createFn, opt), nil
default:
return nil, fmt.Errorf("unsupported difference iterator type: %T", input)
}
}

// FloatDifferenceReduceSlice returns the difference values within a window.
func FloatDifferenceReduceSlice(a []FloatPoint) []FloatPoint {
if len(a) < 2 {
return []FloatPoint{}
}
prev := a[0]

output := make([]FloatPoint, 0, len(a)-1)
for i := 1; i < len(a); i++ {
p := &a[i]

// Calculate the difference of successive points.
value := p.Value - prev.Value
prev = *p

output = append(output, FloatPoint{Time: p.Time, Value: value})
}
return output
}

// IntegerDifferenceReduceSlice returns the difference values within a window.
func IntegerDifferenceReduceSlice(a []IntegerPoint) []IntegerPoint {
if len(a) < 2 {
return []IntegerPoint{}
}
prev := a[0]

output := make([]IntegerPoint, 0, len(a)-1)
for i := 1; i < len(a); i++ {
p := &a[i]

// Calculate the difference of successive points.
value := p.Value - prev.Value
prev = *p

output = append(output, IntegerPoint{Time: p.Time, Value: value})
}
return output
}

// newMovingAverageIterator returns an iterator for operating on a moving_average() call.
func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
Expand Down
162 changes: 162 additions & 0 deletions influxql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,168 @@ func (r *IntegerMeanReducer) Emit() []FloatPoint {
}}
}

// FloatDerivativeReducer calculates the derivative of the aggregated points.
type FloatDerivativeReducer struct {
interval Interval
prev FloatPoint
curr FloatPoint
isNonNegative bool
ascending bool
}

// NewFloatDerivativeReducer creates a new FloatDerivativeReducer.
func NewFloatDerivativeReducer(interval Interval, isNonNegative, ascending bool) *FloatDerivativeReducer {
return &FloatDerivativeReducer{
interval: interval,
isNonNegative: isNonNegative,
ascending: ascending,
prev: FloatPoint{Nil: true},
curr: FloatPoint{Nil: true},
}
}

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatDerivativeReducer) AggregateFloat(p *FloatPoint) {
r.prev = r.curr
r.curr = *p
}

// Emit emits the derivative of the reducer at the current point.
func (r *FloatDerivativeReducer) Emit() []FloatPoint {
if !r.prev.Nil {
// Calculate the derivative of successive points by dividing the
// difference of each value by the elapsed time normalized to the interval.
diff := r.curr.Value - r.prev.Value
elapsed := r.curr.Time - r.prev.Time
if !r.ascending {
elapsed = -elapsed
}

value := 0.0
if elapsed > 0 {
value = diff / (float64(elapsed) / float64(r.interval.Duration))
}

// Drop negative values for non-negative derivatives.
if r.isNonNegative && diff < 0 {
return nil
}
return []FloatPoint{{Time: r.curr.Time, Value: value}}
}
return nil
}

// IntegerDerivativeReducer calculates the derivative of the aggregated points.
type IntegerDerivativeReducer struct {
interval Interval
prev IntegerPoint
curr IntegerPoint
isNonNegative bool
ascending bool
}

// NewIntegerDerivativeReducer creates a new IntegerDerivativeReducer.
func NewIntegerDerivativeReducer(interval Interval, isNonNegative, ascending bool) *IntegerDerivativeReducer {
return &IntegerDerivativeReducer{
interval: interval,
isNonNegative: isNonNegative,
ascending: ascending,
prev: IntegerPoint{Nil: true},
curr: IntegerPoint{Nil: true},
}
}

// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerDerivativeReducer) AggregateInteger(p *IntegerPoint) {
r.prev = r.curr
r.curr = *p
}

// Emit emits the derivative of the reducer at the current point.
func (r *IntegerDerivativeReducer) Emit() []FloatPoint {
if !r.prev.Nil {
// Calculate the derivative of successive points by dividing the
// difference of each value by the elapsed time normalized to the interval.
diff := float64(r.curr.Value - r.prev.Value)
elapsed := r.curr.Time - r.prev.Time
if !r.ascending {
elapsed = -elapsed
}

value := 0.0
if elapsed > 0 {
value = diff / (float64(elapsed) / float64(r.interval.Duration))
}

// Drop negative values for non-negative derivatives.
if r.isNonNegative && diff < 0 {
return nil
}
return []FloatPoint{{Time: r.curr.Time, Value: value}}
}
return nil
}

// FloatDifferenceReducer calculates the derivative of the aggregated points.
type FloatDifferenceReducer struct {
prev FloatPoint
curr FloatPoint
}

// NewFloatDifferenceReducer creates a new FloatDifferenceReducer.
func NewFloatDifferenceReducer() *FloatDifferenceReducer {
return &FloatDifferenceReducer{
prev: FloatPoint{Nil: true},
curr: FloatPoint{Nil: true},
}
}

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatDifferenceReducer) AggregateFloat(p *FloatPoint) {
r.prev = r.curr
r.curr = *p
}

// Emit emits the difference of the reducer at the current point.
func (r *FloatDifferenceReducer) Emit() []FloatPoint {
if !r.prev.Nil {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value
return []FloatPoint{{Time: r.curr.Time, Value: value}}
}
return nil
}

// IntegerDifferenceReducer calculates the derivative of the aggregated points.
type IntegerDifferenceReducer struct {
prev IntegerPoint
curr IntegerPoint
}

// NewIntegerDifferenceReducer creates a new IntegerDifferenceReducer.
func NewIntegerDifferenceReducer() *IntegerDifferenceReducer {
return &IntegerDifferenceReducer{
prev: IntegerPoint{Nil: true},
curr: IntegerPoint{Nil: true},
}
}

// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerDifferenceReducer) AggregateInteger(p *IntegerPoint) {
r.prev = r.curr
r.curr = *p
}

// Emit emits the difference of the reducer at the current point.
func (r *IntegerDifferenceReducer) Emit() []IntegerPoint {
if !r.prev.Nil {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value
return []IntegerPoint{{Time: r.curr.Time, Value: value}}
}
return nil
}

// FloatMovingAverageReducer calculates the moving average of the aggregated points.
type FloatMovingAverageReducer struct {
pos int
Expand Down
Loading

0 comments on commit ddeedc7

Please sign in to comment.