Skip to content

Commit

Permalink
feat(stdlib): create a version of map that is columnar and supports v…
Browse files Browse the repository at this point in the history
…ectorization

The columnar version of map will run map over an entire table chunk
rather than running it and then appending per row. This version of map
should be more efficient in the normal cases. In addition, it also
supports running vectorized functions when vectorization is available.
  • Loading branch information
jsternberg committed Feb 8, 2022
1 parent 24fe9cc commit 3d455fd
Show file tree
Hide file tree
Showing 16 changed files with 1,622 additions and 619 deletions.
3 changes: 3 additions & 0 deletions array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ func (a *String) ValueLen(i int) int {
}
return len(a.value)
}
func (a *String) IsConstant() bool {
return a.data == nil
}

type sliceable interface {
Slice(i, j int) Interface
Expand Down
1 change: 1 addition & 0 deletions execute/executetest/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var testFlags = map[string]interface{}{
"aggregateTransformationTransport": true,
"groupTransformationGroup": true,
"optimizeUnionTransformation": true,
"vectorizedMap": true,
}

type TestFlagger map[string]interface{}
Expand Down
30 changes: 19 additions & 11 deletions execute/row_fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ type compiledFn struct {
recordType semantic.MonoType
cols []flux.ColMeta
extraTypes map[string]semantic.MonoType
vectorized bool
}

func (f *compiledFn) isCacheHit(cols []flux.ColMeta, extraTypes map[string]semantic.MonoType) bool {
func (f *compiledFn) isCacheHit(cols []flux.ColMeta, extraTypes map[string]semantic.MonoType, vectorized bool) bool {
if f.vectorized != vectorized {
return false
}
if len(f.cols) != len(cols) {
return false
}
Expand Down Expand Up @@ -57,13 +61,16 @@ func newDynamicFn(fn *semantic.FunctionExpression, scope compiler.Scope) dynamic
}

// typeof returns an object monotype that matches the current column data.
func (f *dynamicFn) typeof(cols []flux.ColMeta) (semantic.MonoType, error) {
func (f *dynamicFn) typeof(cols []flux.ColMeta, vectorized bool) (semantic.MonoType, error) {
properties := make([]semantic.PropertyType, len(cols))
for i, c := range cols {
vtype := flux.SemanticType(c.Type)
if vtype.Kind() == semantic.Unknown {
return semantic.MonoType{}, errors.Newf(codes.Internal, "unknown column type: %s", c.Type)
}
if vectorized {
vtype = semantic.NewVectorType(vtype)
}
properties[i] = semantic.PropertyType{
Key: []byte(c.Label),
Value: vtype,
Expand All @@ -72,12 +79,12 @@ func (f *dynamicFn) typeof(cols []flux.ColMeta) (semantic.MonoType, error) {
return semantic.NewObjectType(properties), nil
}

func (f *dynamicFn) compileFunction(cols []flux.ColMeta, extraTypes map[string]semantic.MonoType) error {
func (f *dynamicFn) compileFunction(cols []flux.ColMeta, extraTypes map[string]semantic.MonoType, vectorized bool) error {

// If the types have not changed we do not need to recompile, just use the cached version
if f.compiledFn == nil || !f.compiledFn.isCacheHit(cols, extraTypes) {
if f.compiledFn == nil || !f.compiledFn.isCacheHit(cols, extraTypes, vectorized) {
// Prepare the type of the record column.
recordType, err := f.typeof(cols)
recordType, err := f.typeof(cols, vectorized)
if err != nil {
return err
}
Expand All @@ -104,13 +111,14 @@ func (f *dynamicFn) compileFunction(cols []flux.ColMeta, extraTypes map[string]s
recordType: recordType,
cols: cols,
extraTypes: extraTypes,
vectorized: vectorized,
}
}
return nil
}

func (f *dynamicFn) prepare(cols []flux.ColMeta, extraTypes map[string]semantic.MonoType) (preparedFn, error) {
err := f.compileFunction(cols, extraTypes)
func (f *dynamicFn) prepare(cols []flux.ColMeta, extraTypes map[string]semantic.MonoType, vectorized bool) (preparedFn, error) {
err := f.compileFunction(cols, extraTypes, vectorized)
if err != nil {
return preparedFn{}, err
}
Expand Down Expand Up @@ -206,7 +214,7 @@ func NewTablePredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope)
}

func (f *TablePredicateFn) Prepare(tbl flux.Table) (*TablePredicatePreparedFn, error) {
fn, err := f.prepare(tbl.Key().Cols(), nil)
fn, err := f.prepare(tbl.Key().Cols(), nil, false)
if err != nil {
return nil, err
} else if fn.returnType().Nature() != semantic.Bool {
Expand Down Expand Up @@ -253,7 +261,7 @@ func NewRowPredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) *R
}

func (f *RowPredicateFn) Prepare(cols []flux.ColMeta) (*RowPredicatePreparedFn, error) {
fn, err := f.prepare(cols, nil)
fn, err := f.prepare(cols, nil, false)
if err != nil {
return nil, err
} else if fn.returnType().Nature() != semantic.Bool {
Expand Down Expand Up @@ -310,7 +318,7 @@ func NewRowMapFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowMapF
}

func (f *RowMapFn) Prepare(cols []flux.ColMeta) (*RowMapPreparedFn, error) {
fn, err := f.prepare(cols, nil)
fn, err := f.prepare(cols, nil, false)
if err != nil {
return nil, err
} else if k := fn.returnType().Nature(); k != semantic.Object {
Expand Down Expand Up @@ -348,7 +356,7 @@ func NewRowReduceFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowR
}

func (f *RowReduceFn) Prepare(cols []flux.ColMeta, reducerType map[string]semantic.MonoType) (*RowReducePreparedFn, error) {
fn, err := f.prepare(cols, reducerType)
fn, err := f.prepare(cols, reducerType, false)
if err != nil {
return nil, err
}
Expand Down
85 changes: 85 additions & 0 deletions execute/vector_fn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package execute

import (
"context"

"github.com/influxdata/flux"
"github.com/influxdata/flux/array"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/compiler"
"github.com/influxdata/flux/execute/table"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
)

type VectorMapFn struct {
dynamicFn
}

func NewVectorMapFn(fn *semantic.FunctionExpression, scope compiler.Scope) *VectorMapFn {
return &VectorMapFn{
dynamicFn: newDynamicFn(fn, scope),
}
}

func (f *VectorMapFn) Prepare(cols []flux.ColMeta) (*VectorMapPreparedFn, error) {
fn, err := f.prepare(cols, nil, true)
if err != nil {
return nil, err
} else if k := fn.returnType().Nature(); k != semantic.Object {
return nil, errors.Newf(codes.Invalid, "map function must return an object, got %s", k.String())
}
return &VectorMapPreparedFn{
vectorFn: vectorFn{preparedFn: fn},
}, nil
}

type VectorMapPreparedFn struct {
vectorFn
}

func (f *VectorMapPreparedFn) Type() semantic.MonoType {
return f.fn.Type()
}

type vectorFn struct {
preparedFn
}

func (f *vectorFn) Eval(ctx context.Context, chunk table.Chunk) ([]array.Interface, error) {
for j, col := range chunk.Cols() {
arr := chunk.Values(j)
arr.Retain()
v := values.NewVectorValue(arr, flux.SemanticType(col.Type))
f.arg0.Set(col.Label, v)
}

res, err := f.fn.Eval(ctx, f.args)
if err != nil {
return nil, err
}

// Map the return object to the expected order from type inference.
// The compiler should have done this by itself, but it doesn't at the moment.
// When the compiler gets refactored so it returns records in the same order
// as type inference, we can remove this and just do a copy by index.
retType := f.returnType()
n := res.Object().Len()
vs := make([]array.Interface, n)
for i := 0; i < n; i++ {
prop, err := retType.RecordProperty(i)
if err != nil {
return nil, err
}

vec, ok := res.Object().Get(prop.Name())
if !ok {
return nil, errors.Newf(codes.Internal, "column %s is not valid", prop.Name())
}
vs[i] = vec.(values.Vector).Arr()
vs[i].Retain()
}
res.Release()
return vs, nil
}
14 changes: 14 additions & 0 deletions internal/feature/flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions internal/feature/flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,9 @@
key: mqttPoolDialer
default: false
contact: Jonathan Sternberg

- name: Vectorized Map
description: Enables the version of map that supports vectorized functions
key: vectorizedMap
default: false
contact: Jonathan Sternberg
4 changes: 2 additions & 2 deletions libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/merge_filter_flag_off_test.flux": "6a6da4adbc57d6b26ff00cfc4b107f985bdb5fd706bd521dc88df288b0dd4369",
"stdlib/universe/merge_filter_flag_on_test.flux": "50c2e77b7e6a2efbcabc560e7345d5355df037eb1d4c4b0a8ab32231b5d68ae4",
"stdlib/universe/merge_filter_test.flux": "cfa7e13431c0571a341f8fc689b87dff6f2d2c095f3473dcbe3131e98243f604",
"stdlib/universe/meta_query_keys_test.flux": "4df4d7271c77f3bc8cfabc03523c7256cd05cbb34c3b35e42a968c3bce7663c3",
"stdlib/universe/meta_query_keys_test.flux": "7d7c489c09ed71287d09718e640857b85c8ad8705894c5ba3d5a2202636d428c",
"stdlib/universe/min_test.flux": "cd4aa50cd2bf2f389acd465e56c270d990cf2f5ecfdc720ee8651e825c5b2319",
"stdlib/universe/mode_string_test.flux": "6b6514f3e1ac48dd187fa97c372916048096ba7e3e194c6900aa67996e632370",
"stdlib/universe/mode_test.flux": "b43a1bbb195efd144345ab86c254d371de3d660a799074c3d2db315a560f8b2d",
Expand Down Expand Up @@ -592,7 +592,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/set_test.flux": "47a0b5485c0b06e6f055faa86474827816d36f59726a25aa0422508dbad6f995",
"stdlib/universe/shift_negative_duration_test.flux": "2321ab9d41c0e93322c840f638cbb342a843c34b0db73917ea49bb7687a36411",
"stdlib/universe/shift_test.flux": "480d5bcfa5f7d6466b13857744f668bd635dd84c0fe20e0aa4392ab3fa8283d1",
"stdlib/universe/show_all_tag_keys_test.flux": "0ed7fb82c276ffbfd6b458fc1e460898348b1379a6316d278efc1c41d72debd4",
"stdlib/universe/show_all_tag_keys_test.flux": "d445d734111eed7518efd0e28469ed051bfa1c4e1d4610c7a6bd0589bf0bc099",
"stdlib/universe/simple_max_test.flux": "1196fa76e5f603b53f3ab78980d8810ca94e690b5585fec7c0f5725d7b16518b",
"stdlib/universe/skew_test.flux": "7530c2fc2e84e508749e3a05f97eebf592e7070ae1290a0103cbd0da54271f07",
"stdlib/universe/sort2_test.flux": "3b31fa92a30fafb5ef80e1e24697aa7b62e9385a5174ec255c98cdd5eac3ee1c",
Expand Down
Loading

0 comments on commit 3d455fd

Please sign in to comment.