From 6708d0c439479a1b0faf0503719f19a0047f60cf Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 7 Apr 2016 17:37:48 -0400 Subject: [PATCH 1/2] Optimize the distinct call Change distinct so it uses a custom reducer that keeps internal state instead of requiring all of the points to be kept as a slice in memory. Fixes #6261. --- influxql/call_iterator.go | 76 ++--------------------- influxql/functions.gen.go | 110 +++++++++++++++++++++++++++++++++ influxql/functions.gen.go.tmpl | 33 +++++++++- 3 files changed, 146 insertions(+), 73 deletions(-) diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index 2eeb20dda70..efa9e785125 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -363,25 +363,25 @@ func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) switch input := input.(type) { case FloatIterator: createFn := func() (FloatPointAggregator, FloatPointEmitter) { - fn := NewFloatSliceFuncReducer(FloatDistinctReduceSlice) + fn := NewFloatDistinctReducer() return fn, fn } return &floatReduceFloatIterator{input: newBufFloatIterator(input), opt: opt, create: createFn}, nil case IntegerIterator: createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { - fn := NewIntegerSliceFuncReducer(IntegerDistinctReduceSlice) + fn := NewIntegerDistinctReducer() return fn, fn } return &integerReduceIntegerIterator{input: newBufIntegerIterator(input), opt: opt, create: createFn}, nil case StringIterator: createFn := func() (StringPointAggregator, StringPointEmitter) { - fn := NewStringSliceFuncReducer(StringDistinctReduceSlice) + fn := NewStringDistinctReducer() return fn, fn } return &stringReduceStringIterator{input: newBufStringIterator(input), opt: opt, create: createFn}, nil case BooleanIterator: createFn := func() (BooleanPointAggregator, BooleanPointEmitter) { - fn := NewBooleanSliceFuncReducer(BooleanDistinctReduceSlice) + fn := NewBooleanDistinctReducer() return fn, fn } return &booleanReduceBooleanIterator{input: newBufBooleanIterator(input), opt: opt, create: createFn}, nil @@ -390,74 +390,6 @@ func NewDistinctIterator(input Iterator, opt IteratorOptions) (Iterator, error) } } -// FloatDistinctReduceSlice returns the distinct value within a window. -func FloatDistinctReduceSlice(a []FloatPoint) []FloatPoint { - m := make(map[float64]FloatPoint) - for _, p := range a { - if _, ok := m[p.Value]; !ok { - m[p.Value] = p - } - } - - points := make([]FloatPoint, 0, len(m)) - for _, p := range m { - points = append(points, FloatPoint{Time: p.Time, Value: p.Value}) - } - sort.Sort(floatPoints(points)) - return points -} - -// IntegerDistinctReduceSlice returns the distinct value within a window. -func IntegerDistinctReduceSlice(a []IntegerPoint) []IntegerPoint { - m := make(map[int64]IntegerPoint) - for _, p := range a { - if _, ok := m[p.Value]; !ok { - m[p.Value] = p - } - } - - points := make([]IntegerPoint, 0, len(m)) - for _, p := range m { - points = append(points, IntegerPoint{Time: p.Time, Value: p.Value}) - } - sort.Sort(integerPoints(points)) - return points -} - -// StringDistinctReduceSlice returns the distinct value within a window. -func StringDistinctReduceSlice(a []StringPoint) []StringPoint { - m := make(map[string]StringPoint) - for _, p := range a { - if _, ok := m[p.Value]; !ok { - m[p.Value] = p - } - } - - points := make([]StringPoint, 0, len(m)) - for _, p := range m { - points = append(points, StringPoint{Time: p.Time, Value: p.Value}) - } - sort.Sort(stringPoints(points)) - return points -} - -// BooleanDistinctReduceSlice returns the distinct value within a window. -func BooleanDistinctReduceSlice(a []BooleanPoint) []BooleanPoint { - m := make(map[bool]BooleanPoint) - for _, p := range a { - if _, ok := m[p.Value]; !ok { - m[p.Value] = p - } - } - - points := make([]BooleanPoint, 0, len(m)) - for _, p := range m { - points = append(points, BooleanPoint{Time: p.Time, Value: p.Value}) - } - sort.Sort(booleanPoints(points)) - return points -} - // newMeanIterator returns an iterator for operating on a mean() call. func newMeanIterator(input Iterator, opt IteratorOptions) (Iterator, error) { switch input := input.(type) { diff --git a/influxql/functions.gen.go b/influxql/functions.gen.go index 717efc84f91..1031ff28f1c 100644 --- a/influxql/functions.gen.go +++ b/influxql/functions.gen.go @@ -6,6 +6,8 @@ package influxql +import "sort" + // FloatPointAggregator aggregates points to produce a single point. type FloatPointAggregator interface { AggregateFloat(p *FloatPoint) @@ -255,6 +257,33 @@ func (r *FloatSliceFuncBooleanReducer) Emit() []BooleanPoint { return r.fn(r.points) } +// FloatDistinctReducer returns the distinct points in a series. +type FloatDistinctReducer struct { + m map[float64]FloatPoint +} + +// NewFloatDistinctReducer creates a new FloatDistinctReducer. +func NewFloatDistinctReducer() *FloatDistinctReducer { + return &FloatDistinctReducer{m: make(map[float64]FloatPoint)} +} + +// AggregateFloat aggregates a point into the reducer. +func (r *FloatDistinctReducer) AggregateFloat(p *FloatPoint) { + if _, ok := r.m[p.Value]; !ok { + r.m[p.Value] = *p + } +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *FloatDistinctReducer) Emit() []FloatPoint { + points := make([]FloatPoint, 0, len(r.m)) + for _, p := range r.m { + points = append(points, FloatPoint{Time: p.Time, Value: p.Value}) + } + sort.Sort(floatPoints(points)) + return points +} + // IntegerPointAggregator aggregates points to produce a single point. type IntegerPointAggregator interface { AggregateInteger(p *IntegerPoint) @@ -504,6 +533,33 @@ func (r *IntegerSliceFuncBooleanReducer) Emit() []BooleanPoint { return r.fn(r.points) } +// IntegerDistinctReducer returns the distinct points in a series. +type IntegerDistinctReducer struct { + m map[int64]IntegerPoint +} + +// NewIntegerDistinctReducer creates a new IntegerDistinctReducer. +func NewIntegerDistinctReducer() *IntegerDistinctReducer { + return &IntegerDistinctReducer{m: make(map[int64]IntegerPoint)} +} + +// AggregateInteger aggregates a point into the reducer. +func (r *IntegerDistinctReducer) AggregateInteger(p *IntegerPoint) { + if _, ok := r.m[p.Value]; !ok { + r.m[p.Value] = *p + } +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *IntegerDistinctReducer) Emit() []IntegerPoint { + points := make([]IntegerPoint, 0, len(r.m)) + for _, p := range r.m { + points = append(points, IntegerPoint{Time: p.Time, Value: p.Value}) + } + sort.Sort(integerPoints(points)) + return points +} + // StringPointAggregator aggregates points to produce a single point. type StringPointAggregator interface { AggregateString(p *StringPoint) @@ -753,6 +809,33 @@ func (r *StringSliceFuncBooleanReducer) Emit() []BooleanPoint { return r.fn(r.points) } +// StringDistinctReducer returns the distinct points in a series. +type StringDistinctReducer struct { + m map[string]StringPoint +} + +// NewStringDistinctReducer creates a new StringDistinctReducer. +func NewStringDistinctReducer() *StringDistinctReducer { + return &StringDistinctReducer{m: make(map[string]StringPoint)} +} + +// AggregateString aggregates a point into the reducer. +func (r *StringDistinctReducer) AggregateString(p *StringPoint) { + if _, ok := r.m[p.Value]; !ok { + r.m[p.Value] = *p + } +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *StringDistinctReducer) Emit() []StringPoint { + points := make([]StringPoint, 0, len(r.m)) + for _, p := range r.m { + points = append(points, StringPoint{Time: p.Time, Value: p.Value}) + } + sort.Sort(stringPoints(points)) + return points +} + // BooleanPointAggregator aggregates points to produce a single point. type BooleanPointAggregator interface { AggregateBoolean(p *BooleanPoint) @@ -1001,3 +1084,30 @@ func (r *BooleanSliceFuncReducer) AggregateBooleanBulk(points []BooleanPoint) { func (r *BooleanSliceFuncReducer) Emit() []BooleanPoint { return r.fn(r.points) } + +// BooleanDistinctReducer returns the distinct points in a series. +type BooleanDistinctReducer struct { + m map[bool]BooleanPoint +} + +// NewBooleanDistinctReducer creates a new BooleanDistinctReducer. +func NewBooleanDistinctReducer() *BooleanDistinctReducer { + return &BooleanDistinctReducer{m: make(map[bool]BooleanPoint)} +} + +// AggregateBoolean aggregates a point into the reducer. +func (r *BooleanDistinctReducer) AggregateBoolean(p *BooleanPoint) { + if _, ok := r.m[p.Value]; !ok { + r.m[p.Value] = *p + } +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *BooleanDistinctReducer) Emit() []BooleanPoint { + points := make([]BooleanPoint, 0, len(r.m)) + for _, p := range r.m { + points = append(points, BooleanPoint{Time: p.Time, Value: p.Value}) + } + sort.Sort(booleanPoints(points)) + return points +} diff --git a/influxql/functions.gen.go.tmpl b/influxql/functions.gen.go.tmpl index d8f0f13ed20..47a64fc6da9 100644 --- a/influxql/functions.gen.go.tmpl +++ b/influxql/functions.gen.go.tmpl @@ -1,5 +1,7 @@ package influxql +import "sort" + {{with $types := .}}{{range $k := $types}} // {{$k.Name}}PointAggregator aggregates points to produce a single point. @@ -87,4 +89,33 @@ func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) func (r *{{$k.Name}}SliceFunc{{if ne $k.Name $v.Name}}{{$v.Name}}{{end}}Reducer) Emit() []{{$v.Name}}Point { return r.fn(r.points) } -{{end}}{{end}}{{end}} +{{end}} + +// {{$k.Name}}DistinctReducer returns the distinct points in a series. +type {{$k.Name}}DistinctReducer struct { + m map[{{$k.Type}}]{{$k.Name}}Point +} + +// New{{$k.Name}}DistinctReducer creates a new {{$k.Name}}DistinctReducer. +func New{{$k.Name}}DistinctReducer() *{{$k.Name}}DistinctReducer { + return &{{$k.Name}}DistinctReducer{m: make(map[{{$k.Type}}]{{$k.Name}}Point)} +} + +// Aggregate{{$k.Name}} aggregates a point into the reducer. +func (r *{{$k.Name}}DistinctReducer) Aggregate{{$k.Name}}(p *{{$k.Name}}Point) { + if _, ok := r.m[p.Value]; !ok { + r.m[p.Value] = *p + } +} + +// Emit emits the distinct points that have been aggregated into the reducer. +func (r *{{$k.Name}}DistinctReducer) Emit() []{{$k.Name}}Point { + points := make([]{{$k.Name}}Point, 0, len(r.m)) + for _, p := range r.m { + points = append(points, {{$k.Name}}Point{Time: p.Time, Value: p.Value}) + } + sort.Sort({{$k.name}}Points(points)) + return points +} + +{{end}}{{end}} From f7f35affd297bb2defad6739c3d8ec1882c7e5c0 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 12 Apr 2016 13:22:03 -0600 Subject: [PATCH 2/2] add distinct iterator benchmark --- influxql/call_iterator_test.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/influxql/call_iterator_test.go b/influxql/call_iterator_test.go index a31d66f60af..5d5117cdac4 100644 --- a/influxql/call_iterator_test.go +++ b/influxql/call_iterator_test.go @@ -604,6 +604,33 @@ func benchmarkCallIterator(b *testing.B, opt influxql.IteratorOptions, pointN in } } +func BenchmarkDistinctIterator_1K(b *testing.B) { benchmarkDistinctIterator(b, 1000) } +func BenchmarkDistinctIterator_100K(b *testing.B) { benchmarkDistinctIterator(b, 100000) } +func BenchmarkDistinctIterator_1M(b *testing.B) { benchmarkDistinctIterator(b, 1000000) } + +func benchmarkDistinctIterator(b *testing.B, pointN int) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + // Create a lightweight point generator. + p := influxql.FloatPoint{Name: "cpu"} + input := FloatPointGenerator{ + N: pointN, + Fn: func(i int) *influxql.FloatPoint { + p.Value = float64(i % 10) + return &p + }, + } + + // Execute call against input. + itr, err := influxql.NewDistinctIterator(&input, influxql.IteratorOptions{}) + if err != nil { + b.Fatal(err) + } + influxql.DrainIterator(itr) + } +} + type FloatPointGenerator struct { i int N int