Skip to content

Commit

Permalink
feat(stdlib): create a version of map that is columnar
Browse files Browse the repository at this point in the history
The columnar version of map will run map over an entire table chunk
rather than running it and then appending per row. This version of map
should be more efficient in the normal cases and be more amenable to
vectorized functions.

Related to #4186.
  • Loading branch information
jsternberg committed Dec 8, 2021
1 parent ed2e64a commit a99745c
Show file tree
Hide file tree
Showing 9 changed files with 520 additions and 0 deletions.
3 changes: 3 additions & 0 deletions array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ func (a *String) ValueLen(i int) int {
}
return len(a.value)
}
func (a *String) IsConstant() bool {
return a.data == nil
}

type sliceable interface {
Slice(i, j int) Interface
Expand Down
14 changes: 14 additions & 0 deletions internal/feature/flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions internal/feature/flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@
key: queryConcurrencyLimit
default: 0
contact: Jonathan Sternberg

- name: Vectorized Map
description: Enables the version of map that supports vectorized functions
key: vectorizedMap
default: false
contact: Jonathan Sternberg
6 changes: 6 additions & 0 deletions stdlib/universe/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/influxdata/flux/compiler"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/internal/feature"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/runtime"
Expand Down Expand Up @@ -101,6 +102,11 @@ func createMapTransformation(id execute.DatasetID, mode execute.AccumulationMode
if !ok {
return nil, nil, errors.Newf(codes.Internal, "invalid spec type %T", spec)
}

if feature.VectorizedMap().Enabled(a.Context()) {
return newMapTransformation2(a.Context(), id, s, a.Allocator())
}

cache := execute.NewTableBuilderCache(a.Allocator())
d := execute.NewDataset(id, mode, cache)
t, err := NewMapTransformation(a.Context(), s, d, cache)
Expand Down
129 changes: 129 additions & 0 deletions stdlib/universe/map2.gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: map2.gen.go.tmpl

package universe

import (
"fmt"

"github.com/influxdata/flux/array"
)

func (m *mapTransformation2) isConstant(arr array.Interface) bool {
switch arr := arr.(type) {
case *array.Int:
return m.isIntConstant(arr)
case *array.Uint:
return m.isUintConstant(arr)
case *array.Float:
return m.isFloatConstant(arr)
case *array.String:
return m.isStringConstant(arr)
case *array.Boolean:
return m.isBooleanConstant(arr)

default:
panic(fmt.Errorf("unsupported array datat ype: %s", arr.DataType()))
}
}

func (m *mapTransformation2) isIntConstant(arr *array.Int) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true

}

func (m *mapTransformation2) isUintConstant(arr *array.Uint) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true

}

func (m *mapTransformation2) isFloatConstant(arr *array.Float) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true

}

func (m *mapTransformation2) isStringConstant(arr *array.String) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

return arr.IsConstant()

}

func (m *mapTransformation2) isBooleanConstant(arr *array.Boolean) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true

}
43 changes: 43 additions & 0 deletions stdlib/universe/map2.gen.go.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package universe

import (
"fmt"

"github.com/influxdata/flux/array"
)

func (m *mapTransformation2) isConstant(arr array.Interface) bool {
switch arr := arr.(type) {
{{range .}}case *array.{{.Name}}:
return m.is{{.Name}}Constant(arr)
{{end}}
default:
panic(fmt.Errorf("unsupported array datat ype: %s", arr.DataType()))
}
}

{{range .}}
func (m *mapTransformation2) is{{.Name}}Constant(arr *array.{{.Name}}) bool {
// If all values are null, then that is still constant.
if arr.NullN() == arr.Len() {
return true
} else if arr.NullN() > 0 {
// At least one value is null, but not all so
// not constant by definition.
return false
}

{{if eq .Name "String"}}
return arr.IsConstant()
{{else}}
// All values are non-null so check if they are all the same.
v := arr.Value(0)
for i, n := 1, arr.Len(); i < n; i++ {
if arr.Value(i) != v {
return false
}
}
return true
{{end}}
}
{{end}}
Loading

0 comments on commit a99745c

Please sign in to comment.