Skip to content

Commit

Permalink
(bug/functions/transformations): fix union to be compatible with hete…
Browse files Browse the repository at this point in the history
…rogeneous table types (#246)

* (bug/functions/transformations): fix union to be compatible with heterogeneous table types

added LevelColumns to table interface, fixed union and added a batch of tests that came up mid-stream
  • Loading branch information
aanthony1243 authored Nov 6, 2018
1 parent 7009b18 commit 69370f6
Show file tree
Hide file tree
Showing 13 changed files with 2,568 additions and 13 deletions.
98 changes: 94 additions & 4 deletions execute/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,13 @@ func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error {
return nil
}

return t.Do(func(cr flux.ColReader) error {
if err := t.Do(func(cr flux.ColReader) error {
return AppendMappedCols(cr, builder, colMap)
})
}); err != nil {
return err
}

return builder.LevelColumns()
}

// AppendTable appends data from table t onto builder.
Expand All @@ -151,9 +155,14 @@ func AppendTable(t flux.Table, builder TableBuilder) error {
// AppendMappedCols appends all columns from cr onto builder.
// The colMap is a map of builder column index to cr column index.
func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error {
if len(colMap) != len(builder.Cols()) {
return errors.New("AppendMappedCols: colMap must have an entry for each table builder column")
}
for j := range builder.Cols() {
if err := AppendCol(j, colMap[j], cr, builder); err != nil {
return err
if colMap[j] >= 0 {
if err := AppendCol(j, colMap[j], cr, builder); err != nil {
return err
}
}
}
return nil
Expand Down Expand Up @@ -460,6 +469,10 @@ type TableBuilder interface {
GrowStrings(j, n int) error
GrowTimes(j, n int) error

// LevelColumns will check for columns that are too short and Grow them
// so that each column is of uniform size.
LevelColumns() error

// Sort the rows of the by the values of the columns in the order listed.
Sort(cols []string, desc bool)

Expand Down Expand Up @@ -582,6 +595,83 @@ func (b ColListTableBuilder) AddCol(c flux.ColMeta) (int, error) {
return newIdx, nil
}

func (b ColListTableBuilder) LevelColumns() error {

for idx, c := range b.table.colMeta {
switch c.Type {
case flux.TBool:
toGrow := b.NRows() - len(b.table.Bools(idx))
if toGrow > 0 {
if err := b.GrowBools(idx, toGrow); err != nil {
return err
}
}

if toGrow < 0 {
_ = fmt.Errorf("column %s is longer than expected length of table", c.Label)
}
case flux.TInt:
toGrow := b.NRows() - len(b.table.Ints(idx))
if toGrow > 0 {
if err := b.GrowInts(idx, toGrow); err != nil {
return err
}
}

if toGrow < 0 {
_ = fmt.Errorf("column %s is longer than expected length of table", c.Label)
}
case flux.TUInt:
toGrow := b.NRows() - len(b.table.UInts(idx))
if toGrow > 0 {
if err := b.GrowUInts(idx, toGrow); err != nil {
return err
}
}

if toGrow < 0 {
_ = fmt.Errorf("column %s is longer than expected length of table", c.Label)
}
case flux.TFloat:
toGrow := b.NRows() - len(b.table.Floats(idx))
if toGrow > 0 {
if err := b.GrowFloats(idx, toGrow); err != nil {
return err
}
}

if toGrow < 0 {
_ = fmt.Errorf("column %s is longer than expected length of table", c.Label)
}
case flux.TString:
toGrow := b.NRows() - len(b.table.Strings(idx))
if toGrow > 0 {
if err := b.GrowStrings(idx, toGrow); err != nil {
return err
}
}

if toGrow < 0 {
_ = fmt.Errorf("column %s is longer than expected length of table", c.Label)
}
case flux.TTime:
toGrow := b.NRows() - len(b.table.Times(idx))
if toGrow > 0 {
if err := b.GrowTimes(idx, toGrow); err != nil {
return err
}
}

if toGrow < 0 {
_ = fmt.Errorf("column %s is longer than expected length of table", c.Label)
}
default:
PanicUnknownType(c.Type)
}
}
return nil
}

func (b ColListTableBuilder) SetBool(i int, j int, value bool) error {
if err := b.checkCol(j, flux.TBool); err != nil {
return err
Expand Down
12 changes: 12 additions & 0 deletions functions/transformations/testdata/join_across_measurements.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
memUsed = from(bucket: "telegraf/autogen")
|> range(start:2018-05-22T19:53:00Z, stop:2018-05-22T19:55:00Z)
|> filter(fn: (r) => r._measurement == "mem" AND r._field == "used" )

procTotal = from(bucket: "telegraf/autogen")
|> range(start:2018-05-22T19:53:00Z, stop:2018-05-22T19:55:00Z)
|> filter(fn: (r) =>
r._measurement == "processes" AND
r._field == "total"
)

join(tables: {mem:memUsed, proc:procTotal}, on: ["_time", "_stop", "_start", "host"]) |> yield()
1,093 changes: 1,093 additions & 0 deletions functions/transformations/testdata/join_across_measurements.in.csv

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#datatype,string,long,string,string,string,string,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,long,string
#group,false,false,true,true,true,true,false,false,false,false,false,true
#default,_result,,,,,,,,,,,
,result,table,_field_mem,_field_proc,_measurement_mem,_measurement_proc,_start,_stop,_time,_value_mem,_value_proc,host
,,0,used,total,mem,processes,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:26Z,10832478208,417,host.local
,,0,used,total,mem,processes,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:36Z,10774417408,416,host.local
,,0,used,total,mem,processes,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:46Z,10718109696,416,host.local
,,0,used,total,mem,processes,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:53:56Z,10779672576,418,host.local
,,0,used,total,mem,processes,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:06Z,10785837056,418,host.local
,,0,used,total,mem,processes,2018-05-22T19:53:24.421470485Z,2018-05-22T19:54:24.421470485Z,2018-05-22T19:54:16Z,10731827200,417,host.local

6 changes: 6 additions & 0 deletions functions/transformations/testdata/show_all_tag_keys.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from(bucket:"telegraf")
|> range(start:2018-05-22T19:53:26Z)
|> keys(except: ["_time","_value","_start","_stop"])
|> group(none: true)
|> distinct()
|> map(fn:(r) => r._value)
Loading

0 comments on commit 69370f6

Please sign in to comment.