feat(stdlib): promote various features to defaults (#5005)
This promotes the narrow state tracking and limit transformations to be
the defaults and removes their old implementations.

This also changes the group transformation group and optimized aggregate
window feature flags to default to true. I'm keeping the flags for these
two because they are more likely to surface an unexpected problem, and I
would like to keep a way to turn them off quickly. Their old
implementations will likely be removed next week.
jsternberg authored Jul 25, 2022
1 parent 8be0a72 commit 05a1065
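For context on the rollback story in the commit message: each entry in internal/feature/flags.yml is compiled into a typed accessor in the generated internal/feature/flags.go (not rendered in this view), and call sites gate on it per query context — the removed `feature.NarrowTransformationLimit().Enabled(a.Context())` check in limit.go below shows the pattern. Here is a minimal, runnable sketch of that default-plus-override shape; the types and the `disable` helper are illustrative stand-ins, not Flux's actual generated code:

```go
package main

import (
	"context"
	"fmt"
)

// boolFlag models a flag entry from internal/feature/flags.yml: a key plus
// a compiled-in default. Illustrative only; Flux's real types are generated
// into internal/feature/flags.go.
type boolFlag struct {
	key string
	def bool
}

type ctxKey string

// Enabled returns a per-context override when one has been injected and
// falls back to the compiled-in default otherwise.
func (f boolFlag) Enabled(ctx context.Context) bool {
	if v, ok := ctx.Value(ctxKey(f.key)).(bool); ok {
		return v
	}
	return f.def
}

// disable injects an override for one flag — the "turned off quickly"
// escape hatch the commit message refers to.
func disable(ctx context.Context, f boolFlag) context.Context {
	return context.WithValue(ctx, ctxKey(f.key), false)
}

func main() {
	optimizeAggregateWindow := boolFlag{key: "optimizeAggregateWindow", def: true}

	ctx := context.Background()
	fmt.Println(optimizeAggregateWindow.Enabled(ctx)) // true: the new default

	ctx = disable(ctx, optimizeAggregateWindow)
	fmt.Println(optimizeAggregateWindow.Enabled(ctx)) // false: rollback path
}
```

Flipping `default:` in flags.yml only changes what `Enabled` returns when no override is injected, which is why keeping the flag around still provides a quick off switch.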
Showing 9 changed files with 242 additions and 727 deletions.
32 changes: 2 additions & 30 deletions internal/feature/flags.go

Some generated files are not rendered by default.

16 changes: 2 additions & 14 deletions internal/feature/flags.yml
@@ -19,7 +19,7 @@
 - name: Group Transformation Group
   description: Enable GroupTransformation interface for the group function
   key: groupTransformationGroup
-  default: false
+  default: true
   contact: Sean Brickley
 
 - name: Query Concurrency Limit
@@ -55,21 +55,9 @@
 - name: Optimize Aggregate Window
   description: Enables a version of aggregateWindow written in Go
   key: optimizeAggregateWindow
-  default: false
+  default: true
   contact: Jonathan Sternberg
 
-- name: Narrow Transformation Limit
-  description: Enable the NarrowStateTransformation implementation of limit
-  key: narrowTransformationLimit
-  default: false
-  contact: Owen Nelson
-
-- name: Optimize State Tracking
-  description: Enable implementation of NarrowStateTransformation of stateTracking
-  key: optimizeStateTracking
-  default: false
-  contact: Sean Brickley
-
 - name: Label polymorphism
   description: Enables label polymorphism in the type system
   key: labelPolymorphism
2 changes: 1 addition & 1 deletion lang/compiler_test.go
@@ -885,7 +885,7 @@ func TestQueryTracing(t *testing.T) {
 		msgCount int
 	}{
 		{opName: "*universe.filterTransformation", msgCount: 2},
-		{opName: "*universe.groupTransformation", msgCount: 3},
+		{opName: "*universe.groupTransformationAdapter", msgCount: 3},
 		{opName: "*universe.mapTransformation", msgCount: 3},
 		{opName: "*array.tableSource"},
 	}
181 changes: 17 additions & 164 deletions stdlib/universe/limit.go
@@ -1,8 +1,6 @@
 package universe
 
 import (
-	"context"
-
 	arrowmem "github.com/apache/arrow/go/v7/arrow/memory"
 	"github.com/influxdata/flux"
 	"github.com/influxdata/flux/array"
@@ -11,7 +9,6 @@ import (
 	"github.com/influxdata/flux/execute"
 	"github.com/influxdata/flux/internal/errors"
 	"github.com/influxdata/flux/internal/execute/table"
-	"github.com/influxdata/flux/internal/feature"
 	"github.com/influxdata/flux/memory"
 	"github.com/influxdata/flux/plan"
 	"github.com/influxdata/flux/runtime"
@@ -101,165 +98,31 @@ func createLimitTransformation(id execute.DatasetID, mode execute.AccumulationMode
 	if !ok {
 		return nil, nil, errors.Newf(codes.Internal, "invalid spec type %T", spec)
 	}
-
-	if feature.NarrowTransformationLimit().Enabled(a.Context()) {
-		return NewNarrowLimitTransformation(s, id, a.Allocator())
-	}
-
-	t, d := NewLimitTransformation(s, id)
-	return t, d, nil
+	return NewLimitTransformation(s, id, a.Allocator())
 }
 
-type limitTransformation struct {
-	execute.ExecutionNode
-	d         *execute.PassthroughDataset
-	n, offset int
+type limitState struct {
+	n      int
+	offset int
 }
 
-func NewLimitTransformation(spec *LimitProcedureSpec, id execute.DatasetID) (execute.Transformation, execute.Dataset) {
-	d := execute.NewPassthroughDataset(id)
+func NewLimitTransformation(
+	spec *LimitProcedureSpec,
+	id execute.DatasetID,
+	mem memory.Allocator,
+) (execute.Transformation, execute.Dataset, error) {
 	t := &limitTransformation{
-		d:      d,
 		n:      int(spec.N),
 		offset: int(spec.Offset),
 	}
-	return t, d
-}
-
-func (t *limitTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error {
-	return t.d.RetractTable(key)
-}
-
-func (t *limitTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
-	tbl, err := table.Stream(tbl.Key(), tbl.Cols(), func(ctx context.Context, w *table.StreamWriter) error {
-		return t.limitTable(ctx, w, tbl)
-	})
-	if err != nil {
-		return err
-	}
-	return t.d.Process(tbl)
-}
-
-func (t *limitTransformation) limitTable(ctx context.Context, w *table.StreamWriter, tbl flux.Table) error {
-	n, offset := t.n, t.offset
-	return tbl.Do(func(cr flux.ColReader) error {
-		if n <= 0 {
-			return nil
-		}
-		l := cr.Len()
-		if l <= offset {
-			offset -= l
-			// Skip entire batch
-			return nil
-		}
-		start := offset
-		stop := l
-		count := stop - start
-		if count > n {
-			count = n
-			stop = start + count
-		}
-
-		// Reduce the number of rows we will keep from the
-		// next buffer and set the offset to zero as it has been
-		// entirely consumed.
-		n -= count
-		offset = 0
-
-		vs := make([]array.Array, len(cr.Cols()))
-		for j := range vs {
-			arr := table.Values(cr, j)
-			if arr.Len() == count {
-				arr.Retain()
-			} else {
-				arr = arrow.Slice(arr, int64(start), int64(stop))
-			}
-			vs[j] = arr
-		}
-		return w.Write(vs)
-	})
-}
-
-func appendSlicedCols(reader flux.ColReader, builder execute.TableBuilder, start, stop int) error {
-	for j, c := range reader.Cols() {
-		if j > len(builder.Cols()) {
-			return errors.New(codes.Internal, "builder index out of bounds")
-		}
-
-		switch c.Type {
-		case flux.TBool:
-			s := arrow.BoolSlice(reader.Bools(j), start, stop)
-			if err := builder.AppendBools(j, s); err != nil {
-				s.Release()
-				return err
-			}
-			s.Release()
-		case flux.TInt:
-			s := arrow.IntSlice(reader.Ints(j), start, stop)
-			if err := builder.AppendInts(j, s); err != nil {
-				s.Release()
-				return err
-			}
-			s.Release()
-		case flux.TUInt:
-			s := arrow.UintSlice(reader.UInts(j), start, stop)
-			if err := builder.AppendUInts(j, s); err != nil {
-				s.Release()
-				return err
-			}
-			s.Release()
-		case flux.TFloat:
-			s := arrow.FloatSlice(reader.Floats(j), start, stop)
-			if err := builder.AppendFloats(j, s); err != nil {
-				s.Release()
-				return err
-			}
-			s.Release()
-		case flux.TString:
-			s := arrow.StringSlice(reader.Strings(j), start, stop)
-			if err := builder.AppendStrings(j, s); err != nil {
-				s.Release()
-				return err
-			}
-			s.Release()
-		case flux.TTime:
-			s := arrow.IntSlice(reader.Times(j), start, stop)
-			if err := builder.AppendTimes(j, s); err != nil {
-				s.Release()
-				return err
-			}
-			s.Release()
-		default:
-			execute.PanicUnknownType(c.Type)
-		}
-	}
-
-	return nil
-}
-
-func (t *limitTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error {
-	return t.d.UpdateWatermark(mark)
-}
-func (t *limitTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error {
-	return t.d.UpdateProcessingTime(pt)
-}
-func (t *limitTransformation) Finish(id execute.DatasetID, err error) {
-	t.d.Finish(err)
-}
-
-type limitState struct {
-	n      int
-	offset int
-}
-type limitTransformationAdapter struct {
-	limitTransformation *limitTransformation
+	return execute.NewNarrowStateTransformation(id, t, mem)
 }
 
-func (*limitTransformationAdapter) Close() error {
-	return nil
+type limitTransformation struct {
+	n, offset int
 }
 
-func (t *limitTransformationAdapter) Process(
+func (t *limitTransformation) Process(
 	chunk table.Chunk,
 	state interface{},
 	dataset *execute.TransportDataset,
@@ -271,14 +134,14 @@ func (t *limitTransformationAdapter) Process(
 	// include a value for `state`. Initialization happens here then is passed
 	// in/out for the subsequent calls.
 	if state == nil {
-		state_ = &limitState{n: t.limitTransformation.n, offset: t.limitTransformation.offset}
+		state_ = &limitState{n: t.n, offset: t.offset}
 	} else {
 		state_ = state.(*limitState)
 	}
 	return t.processChunk(chunk, state_, dataset)
 }
 
-func (t *limitTransformationAdapter) processChunk(
+func (t *limitTransformation) processChunk(
 	chunk table.Chunk,
 	state *limitState,
 	dataset *execute.TransportDataset,
@@ -348,16 +211,6 @@ func (t *limitTransformationAdapter) processChunk(
 	return state, true, nil
 }
 
-func NewNarrowLimitTransformation(
-	spec *LimitProcedureSpec,
-	id execute.DatasetID,
-	mem memory.Allocator,
-) (execute.Transformation, execute.Dataset, error) {
-	t := &limitTransformationAdapter{
-		limitTransformation: &limitTransformation{
-			n:      int(spec.N),
-			offset: int(spec.Offset),
-		},
-	}
-	return execute.NewNarrowStateTransformation(id, t, mem)
+func (*limitTransformation) Close() error {
+	return nil
 }
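To make the new shape of limit.go concrete, here is a minimal, self-contained sketch of the narrow-state pattern it now follows — stand-in types throughout (`chunk` replaces table.Chunk, and the dataset plumbing is elided), not Flux's real execute interfaces. It shows how per-group-key state is initialized on the first chunk and threaded through later calls, which is exactly the shape of the Process/processChunk pair above:

```go
package main

import "fmt"

// chunk stands in for table.Chunk; the real implementation works on Arrow
// column buffers rather than a row slice.
type chunk struct{ rows []int }

type limitState struct {
	n      int // rows still to emit
	offset int // rows still to skip
}

type limitTransformation struct {
	n, offset int
}

// Process mirrors the contract visible in the diff: the first chunk for a
// group key arrives with state == nil, so the state is built from the spec;
// the returned state is handed back on every subsequent call.
func (t *limitTransformation) Process(c chunk, state interface{}) (interface{}, bool, error) {
	var s *limitState
	if state == nil {
		s = &limitState{n: t.n, offset: t.offset}
	} else {
		s = state.(*limitState)
	}
	return t.processChunk(c, s)
}

func (t *limitTransformation) processChunk(c chunk, s *limitState) (*limitState, bool, error) {
	l := len(c.rows)
	if s.n <= 0 {
		return s, true, nil // limit already satisfied; drop the rest
	}
	if l <= s.offset {
		s.offset -= l // the whole chunk falls inside the offset; skip it
		return s, true, nil
	}
	start, stop := s.offset, l
	if stop-start > s.n {
		stop = start + s.n
	}
	s.n -= stop - start
	s.offset = 0
	fmt.Println("emit:", c.rows[start:stop])
	return s, true, nil
}

func main() {
	// Equivalent in spirit to limit(n: 3, offset: 2): skip two rows,
	// then emit at most three across however many chunks arrive.
	t := &limitTransformation{n: 3, offset: 2}
	var state interface{}
	for _, c := range []chunk{{rows: []int{1, 2, 3}}, {rows: []int{4, 5, 6}}} {
		state, _, _ = t.Process(c, state)
	}
	// Output:
	// emit: [3]
	// emit: [4 5]
}
```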
