Skip to content

Commit

Permalink
fix: Don't include null columns when unpivoting (#5082)
Browse files Browse the repository at this point in the history
* fix: Don't include null columns when unpivoting

The pivoted data we get from IOx may contain fields with nulls which we must exclude from the unpivoted chunk so they do not show up as extra rows with `null` as the `_value`.

* fix: Handle leading nulls in a chunk in unpivot
  • Loading branch information
Markus Westerlind authored Aug 18, 2022
1 parent 701120f commit 672a675
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 12 deletions.
2 changes: 1 addition & 1 deletion libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ var sourceHashes = map[string]string{
"stdlib/experimental/diff_test.flux": "9864f41f9c95999a071890f7ec1750b8e01cef082f7bb708443bf4f4bd59af49",
"stdlib/experimental/distinct_test.flux": "c7358d31972d0931aef6735ea94d901827c13fbaaeb9b02ff255391b5f95ea30",
"stdlib/experimental/experimental.flux": "7d902e7d0a142d94a0970ba96317ccd8050f2381551da2f43039da0bb6c90a1b",
"stdlib/experimental/experimental_test.flux": "ae1135fa59e0f3487b8360ccda551e1b1caf881ad71c2d260342dffcdbe829d6",
"stdlib/experimental/experimental_test.flux": "b417f361be23e610b6caffa266c40c421b19dedc3289ce064bb065cb0bcd825c",
"stdlib/experimental/fill_test.flux": "467ac288515415093ef50e70118c651e574d3fd50fbd157cc4795ce1248a54f8",
"stdlib/experimental/first_test.flux": "3bd1ff03bac6a45a3c525abb5ded3377f08195b6a5094caa42c8fb8b96aa6268",
"stdlib/experimental/geo/asTracks_test.flux": "3b33b115eacb39e04177d552cc0180f8969a75832bf457267baceb6d506a6321",
Expand Down
43 changes: 43 additions & 0 deletions stdlib/experimental/experimental_test.flux
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package experimental_test
import "array"
import "csv"
import "experimental"
import "internal/debug"
import "testing"

testcase addDuration_to_time {
Expand Down Expand Up @@ -283,3 +284,45 @@ testcase unpivot_3_columns {
testing.diff(got: got, want: want)
|> yield()
}

testcase unpivot_with_nulls {
input =
array.from(
rows: [
{_time: 2018-12-18T20:52:33Z, a: 1.0, b: debug.null(type: "string")},
{_time: 2018-12-18T20:52:33Z, a: debug.null(type: "float"), b: "abc"},
],
)

want =
array.from(rows: [{_time: 2018-12-18T20:52:33Z, _field: "a", _value: 1.0}])
|> group(columns: ["_field"])

got =
input
|> experimental.unpivot()
|> filter(fn: (r) => r._field == "a")

testing.diff(want, got)
}

testcase unpivot_with_nulls_2 {
input =
array.from(
rows: [
{_time: 2018-12-18T20:52:33Z, a: debug.null(type: "float"), b: "abc"},
{_time: 2018-12-18T20:52:33Z, a: 1.0, b: debug.null(type: "string")},
],
)

want =
array.from(rows: [{_time: 2018-12-18T20:52:33Z, _field: "a", _value: 1.0}])
|> group(columns: ["_field"])

got =
input
|> experimental.unpivot()
|> filter(fn: (r) => r._field == "a")

testing.diff(want, got)
}
34 changes: 23 additions & 11 deletions stdlib/experimental/unpivot.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (t *unpivotTransformation) Process(chunk table.Chunk, d *execute.TransportD

timeColumn := -1
for j, c := range chunk.Cols() {
if c.Label == "_time" {
if c.Label == execute.DefaultTimeColLabel {
timeColumn = j
break
}
Expand All @@ -99,16 +99,21 @@ func (t *unpivotTransformation) Process(chunk table.Chunk, d *execute.TransportD
}

for i, c := range chunk.Cols() {
if chunk.Key().HasCol(c.Label) || c.Label == "_time" {
if chunk.Key().HasCol(c.Label) || c.Label == execute.DefaultTimeColLabel {
continue
}

chunkValues := chunk.Values(i)
chunkValues.Retain()

newChunkLen := chunk.Len() - chunkValues.NullN()

groupKey := chunk.Key()
columns := groupKey.Cols()
columns = append(columns,
flux.ColMeta{Label: "_field", Type: flux.TString},
flux.ColMeta{Label: "_time", Type: flux.TTime},
flux.ColMeta{Label: "_value", Type: c.Type},
flux.ColMeta{Label: execute.DefaultTimeColLabel, Type: flux.TTime},
flux.ColMeta{Label: execute.DefaultValueColLabel, Type: c.Type},
)

groupCols := []flux.ColMeta{{Label: "_field", Type: flux.TString}}
Expand All @@ -131,19 +136,26 @@ func (t *unpivotTransformation) Process(chunk table.Chunk, d *execute.TransportD
break
}
}
values := chunk.Values(fromColumn)
values.Retain()

oldValues := chunk.Values(fromColumn)
var values array.Array
if newChunkLen == chunk.Len() {
values = oldValues
values.Retain()
} else {
// We have nulls for some of the fields which we must exclude from the unpivoted
// output, otherwise they show up as extra rows
values = array.Slice(oldValues, 0, newChunkLen)
}
buffer.Values[toColumn] = values
}

buffer.Values[len(buffer.Values)-3] = array.StringRepeat(c.Label, chunk.Len(), mem)
buffer.Values[len(buffer.Values)-3] = array.StringRepeat(c.Label, newChunkLen, mem)

times := chunk.Values(timeColumn)
times.Retain()
times := array.CopyValidValues(mem, chunk.Values(timeColumn))
buffer.Values[len(buffer.Values)-2] = times

values := chunk.Values(i)
values.Retain()
values := array.CopyValidValues(mem, chunk.Values(i))
buffer.Values[len(buffer.Values)-1] = values

out := table.ChunkFromBuffer(buffer)
Expand Down

0 comments on commit 672a675

Please sign in to comment.