Skip to content

Commit

Permalink
Merge pull request #6291 from influxdata/js-6261-optimize-distinct
Browse files Browse the repository at this point in the history
Optimize the distinct call
  • Loading branch information
jsternberg committed Apr 12, 2016
2 parents 93a76ee + f7f35af commit 50bd784
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 73 deletions.
76 changes: 4 additions & 72 deletions influxql/call_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,25 +363,25 @@ func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error)
switch input := input.(type) {
case FloatIterator:
createFn := func() (FloatPointAggregator, FloatPointEmitter) {
fn := NewFloatSliceFuncReducer(FloatDistinctReduceSlice)
fn := NewFloatDistinctReducer()
return fn, fn
}
return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil
case IntegerIterator:
createFn := func() (IntegerPointAggregator, IntegerPointEmitter) {
fn := NewIntegerSliceFuncReducer(IntegerDistinctReduceSlice)
fn := NewIntegerDistinctReducer()
return fn, fn
}
return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil
case StringIterator:
createFn := func() (StringPointAggregator, StringPointEmitter) {
fn := NewStringSliceFuncReducer(StringDistinctReduceSlice)
fn := NewStringDistinctReducer()
return fn, fn
}
return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil
case BooleanIterator:
createFn := func() (BooleanPointAggregator, BooleanPointEmitter) {
fn := NewBooleanSliceFuncReducer(BooleanDistinctReduceSlice)
fn := NewBooleanDistinctReducer()
return fn, fn
}
return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil
Expand All @@ -390,74 +390,6 @@ func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error)
}
}

// FloatDistinctReduceSlice reduces a to one point per distinct value.
// For every value the first point carrying it (in slice order) is kept; the
// emitted points carry only that point's Time and Value. The result is
// ordered by sort.Sort over floatPoints.
func FloatDistinctReduceSlice(a []FloatPoint) []FloatPoint {
	firstSeen := make(map[float64]FloatPoint)
	for i := range a {
		if _, dup := firstSeen[a[i].Value]; dup {
			continue
		}
		firstSeen[a[i].Value] = a[i]
	}

	// Map iteration order is random, so the slice is sorted before returning.
	out := make([]FloatPoint, len(firstSeen))
	idx := 0
	for _, pt := range firstSeen {
		out[idx] = FloatPoint{Time: pt.Time, Value: pt.Value}
		idx++
	}
	sort.Sort(floatPoints(out))
	return out
}

// IntegerDistinctReduceSlice reduces a to one point per distinct value.
// For every value the first point carrying it (in slice order) is kept; the
// emitted points carry only that point's Time and Value. The result is
// ordered by sort.Sort over integerPoints.
func IntegerDistinctReduceSlice(a []IntegerPoint) []IntegerPoint {
	firstSeen := make(map[int64]IntegerPoint)
	for i := range a {
		if _, dup := firstSeen[a[i].Value]; dup {
			continue
		}
		firstSeen[a[i].Value] = a[i]
	}

	// Map iteration order is random, so the slice is sorted before returning.
	out := make([]IntegerPoint, len(firstSeen))
	idx := 0
	for _, pt := range firstSeen {
		out[idx] = IntegerPoint{Time: pt.Time, Value: pt.Value}
		idx++
	}
	sort.Sort(integerPoints(out))
	return out
}

// StringDistinctReduceSlice reduces a to one point per distinct value.
// For every value the first point carrying it (in slice order) is kept; the
// emitted points carry only that point's Time and Value. The result is
// ordered by sort.Sort over stringPoints.
func StringDistinctReduceSlice(a []StringPoint) []StringPoint {
	firstSeen := make(map[string]StringPoint)
	for i := range a {
		if _, dup := firstSeen[a[i].Value]; dup {
			continue
		}
		firstSeen[a[i].Value] = a[i]
	}

	// Map iteration order is random, so the slice is sorted before returning.
	out := make([]StringPoint, len(firstSeen))
	idx := 0
	for _, pt := range firstSeen {
		out[idx] = StringPoint{Time: pt.Time, Value: pt.Value}
		idx++
	}
	sort.Sort(stringPoints(out))
	return out
}

// BooleanDistinctReduceSlice reduces a to one point per distinct value.
// For every value the first point carrying it (in slice order) is kept; the
// emitted points carry only that point's Time and Value. The result is
// ordered by sort.Sort over booleanPoints.
func BooleanDistinctReduceSlice(a []BooleanPoint) []BooleanPoint {
	firstSeen := make(map[bool]BooleanPoint)
	for i := range a {
		if _, dup := firstSeen[a[i].Value]; dup {
			continue
		}
		firstSeen[a[i].Value] = a[i]
	}

	// Map iteration order is random, so the slice is sorted before returning.
	out := make([]BooleanPoint, len(firstSeen))
	idx := 0
	for _, pt := range firstSeen {
		out[idx] = BooleanPoint{Time: pt.Time, Value: pt.Value}
		idx++
	}
	sort.Sort(booleanPoints(out))
	return out
}

// newMeanIterator returns an iterator for operating on a mean() call.
func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) {
switch input := input.(type) {
Expand Down
27 changes: 27 additions & 0 deletions influxql/call_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,33 @@ func benchmarkCallIterator(b *testing.B, opt influxql.IteratorOptions, pointN in
}
}

// BenchmarkDistinctIterator_1K runs the distinct iterator benchmark with 1,000 input points.
func BenchmarkDistinctIterator_1K(b *testing.B) {
	benchmarkDistinctIterator(b, 1000)
}

// BenchmarkDistinctIterator_100K runs the distinct iterator benchmark with 100,000 input points.
func BenchmarkDistinctIterator_100K(b *testing.B) {
	benchmarkDistinctIterator(b, 100000)
}

// BenchmarkDistinctIterator_1M runs the distinct iterator benchmark with 1,000,000 input points.
func BenchmarkDistinctIterator_1M(b *testing.B) {
	benchmarkDistinctIterator(b, 1000000)
}

// benchmarkDistinctIterator builds a distinct iterator over pointN generated
// float points and drains it, once per benchmark iteration. The generated
// values cycle through i % 10, and allocations are reported.
func benchmarkDistinctIterator(b *testing.B, pointN int) {
	b.ReportAllocs()

	for n := 0; n < b.N; n++ {
		// One point is reused for every generated value to keep the generator lightweight.
		pt := influxql.FloatPoint{Name: "cpu"}
		gen := FloatPointGenerator{N: pointN}
		gen.Fn = func(idx int) *influxql.FloatPoint {
			pt.Value = float64(idx % 10)
			return &pt
		}

		// Run the distinct call against the generated input.
		itr, err := influxql.NewDistinctIterator(&gen, influxql.IteratorOptions{})
		if err != nil {
			b.Fatal(err)
		}
		influxql.DrainIterator(itr)
	}
}

type FloatPointGenerator struct {
i int
N int
Expand Down
110 changes: 110 additions & 0 deletions influxql/functions.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package influxql

import "sort"

// FloatPointAggregator aggregates points to produce a single point.
type FloatPointAggregator interface {
AggregateFloat(p *FloatPoint)
Expand Down Expand Up @@ -255,6 +257,33 @@ func (r *FloatSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}

// FloatDistinctReducer collects the first point aggregated for each distinct
// float value and emits those points as a sorted slice.
type FloatDistinctReducer struct {
	m map[float64]FloatPoint // first point seen, keyed by its value
}

// NewFloatDistinctReducer returns a FloatDistinctReducer with an empty, ready-to-use map.
func NewFloatDistinctReducer() *FloatDistinctReducer {
	r := new(FloatDistinctReducer)
	r.m = map[float64]FloatPoint{}
	return r
}

// AggregateFloat stores a copy of p unless a point with the same value has
// already been stored; later duplicates are ignored.
func (r *FloatDistinctReducer) AggregateFloat(p *FloatPoint) {
	if _, seen := r.m[p.Value]; seen {
		return
	}
	r.m[p.Value] = *p
}

// Emit returns one point per distinct value aggregated so far. Each emitted
// point carries only the Time and Value of the stored point, and the slice
// is ordered by sort.Sort over floatPoints.
func (r *FloatDistinctReducer) Emit() []FloatPoint {
	out := make([]FloatPoint, len(r.m))
	i := 0
	for _, pt := range r.m {
		out[i] = FloatPoint{Time: pt.Time, Value: pt.Value}
		i++
	}
	// Map iteration order is random; sorting fixes the output order.
	sort.Sort(floatPoints(out))
	return out
}

// IntegerPointAggregator aggregates points to produce a single point.
type IntegerPointAggregator interface {
AggregateInteger(p *IntegerPoint)
Expand Down Expand Up @@ -504,6 +533,33 @@ func (r *IntegerSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}

// IntegerDistinctReducer collects the first point aggregated for each
// distinct integer value and emits those points as a sorted slice.
type IntegerDistinctReducer struct {
	m map[int64]IntegerPoint // first point seen, keyed by its value
}

// NewIntegerDistinctReducer returns an IntegerDistinctReducer with an empty, ready-to-use map.
func NewIntegerDistinctReducer() *IntegerDistinctReducer {
	r := new(IntegerDistinctReducer)
	r.m = map[int64]IntegerPoint{}
	return r
}

// AggregateInteger stores a copy of p unless a point with the same value has
// already been stored; later duplicates are ignored.
func (r *IntegerDistinctReducer) AggregateInteger(p *IntegerPoint) {
	if _, seen := r.m[p.Value]; seen {
		return
	}
	r.m[p.Value] = *p
}

// Emit returns one point per distinct value aggregated so far. Each emitted
// point carries only the Time and Value of the stored point, and the slice
// is ordered by sort.Sort over integerPoints.
func (r *IntegerDistinctReducer) Emit() []IntegerPoint {
	out := make([]IntegerPoint, len(r.m))
	i := 0
	for _, pt := range r.m {
		out[i] = IntegerPoint{Time: pt.Time, Value: pt.Value}
		i++
	}
	// Map iteration order is random; sorting fixes the output order.
	sort.Sort(integerPoints(out))
	return out
}

// StringPointAggregator aggregates points to produce a single point.
type StringPointAggregator interface {
AggregateString(p *StringPoint)
Expand Down Expand Up @@ -753,6 +809,33 @@ func (r *StringSliceFuncBooleanReducer) Emit() []BooleanPoint {
return r.fn(r.points)
}

// StringDistinctReducer collects the first point aggregated for each
// distinct string value and emits those points as a sorted slice.
type StringDistinctReducer struct {
	m map[string]StringPoint // first point seen, keyed by its value
}

// NewStringDistinctReducer returns a StringDistinctReducer with an empty, ready-to-use map.
func NewStringDistinctReducer() *StringDistinctReducer {
	r := new(StringDistinctReducer)
	r.m = map[string]StringPoint{}
	return r
}

// AggregateString stores a copy of p unless a point with the same value has
// already been stored; later duplicates are ignored.
func (r *StringDistinctReducer) AggregateString(p *StringPoint) {
	if _, seen := r.m[p.Value]; seen {
		return
	}
	r.m[p.Value] = *p
}

// Emit returns one point per distinct value aggregated so far. Each emitted
// point carries only the Time and Value of the stored point, and the slice
// is ordered by sort.Sort over stringPoints.
func (r *StringDistinctReducer) Emit() []StringPoint {
	out := make([]StringPoint, len(r.m))
	i := 0
	for _, pt := range r.m {
		out[i] = StringPoint{Time: pt.Time, Value: pt.Value}
		i++
	}
	// Map iteration order is random; sorting fixes the output order.
	sort.Sort(stringPoints(out))
	return out
}

// BooleanPointAggregator aggregates points to produce a single point.
type BooleanPointAggregator interface {
AggregateBoolean(p *BooleanPoint)
Expand Down Expand Up @@ -1001,3 +1084,30 @@ func (r *BooleanSliceFuncReducer) AggregateBooleanBulk(points []BooleanPoint) {
func (r *BooleanSliceFuncReducer) Emit() []BooleanPoint {
	// Pass the reducer's points to its reduce function and return the result as-is.
	reduce, pts := r.fn, r.points
	return reduce(pts)
}

// BooleanDistinctReducer collects the first point aggregated for each
// distinct boolean value and emits those points as a sorted slice.
type BooleanDistinctReducer struct {
	m map[bool]BooleanPoint // first point seen, keyed by its value
}

// NewBooleanDistinctReducer returns a BooleanDistinctReducer with an empty, ready-to-use map.
func NewBooleanDistinctReducer() *BooleanDistinctReducer {
	r := new(BooleanDistinctReducer)
	r.m = map[bool]BooleanPoint{}
	return r
}

// AggregateBoolean stores a copy of p unless a point with the same value has
// already been stored; later duplicates are ignored.
func (r *BooleanDistinctReducer) AggregateBoolean(p *BooleanPoint) {
	if _, seen := r.m[p.Value]; seen {
		return
	}
	r.m[p.Value] = *p
}

// Emit returns one point per distinct value aggregated so far. Each emitted
// point carries only the Time and Value of the stored point, and the slice
// is ordered by sort.Sort over booleanPoints.
func (r *BooleanDistinctReducer) Emit() []BooleanPoint {
	out := make([]BooleanPoint, len(r.m))
	i := 0
	for _, pt := range r.m {
		out[i] = BooleanPoint{Time: pt.Time, Value: pt.Value}
		i++
	}
	// Map iteration order is random; sorting fixes the output order.
	sort.Sort(booleanPoints(out))
	return out
}
33 changes: 32 additions & 1 deletion influxql/functions.gen.go.tmpl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package influxql

import "sort"

{{with $types := .}}{{range $k := $types}}

// {{$k.Name}}PointAggregator aggregates points to produce a single point.
Expand Down Expand Up @@ -87,4 +89,33 @@ func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer)
func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Emit() []{{$v.Name}}Point {
// Call the reducer's fn on its points and return the result unchanged.
return r.fn(r.points)
}
{{end}}{{end}}{{end}}
{{end}}

// {{$k.Name}}DistinctReducer returns the distinct points in a series.
// It keeps the first point aggregated for each distinct value.
type {{$k.Name}}DistinctReducer struct {
m map[{{$k.Type}}]{{$k.Name}}Point // first point seen, keyed by its value
}

// New{{$k.Name}}DistinctReducer creates a new {{$k.Name}}DistinctReducer
// with an initialized, empty map.
func New{{$k.Name}}DistinctReducer() *{{$k.Name}}DistinctReducer {
return &{{$k.Name}}DistinctReducer{m: make(map[{{$k.Type}}]{{$k.Name}}Point)}
}

// Aggregate{{$k.Name}} aggregates a point into the reducer.
// A point whose value has already been stored is ignored.
func (r *{{$k.Name}}DistinctReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) {
if _, ok := r.m[p.Value]; !ok {
r.m[p.Value] = *p
}
}

// Emit emits the distinct points that have been aggregated into the reducer.
// Each emitted point carries only the Time and Value of the stored point.
func (r *{{$k.Name}}DistinctReducer) Emit() []{{$k.Name}}Point {
points := make([]{{$k.Name}}Point, 0, len(r.m))
for _, p := range r.m {
points = append(points, {{$k.Name}}Point{Time: p.Time, Value: p.Value})
}
// Map iteration order is random, so sort before returning.
sort.Sort({{$k.name}}Points(points))
return points
}

{{end}}{{end}}

0 comments on commit 50bd784

Please sign in to comment.