Skip to content

Commit

Permalink
feat(stdlib): add an optimized version of the shift transformation (#…
Browse files Browse the repository at this point in the history
…4337)

This optimizes shift to use arrow tables and the narrow transformation
transport.
  • Loading branch information
jsternberg committed Dec 13, 2021
1 parent d2a9887 commit 348928b
Show file tree
Hide file tree
Showing 12 changed files with 709 additions and 0 deletions.
3 changes: 3 additions & 0 deletions array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ func (a *String) ValueLen(i int) int {
}
return len(a.value)
}
func (a *String) IsConstant() bool {
return a.data == nil
}

type sliceable interface {
Slice(i, j int) Interface
Expand Down
1 change: 1 addition & 0 deletions execute/executetest/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var testFlags = map[string]interface{}{
"narrowTransformationFilter": true,
"aggregateTransformationTransport": true,
"groupTransformationGroup": true,
"optimizeShiftTransformation": true,
}

type testFlagger struct{}
Expand Down
28 changes: 28 additions & 0 deletions internal/feature/flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions internal/feature/flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,15 @@
key: queryConcurrencyLimit
default: 0
contact: Jonathan Sternberg

- name: Optimize Shift Transformation
description: Enable optimized shift transformation
key: optimizeShiftTransformation
default: false
contact: Jonathan Sternberg

- name: Vectorized Map
description: Enables the version of map that supports vectorized functions
key: vectorizedMap
default: false
contact: Jonathan Sternberg
6 changes: 6 additions & 0 deletions stdlib/universe/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/influxdata/flux/compiler"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/internal/feature"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/runtime"
Expand Down Expand Up @@ -101,6 +102,11 @@ func createMapTransformation(id execute.DatasetID, mode execute.AccumulationMode
if !ok {
return nil, nil, errors.Newf(codes.Internal, "invalid spec type %T", spec)
}

if feature.VectorizedMap().Enabled(a.Context()) {
return newMapTransformation2(a.Context(), id, s, a.Allocator())
}

cache := execute.NewTableBuilderCache(a.Allocator())
d := execute.NewDataset(id, mode, cache)
t, err := NewMapTransformation(a.Context(), s, d, cache)
Expand Down
129 changes: 129 additions & 0 deletions stdlib/universe/map2.gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: map2.gen.go.tmpl

package universe

import (
"fmt"

"github.com/influxdata/flux/array"
)

func (m *mapTransformation2) isConstant(arr array.Interface) bool {
switch arr := arr.(type) {
case *array.Int:
return m.isIntConstant(arr)
case *array.Uint:
return m.isUintConstant(arr)
case *array.Float:
return m.isFloatConstant(arr)
case *array.String:
return m.isStringConstant(arr)
case *array.Boolean:
return m.isBooleanConstant(arr)

default:
panic(fmt.Errorf("unsupported array datat ype: %s", arr.DataType()))
}
}

func (m *mapTransformation2) isIntConstant(arr *array.Int) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true

}

func (m *mapTransformation2) isUintConstant(arr *array.Uint) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true

}

func (m *mapTransformation2) isFloatConstant(arr *array.Float) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true

}

func (m *mapTransformation2) isStringConstant(arr *array.String) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

return arr.IsConstant()

}

func (m *mapTransformation2) isBooleanConstant(arr *array.Boolean) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true

}
43 changes: 43 additions & 0 deletions stdlib/universe/map2.gen.go.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package universe

import (
"fmt"

"github.com/influxdata/flux/array"
)

func (m *mapTransformation2) isConstant(arr array.Interface) bool {
switch arr := arr.(type) {
{{range .}}case *array.{{.Name}}:
return m.is{{.Name}}Constant(arr)
{{end}}
default:
panic(fmt.Errorf("unsupported array datat ype: %s", arr.DataType()))
}
}

{{range .}}
func (m *mapTransformation2) is{{.Name}}Constant(arr *array.{{.Name}}) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

{{if eq .Name "String"}}
return arr.IsConstant()
{{else}}
// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true
{{end}}
}
{{end}}
Loading

0 comments on commit 348928b

Please sign in to comment.