diff --git a/libflux/go/libflux/buildinfo.gen.go b/libflux/go/libflux/buildinfo.gen.go index cf1a91fcc6..869fc6e751 100644 --- a/libflux/go/libflux/buildinfo.gen.go +++ b/libflux/go/libflux/buildinfo.gen.go @@ -429,7 +429,7 @@ var sourceHashes = map[string]string{ "stdlib/universe/aggregate_window_max_test.flux": "8ee5d927ef375e7ac3687ea1ea00a0e8add969c31357362d0032ee079c2ac906", "stdlib/universe/aggregate_window_mean_test.flux": "ba33848419748489bc330c67dfaa344fac757f8af782879c71e08b345f0f652f", "stdlib/universe/aggregate_window_median_test.flux": "2791ed98310aae23deab7cb79836e60539cd29810c67910c30e4b891356a2698", - "stdlib/universe/aggregate_window_test.flux": "5fb441b250506eeb65b778ee99d4cb0af2bf7cab1ea93c1eb2cd6d101edae198", + "stdlib/universe/aggregate_window_test.flux": "79ae9443c57f613fa7917b727f80f6a3fc985fcd95006ab2804c1f5d25c9b9d3", "stdlib/universe/cmo_test.flux": "3793f5cf21ae42879d6789cdc36bbcf29ddb99bf16af8a96775a9415fd8d1c09", "stdlib/universe/columns_test.flux": "fe641e3e747439a15fb2f4a1e4c56da6ad6a75109db4597f1aab806931f881c7", "stdlib/universe/contains_test.flux": "a1890287ebc1ad6d8c84561f7e0cb725048a1e287f49f5dceac196866a20ee01", diff --git a/stdlib/universe/aggregate_window.gen.go b/stdlib/universe/aggregate_window.gen.go index 35857beace..4e610050df 100644 --- a/stdlib/universe/aggregate_window.gen.go +++ b/stdlib/universe/aggregate_window.gen.go @@ -21,13 +21,37 @@ func (a *aggregateWindowSumInt) Aggregate(ts *array.Int, vs array.Array, start, b := array.NewIntBuilder(mem) b.Resize(stop.Len()) + // Check once if we should look for nulls in the input. + hasNulls := vs.NullN() > 0 + values := vs.(*array.Int) aggregateWindows(ts, start, stop, func(i, j int) { - var sum int64 - for ; i < j; i++ { - sum += values.Value(i) + var ( + sum int64 + isNull = hasNulls + ) + if hasNulls { + for ; i < j; i++ { + // If there are nulls, check if this is null. + if values.IsNull(i) { + continue + } + sum += values.Value(i) + isNull = false + } + } else { + // Skip the extra checks if we know there are no nulls. + for ; i < j; i++ { + sum += values.Value(i) + } + } + + // Append a null value if there were no valid points. + if isNull { + b.AppendNull() + } else { + b.Append(sum) } - b.Append(sum) }) result := b.NewIntArray() a.merge(start, stop, result, mem) @@ -50,12 +74,16 @@ func (a *aggregateWindowSumInt) merge(start, stop *array.Int, result *array.Int, merged := array.NewIntBuilder(mem) merged.Resize(ts.Len()) mergeWindowValues(ts, prev, next, func(i, j int) { - if i >= 0 && j >= 0 { + iValid := i >= 0 && a.vs.IsValid(i) + jValid := j >= 0 && result.IsValid(j) + if iValid && jValid { merged.Append(a.vs.Value(i) + result.Value(j)) - } else if i >= 0 { + } else if iValid { merged.Append(a.vs.Value(i)) - } else { + } else if jValid { merged.Append(result.Value(j)) + } else { + merged.AppendNull() } }) a.vs.Release() @@ -69,7 +97,7 @@ func (a *aggregateWindowSumInt) Compute(mem memory.Allocator) (*array.Int, flux. b.Resize(n) append = func(i int) { - if i < 0 { + if i < 0 || a.vs.IsNull(i) { b.AppendNull() } else { b.Append(a.vs.Value(i)) @@ -99,97 +127,116 @@ func (a *aggregateWindowSumInt) Close() error { type aggregateWindowMeanInt struct { aggregateWindowBase counts *array.Int - sums *array.Int + means *array.Float } func (a *aggregateWindowMeanInt) Aggregate(ts *array.Int, vs array.Array, start, stop *array.Int, mem memory.Allocator) { countsB := array.NewIntBuilder(mem) countsB.Resize(stop.Len()) - sumsB := array.NewIntBuilder(mem) - sumsB.Resize(stop.Len()) + meansB := array.NewFloatBuilder(mem) + meansB.Resize(stop.Len()) + + // Check once if we should look for nulls in the input. + hasNulls := vs.NullN() > 0 values := vs.(*array.Int) aggregateWindows(ts, start, stop, func(i, j int) { - countsB.Append(int64(j - i)) - var sum int64 - for ; i < j; i++ { - sum += values.Value(i) + var ( + sum int64 + count = int64(j - i) + ) + if hasNulls { + for ; i < j; i++ { + if values.IsNull(i) { + count-- + continue + } + sum += values.Value(i) + } + } else { + for ; i < j; i++ { + sum += values.Value(i) + } + } + countsB.Append(count) + if count > 0 { + meansB.Append(float64(sum) / float64(count)) + } else { + meansB.AppendNull() } - sumsB.Append(sum) }) - counts, sums := countsB.NewIntArray(), sumsB.NewIntArray() - a.merge(start, stop, counts, sums, mem) + counts, means := countsB.NewIntArray(), meansB.NewFloatArray() + a.merge(start, stop, counts, means, mem) } func (a *aggregateWindowMeanInt) Merge(from aggregateWindow, mem memory.Allocator) { other := from.(*aggregateWindowMeanInt) other.counts.Retain() - other.sums.Retain() - a.merge(other.ts, other.ts, other.counts, other.sums, mem) + other.means.Retain() + a.merge(other.ts, other.ts, other.counts, other.means, mem) } -func (a *aggregateWindowMeanInt) merge(start, stop, counts *array.Int, sums *array.Int, mem memory.Allocator) { +func (a *aggregateWindowMeanInt) merge(start, stop, counts *array.Int, means *array.Float, mem memory.Allocator) { a.mergeWindows(start, stop, mem, func(ts, prev, next *array.Int) { - if a.sums == nil { - a.counts, a.sums = counts, sums + if a.means == nil { + a.counts, a.means = counts, means return } defer counts.Release() - defer sums.Release() + defer means.Release() mergedCounts := array.NewIntBuilder(mem) mergedCounts.Resize(ts.Len()) - mergedSums := array.NewIntBuilder(mem) - mergedSums.Resize(ts.Len()) + mergedMeans := array.NewFloatBuilder(mem) + mergedMeans.Resize(ts.Len()) mergeWindowValues(ts, prev, next, func(i, j int) { - if i >= 0 && j >= 0 { - mergedCounts.Append(a.counts.Value(i) + counts.Value(j)) - mergedSums.Append(a.sums.Value(i) + sums.Value(j)) - } else if i >= 0 { + iValid := i >= 0 && a.means.IsValid(i) + jValid := j >= 0 && means.IsValid(j) + if iValid && jValid { + m, n := a.counts.Value(i), counts.Value(j) + mergedCounts.Append(m + n) + mergedMeans.Append((a.means.Value(i)*float64(m) + means.Value(j)*float64(n)) / float64(m+n)) + } else if iValid { mergedCounts.Append(a.counts.Value(i)) - mergedSums.Append(a.sums.Value(i)) - } else { + mergedMeans.Append(a.means.Value(i)) + } else if jValid { mergedCounts.Append(counts.Value(j)) - mergedSums.Append(sums.Value(j)) + mergedMeans.Append(means.Value(j)) + } else { + mergedCounts.Append(0) + mergedMeans.AppendNull() } }) a.counts.Release() - a.sums.Release() - a.counts, a.sums = mergedCounts.NewIntArray(), mergedSums.NewIntArray() + a.means.Release() + a.counts, a.means = mergedCounts.NewIntArray(), mergedMeans.NewFloatArray() }) } func (a *aggregateWindowMeanInt) Compute(mem memory.Allocator) (*array.Int, flux.ColType, array.Array) { - b := array.NewFloatBuilder(mem) - b.Resize(a.ts.Len()) - for i, n := 0, a.sums.Len(); i < n; i++ { - v := float64(a.sums.Value(i)) / float64(a.counts.Value(i)) - b.Append(v) - } - vs := b.NewFloatArray() - a.createEmptyWindows(mem, func(n int) (append func(i int), done func()) { b := array.NewFloatBuilder(mem) b.Resize(n) append = func(i int) { - if i < 0 { + if i < 0 || a.means.IsNull(i) { b.AppendNull() } else { - b.Append(vs.Value(i)) + b.Append(a.means.Value(i)) } } done = func() { - vs.Release() - vs = b.NewFloatArray() + a.means.Release() + a.means = b.NewFloatArray() } return append, done }) a.ts.Retain() - return a.ts, flux.TFloat, vs + a.means.Retain() + return a.ts, flux.TFloat, a.means } func (a *aggregateWindowMeanInt) Close() error { @@ -198,9 +245,9 @@ func (a *aggregateWindowMeanInt) Close() error { a.counts.Release() a.counts = nil } - if a.sums != nil { - a.sums.Release() - a.sums = nil + if a.means != nil { + a.means.Release() + a.means = nil } return nil } @@ -214,13 +261,37 @@ func (a *aggregateWindowSumUint) Aggregate(ts *array.Int, vs array.Array, start, b := array.NewUintBuilder(mem) b.Resize(stop.Len()) + // Check once if we should look for nulls in the input. + hasNulls := vs.NullN() > 0 + values := vs.(*array.Uint) aggregateWindows(ts, start, stop, func(i, j int) { - var sum uint64 - for ; i < j; i++ { - sum += values.Value(i) + var ( + sum uint64 + isNull = hasNulls + ) + if hasNulls { + for ; i < j; i++ { + // If there are nulls, check if this is null. + if values.IsNull(i) { + continue + } + sum += values.Value(i) + isNull = false + } + } else { + // Skip the extra checks if we know there are no nulls. + for ; i < j; i++ { + sum += values.Value(i) + } + } + + // Append a null value if there were no valid points. + if isNull { + b.AppendNull() + } else { + b.Append(sum) } - b.Append(sum) }) result := b.NewUintArray() a.merge(start, stop, result, mem) @@ -243,12 +314,16 @@ func (a *aggregateWindowSumUint) merge(start, stop *array.Int, result *array.Uin merged := array.NewUintBuilder(mem) merged.Resize(ts.Len()) mergeWindowValues(ts, prev, next, func(i, j int) { - if i >= 0 && j >= 0 { + iValid := i >= 0 && a.vs.IsValid(i) + jValid := j >= 0 && result.IsValid(j) + if iValid && jValid { merged.Append(a.vs.Value(i) + result.Value(j)) - } else if i >= 0 { + } else if iValid { merged.Append(a.vs.Value(i)) - } else { + } else if jValid { merged.Append(result.Value(j)) + } else { + merged.AppendNull() } }) a.vs.Release() @@ -262,7 +337,7 @@ func (a *aggregateWindowSumUint) Compute(mem memory.Allocator) (*array.Int, flux b.Resize(n) append = func(i int) { - if i < 0 { + if i < 0 || a.vs.IsNull(i) { b.AppendNull() } else { b.Append(a.vs.Value(i)) @@ -292,97 +367,116 @@ func (a *aggregateWindowSumUint) Close() error { type aggregateWindowMeanUint struct { aggregateWindowBase counts *array.Int - sums *array.Uint + means *array.Float } func (a *aggregateWindowMeanUint) Aggregate(ts *array.Int, vs array.Array, start, stop *array.Int, mem memory.Allocator) { countsB := array.NewIntBuilder(mem) countsB.Resize(stop.Len()) - sumsB := array.NewUintBuilder(mem) - sumsB.Resize(stop.Len()) + meansB := array.NewFloatBuilder(mem) + meansB.Resize(stop.Len()) + + // Check once if we should look for nulls in the input. + hasNulls := vs.NullN() > 0 values := vs.(*array.Uint) aggregateWindows(ts, start, stop, func(i, j int) { - countsB.Append(int64(j - i)) - var sum uint64 - for ; i < j; i++ { - sum += values.Value(i) + var ( + sum uint64 + count = int64(j - i) + ) + if hasNulls { + for ; i < j; i++ { + if values.IsNull(i) { + count-- + continue + } + sum += values.Value(i) + } + } else { + for ; i < j; i++ { + sum += values.Value(i) + } + } + countsB.Append(count) + if count > 0 { + meansB.Append(float64(sum) / float64(count)) + } else { + meansB.AppendNull() } - sumsB.Append(sum) }) - counts, sums := countsB.NewIntArray(), sumsB.NewUintArray() - a.merge(start, stop, counts, sums, mem) + counts, means := countsB.NewIntArray(), meansB.NewFloatArray() + a.merge(start, stop, counts, means, mem) } func (a *aggregateWindowMeanUint) Merge(from aggregateWindow, mem memory.Allocator) { other := from.(*aggregateWindowMeanUint) other.counts.Retain() - other.sums.Retain() - a.merge(other.ts, other.ts, other.counts, other.sums, mem) + other.means.Retain() + a.merge(other.ts, other.ts, other.counts, other.means, mem) } -func (a *aggregateWindowMeanUint) merge(start, stop, counts *array.Int, sums *array.Uint, mem memory.Allocator) { +func (a *aggregateWindowMeanUint) merge(start, stop, counts *array.Int, means *array.Float, mem memory.Allocator) { a.mergeWindows(start, stop, mem, func(ts, prev, next *array.Int) { - if a.sums == nil { - a.counts, a.sums = counts, sums + if a.means == nil { + a.counts, a.means = counts, means return } defer counts.Release() - defer sums.Release() + defer means.Release() mergedCounts := array.NewIntBuilder(mem) mergedCounts.Resize(ts.Len()) - mergedSums := array.NewUintBuilder(mem) - mergedSums.Resize(ts.Len()) + mergedMeans := array.NewFloatBuilder(mem) + mergedMeans.Resize(ts.Len()) mergeWindowValues(ts, prev, next, func(i, j int) { - if i >= 0 && j >= 0 { - mergedCounts.Append(a.counts.Value(i) + counts.Value(j)) - mergedSums.Append(a.sums.Value(i) + sums.Value(j)) - } else if i >= 0 { + iValid := i >= 0 && a.means.IsValid(i) + jValid := j >= 0 && means.IsValid(j) + if iValid && jValid { + m, n := a.counts.Value(i), counts.Value(j) + mergedCounts.Append(m + n) + mergedMeans.Append((a.means.Value(i)*float64(m) + means.Value(j)*float64(n)) / float64(m+n)) + } else if iValid { mergedCounts.Append(a.counts.Value(i)) - mergedSums.Append(a.sums.Value(i)) - } else { + mergedMeans.Append(a.means.Value(i)) + } else if jValid { mergedCounts.Append(counts.Value(j)) - mergedSums.Append(sums.Value(j)) + mergedMeans.Append(means.Value(j)) + } else { + mergedCounts.Append(0) + mergedMeans.AppendNull() } }) a.counts.Release() - a.sums.Release() - a.counts, a.sums = mergedCounts.NewIntArray(), mergedSums.NewUintArray() + a.means.Release() + a.counts, a.means = mergedCounts.NewIntArray(), mergedMeans.NewFloatArray() }) } func (a *aggregateWindowMeanUint) Compute(mem memory.Allocator) (*array.Int, flux.ColType, array.Array) { - b := array.NewFloatBuilder(mem) - b.Resize(a.ts.Len()) - for i, n := 0, a.sums.Len(); i < n; i++ { - v := float64(a.sums.Value(i)) / float64(a.counts.Value(i)) - b.Append(v) - } - vs := b.NewFloatArray() - a.createEmptyWindows(mem, func(n int) (append func(i int), done func()) { b := array.NewFloatBuilder(mem) b.Resize(n) append = func(i int) { - if i < 0 { + if i < 0 || a.means.IsNull(i) { b.AppendNull() } else { - b.Append(vs.Value(i)) + b.Append(a.means.Value(i)) } } done = func() { - vs.Release() - vs = b.NewFloatArray() + a.means.Release() + a.means = b.NewFloatArray() } return append, done }) a.ts.Retain() - return a.ts, flux.TFloat, vs + a.means.Retain() + return a.ts, flux.TFloat, a.means } func (a *aggregateWindowMeanUint) Close() error { @@ -391,9 +485,9 @@ func (a *aggregateWindowMeanUint) Close() error { a.counts.Release() a.counts = nil } - if a.sums != nil { - a.sums.Release() - a.sums = nil + if a.means != nil { + a.means.Release() + a.means = nil } return nil } @@ -407,13 +501,37 @@ func (a *aggregateWindowSumFloat) Aggregate(ts *array.Int, vs array.Array, start b := array.NewFloatBuilder(mem) b.Resize(stop.Len()) + // Check once if we should look for nulls in the input. + hasNulls := vs.NullN() > 0 + values := vs.(*array.Float) aggregateWindows(ts, start, stop, func(i, j int) { - var sum float64 - for ; i < j; i++ { - sum += values.Value(i) + var ( + sum float64 + isNull = hasNulls + ) + if hasNulls { + for ; i < j; i++ { + // If there are nulls, check if this is null. + if values.IsNull(i) { + continue + } + sum += values.Value(i) + isNull = false + } + } else { + // Skip the extra checks if we know there are no nulls. + for ; i < j; i++ { + sum += values.Value(i) + } + } + + // Append a null value if there were no valid points. + if isNull { + b.AppendNull() + } else { + b.Append(sum) } - b.Append(sum) }) result := b.NewFloatArray() a.merge(start, stop, result, mem) @@ -436,12 +554,16 @@ func (a *aggregateWindowSumFloat) merge(start, stop *array.Int, result *array.Fl merged := array.NewFloatBuilder(mem) merged.Resize(ts.Len()) mergeWindowValues(ts, prev, next, func(i, j int) { - if i >= 0 && j >= 0 { + iValid := i >= 0 && a.vs.IsValid(i) + jValid := j >= 0 && result.IsValid(j) + if iValid && jValid { merged.Append(a.vs.Value(i) + result.Value(j)) - } else if i >= 0 { + } else if iValid { merged.Append(a.vs.Value(i)) - } else { + } else if jValid { merged.Append(result.Value(j)) + } else { + merged.AppendNull() } }) a.vs.Release() @@ -455,7 +577,7 @@ func (a *aggregateWindowSumFloat) Compute(mem memory.Allocator) (*array.Int, flu b.Resize(n) append = func(i int) { - if i < 0 { + if i < 0 || a.vs.IsNull(i) { b.AppendNull() } else { b.Append(a.vs.Value(i)) @@ -485,97 +607,116 @@ func (a *aggregateWindowSumFloat) Close() error { type aggregateWindowMeanFloat struct { aggregateWindowBase counts *array.Int - sums *array.Float + means *array.Float } func (a *aggregateWindowMeanFloat) Aggregate(ts *array.Int, vs array.Array, start, stop *array.Int, mem memory.Allocator) { countsB := array.NewIntBuilder(mem) countsB.Resize(stop.Len()) - sumsB := array.NewFloatBuilder(mem) - sumsB.Resize(stop.Len()) + meansB := array.NewFloatBuilder(mem) + meansB.Resize(stop.Len()) + + // Check once if we should look for nulls in the input. + hasNulls := vs.NullN() > 0 values := vs.(*array.Float) aggregateWindows(ts, start, stop, func(i, j int) { - countsB.Append(int64(j - i)) - var sum float64 - for ; i < j; i++ { - sum += values.Value(i) + var ( + sum float64 + count = int64(j - i) + ) + if hasNulls { + for ; i < j; i++ { + if values.IsNull(i) { + count-- + continue + } + sum += values.Value(i) + } + } else { + for ; i < j; i++ { + sum += values.Value(i) + } + } + countsB.Append(count) + if count > 0 { + meansB.Append(float64(sum) / float64(count)) + } else { + meansB.AppendNull() } - sumsB.Append(sum) }) - counts, sums := countsB.NewIntArray(), sumsB.NewFloatArray() - a.merge(start, stop, counts, sums, mem) + counts, means := countsB.NewIntArray(), meansB.NewFloatArray() + a.merge(start, stop, counts, means, mem) } func (a *aggregateWindowMeanFloat) Merge(from aggregateWindow, mem memory.Allocator) { other := from.(*aggregateWindowMeanFloat) other.counts.Retain() - other.sums.Retain() - a.merge(other.ts, other.ts, other.counts, other.sums, mem) + other.means.Retain() + a.merge(other.ts, other.ts, other.counts, other.means, mem) } -func (a *aggregateWindowMeanFloat) merge(start, stop, counts *array.Int, sums *array.Float, mem memory.Allocator) { +func (a *aggregateWindowMeanFloat) merge(start, stop, counts *array.Int, means *array.Float, mem memory.Allocator) { a.mergeWindows(start, stop, mem, func(ts, prev, next *array.Int) { - if a.sums == nil { - a.counts, a.sums = counts, sums + if a.means == nil { + a.counts, a.means = counts, means return } defer counts.Release() - defer sums.Release() + defer means.Release() mergedCounts := array.NewIntBuilder(mem) mergedCounts.Resize(ts.Len()) - mergedSums := array.NewFloatBuilder(mem) - mergedSums.Resize(ts.Len()) + mergedMeans := array.NewFloatBuilder(mem) + mergedMeans.Resize(ts.Len()) mergeWindowValues(ts, prev, next, func(i, j int) { - if i >= 0 && j >= 0 { - mergedCounts.Append(a.counts.Value(i) + counts.Value(j)) - mergedSums.Append(a.sums.Value(i) + sums.Value(j)) - } else if i >= 0 { + iValid := i >= 0 && a.means.IsValid(i) + jValid := j >= 0 && means.IsValid(j) + if iValid && jValid { + m, n := a.counts.Value(i), counts.Value(j) + mergedCounts.Append(m + n) + mergedMeans.Append((a.means.Value(i)*float64(m) + means.Value(j)*float64(n)) / float64(m+n)) + } else if iValid { mergedCounts.Append(a.counts.Value(i)) - mergedSums.Append(a.sums.Value(i)) - } else { + mergedMeans.Append(a.means.Value(i)) + } else if jValid { mergedCounts.Append(counts.Value(j)) - mergedSums.Append(sums.Value(j)) + mergedMeans.Append(means.Value(j)) + } else { + mergedCounts.Append(0) + mergedMeans.AppendNull() } }) a.counts.Release() - a.sums.Release() - a.counts, a.sums = mergedCounts.NewIntArray(), mergedSums.NewFloatArray() + a.means.Release() + a.counts, a.means = mergedCounts.NewIntArray(), mergedMeans.NewFloatArray() }) } func (a *aggregateWindowMeanFloat) Compute(mem memory.Allocator) (*array.Int, flux.ColType, array.Array) { - b := array.NewFloatBuilder(mem) - b.Resize(a.ts.Len()) - for i, n := 0, a.sums.Len(); i < n; i++ { - v := float64(a.sums.Value(i)) / float64(a.counts.Value(i)) - b.Append(v) - } - vs := b.NewFloatArray() - a.createEmptyWindows(mem, func(n int) (append func(i int), done func()) { b := array.NewFloatBuilder(mem) b.Resize(n) append = func(i int) { - if i < 0 { + if i < 0 || a.means.IsNull(i) { b.AppendNull() } else { - b.Append(vs.Value(i)) + b.Append(a.means.Value(i)) } } done = func() { - vs.Release() - vs = b.NewFloatArray() + a.means.Release() + a.means = b.NewFloatArray() } return append, done }) a.ts.Retain() - return a.ts, flux.TFloat, vs + a.means.Retain() + return a.ts, flux.TFloat, a.means } func (a *aggregateWindowMeanFloat) Close() error { @@ -584,9 +725,9 @@ func (a *aggregateWindowMeanFloat) Close() error { a.counts.Release() a.counts = nil } - if a.sums != nil { - a.sums.Release() - a.sums = nil + if a.means != nil { + a.means.Release() + a.means = nil } return nil } diff --git a/stdlib/universe/aggregate_window.gen.go.tmpl b/stdlib/universe/aggregate_window.gen.go.tmpl index 127e2f6a55..a35e3c89b7 100644 --- a/stdlib/universe/aggregate_window.gen.go.tmpl +++ b/stdlib/universe/aggregate_window.gen.go.tmpl @@ -17,13 +17,37 @@ func (a *aggregateWindowSum{{.Name}}) Aggregate(ts *array.Int, vs array.Array, s b := array.New{{.Name}}Builder(mem) b.Resize(stop.Len()) + // Check once if we should look for nulls in the input. + hasNulls := vs.NullN() > 0 + values := vs.(*{{.ArrowType}}) aggregateWindows(ts, start, stop, func(i, j int) { - var sum {{.Type}} - for ; i < j; i++ { - sum += values.Value(i) - } - b.Append(sum) + var ( + sum {{.Type}} + isNull = hasNulls + ) + if hasNulls { + for ; i < j; i++ { + // If there are nulls, check if this is null. + if values.IsNull(i) { + continue + } + sum += values.Value(i) + isNull = false + } + } else { + // Skip the extra checks if we know there are no nulls. + for ; i < j; i++ { + sum += values.Value(i) + } + } + + // Append a null value if there were no valid points. + if isNull { + b.AppendNull() + } else { + b.Append(sum) + } }) result := b.New{{.Name}}Array() a.merge(start, stop, result, mem) @@ -46,13 +70,17 @@ func (a *aggregateWindowSum{{.Name}}) merge(start, stop *array.Int, result *arra merged := array.New{{.Name}}Builder(mem) merged.Resize(ts.Len()) mergeWindowValues(ts, prev, next, func(i, j int) { - if i >= 0 && j >= 0 { + iValid := i >= 0 && a.vs.IsValid(i) + jValid := j >= 0 && result.IsValid(j) + if iValid && jValid { merged.Append(a.vs.Value(i) + result.Value(j)) - } else if i >= 0 { + } else if iValid { merged.Append(a.vs.Value(i)) - } else { + } else if jValid { merged.Append(result.Value(j)) - } + } else { + merged.AppendNull() + } }) a.vs.Release() a.vs = merged.New{{.Name}}Array() @@ -65,7 +93,7 @@ func (a *aggregateWindowSum{{.Name}}) Compute(mem memory.Allocator) (*array.Int, b.Resize(n) append = func(i int) { - if i < 0 { + if i < 0 || a.vs.IsNull(i) { b.AppendNull() } else { b.Append(a.vs.Value(i)) @@ -95,97 +123,116 @@ func (a *aggregateWindowSum{{.Name}}) Close() error { type aggregateWindowMean{{.Name}} struct { aggregateWindowBase counts *array.Int - sums *{{.ArrowType}} + means *array.Float } func (a *aggregateWindowMean{{.Name}}) Aggregate(ts *array.Int, vs array.Array, start, stop *array.Int, mem memory.Allocator) { countsB := array.NewIntBuilder(mem) countsB.Resize(stop.Len()) - sumsB := array.New{{.Name}}Builder(mem) - sumsB.Resize(stop.Len()) + meansB := array.NewFloatBuilder(mem) + meansB.Resize(stop.Len()) + + // Check once if we should look for nulls in the input. + hasNulls := vs.NullN() > 0 values := vs.(*{{.ArrowType}}) aggregateWindows(ts, start, stop, func(i, j int) { - countsB.Append(int64(j - i)) - var sum {{.Type}} - for ; i < j; i++ { - sum += values.Value(i) + var ( + sum {{.Type}} + count = int64(j - i) + ) + if hasNulls { + for ; i < j; i++ { + if values.IsNull(i) { + count-- + continue + } + sum += values.Value(i) + } + } else { + for ; i < j; i++ { + sum += values.Value(i) + } + } + countsB.Append(count) + if count > 0 { + meansB.Append(float64(sum) / float64(count)) + } else { + meansB.AppendNull() } - sumsB.Append(sum) }) - counts, sums := countsB.NewIntArray(), sumsB.New{{.Name}}Array() - a.merge(start, stop, counts, sums, mem) + counts, means := countsB.NewIntArray(), meansB.NewFloatArray() + a.merge(start, stop, counts, means, mem) } func (a *aggregateWindowMean{{.Name}}) Merge(from aggregateWindow, mem memory.Allocator) { other := from.(*aggregateWindowMean{{.Name}}) other.counts.Retain() - other.sums.Retain() - a.merge(other.ts, other.ts, other.counts, other.sums, mem) + other.means.Retain() + a.merge(other.ts, other.ts, other.counts, other.means, mem) } -func (a *aggregateWindowMean{{.Name}}) merge(start, stop, counts *array.Int, sums *{{.ArrowType}}, mem memory.Allocator) { +func (a *aggregateWindowMean{{.Name}}) merge(start, stop, counts *array.Int, means *array.Float, mem memory.Allocator) { a.mergeWindows(start, stop, mem, func(ts, prev, next *array.Int) { - if a.sums == nil { - a.counts, a.sums = counts, sums + if a.means == nil { + a.counts, a.means = counts, means return } defer counts.Release() - defer sums.Release() + defer means.Release() mergedCounts := array.NewIntBuilder(mem) mergedCounts.Resize(ts.Len()) - mergedSums := array.New{{.Name}}Builder(mem) - mergedSums.Resize(ts.Len()) + mergedMeans := array.NewFloatBuilder(mem) + mergedMeans.Resize(ts.Len()) mergeWindowValues(ts, prev, next, func(i, j int) { - if i >= 0 && j >= 0 { - mergedCounts.Append(a.counts.Value(i) + counts.Value(j)) - mergedSums.Append(a.sums.Value(i) + sums.Value(j)) - } else if i >= 0 { + iValid := i >= 0 && a.means.IsValid(i) + jValid := j >= 0 && means.IsValid(j) + if iValid && jValid { + m, n := a.counts.Value(i), counts.Value(j) + mergedCounts.Append(m + n) + mergedMeans.Append((a.means.Value(i) * float64(m) + means.Value(j) * float64(n)) / float64(m + n)) + } else if iValid { mergedCounts.Append(a.counts.Value(i)) - mergedSums.Append(a.sums.Value(i)) - } else { + mergedMeans.Append(a.means.Value(i)) + } else if jValid { mergedCounts.Append(counts.Value(j)) - mergedSums.Append(sums.Value(j)) + mergedMeans.Append(means.Value(j)) + } else { + mergedCounts.Append(0) + mergedMeans.AppendNull() } }) a.counts.Release() - a.sums.Release() - a.counts, a.sums = mergedCounts.NewIntArray(), mergedSums.New{{.Name}}Array() + a.means.Release() + a.counts, a.means = mergedCounts.NewIntArray(), mergedMeans.NewFloatArray() }) } func (a *aggregateWindowMean{{.Name}}) Compute(mem memory.Allocator) (*array.Int, flux.ColType, array.Array) { - b := array.NewFloatBuilder(mem) - b.Resize(a.ts.Len()) - for i, n := 0, a.sums.Len(); i < n; i++ { - v := float64(a.sums.Value(i)) / float64(a.counts.Value(i)) - b.Append(v) - } - vs := b.NewFloatArray() - a.createEmptyWindows(mem, func(n int) (append func(i int), done func()) { b := array.NewFloatBuilder(mem) b.Resize(n) append = func(i int) { - if i < 0 { + if i < 0 || a.means.IsNull(i) { b.AppendNull() } else { - b.Append(vs.Value(i)) + b.Append(a.means.Value(i)) } } done = func() { - vs.Release() - vs = b.NewFloatArray() + a.means.Release() + a.means = b.NewFloatArray() } return append, done }) a.ts.Retain() - return a.ts, flux.TFloat, vs + a.means.Retain() + return a.ts, flux.TFloat, a.means } func (a *aggregateWindowMean{{.Name}}) Close() error { @@ -194,9 +241,9 @@ func (a *aggregateWindowMean{{.Name}}) Close() error { a.counts.Release() a.counts = nil } - if a.sums != nil { - a.sums.Release() - a.sums = nil + if a.means != nil { + a.means.Release() + a.means = nil } return nil } diff --git a/stdlib/universe/aggregate_window_test.flux b/stdlib/universe/aggregate_window_test.flux index 6a8492d3ed..fc4d863b0c 100644 --- a/stdlib/universe/aggregate_window_test.flux +++ b/stdlib/universe/aggregate_window_test.flux @@ -4,227 +4,160 @@ package universe_test import "array" import "csv" import "testing" +import "planner" + +sampleData = [ + {_time: 2019-11-25T00:00:00Z, t0: "a-0", _value: 1.0}, + {_time: 2019-11-25T00:00:05Z, t0: "a-0", _value: 5.0}, + {_time: 2019-11-25T00:00:15Z, t0: "a-0", _value: 2.0}, + {_time: 2019-11-25T00:00:30Z, t0: "a-0", _value: 3.0}, + {_time: 2019-11-25T00:00:45Z, t0: "a-0", _value: 4.0}, + {_time: 2019-11-25T00:00:00Z, t0: "a-1", _value: 1.0}, + {_time: 2019-11-25T00:00:05Z, t0: "a-1", _value: 5.0}, + {_time: 2019-11-25T00:00:15Z, t0: "a-1", _value: 2.0}, + {_time: 2019-11-25T00:00:30Z, t0: "a-1", _value: 3.0}, + {_time: 2019-11-25T00:00:45Z, t0: "a-1", _value: 4.0}, + {_time: 2019-11-25T00:00:00Z, t0: "a-2", _value: 1.0}, + {_time: 2019-11-25T00:00:05Z, t0: "a-2", _value: 5.0}, + {_time: 2019-11-25T00:00:15Z, t0: "a-2", _value: 2.0}, + {_time: 2019-11-25T00:00:30Z, t0: "a-2", _value: 3.0}, + {_time: 2019-11-25T00:00:45Z, t0: "a-2", _value: 4.0}, +] do_test = (every, fn) => - array.from( - rows: [ - { - _time: 2019-11-25T00:00:00Z, - _measurement: "m0", - _field: "f0", - t0: "a-0", - _value: 1.0, - }, - { - _time: 2019-11-25T00:00:15Z, - _measurement: "m0", - _field: "f0", - t0: "a-0", - _value: 2.0, - }, - { - _time: 2019-11-25T00:00:30Z, - _measurement: "m0", - _field: "f0", - t0: "a-0", - _value: 3.0, - }, - { - _time: 2019-11-25T00:00:45Z, - _measurement: "m0", - _field: "f0", - t0: "a-0", - _value: 4.0, - }, - { - _time: 2019-11-25T00:00:00Z, - _measurement: "m0", - _field: "f0", - t0: "a-1", - _value: 1.0, - }, - { - _time: 2019-11-25T00:00:15Z, - _measurement: "m0", - _field: "f0", - t0: "a-1", - _value: 2.0, - }, - { - _time: 2019-11-25T00:00:30Z, - _measurement: "m0", - _field: "f0", - t0: "a-1", - _value: 3.0, - }, - { - _time: 2019-11-25T00:00:45Z, - _measurement: "m0", - _field: "f0", - t0: "a-1", - _value: 4.0, - }, - { - _time: 2019-11-25T00:00:00Z, - _measurement: "m0", - _field: "f0", - t0: "a-2", - _value: 1.0, - }, - { - _time: 2019-11-25T00:00:15Z, - _measurement: "m0", - _field: "f0", - t0: "a-2", - _value: 2.0, - }, - { - _time: 2019-11-25T00:00:30Z, - _measurement: "m0", - _field: "f0", - t0: "a-2", - _value: 3.0, - }, - { - _time: 2019-11-25T00:00:45Z, - _measurement: "m0", - _field: "f0", - t0: "a-2", - _value: 4.0, - }, - ], - ) + array.from(rows: sampleData) + |> map(fn: (r) => ({r with _measurement: "m0", _field: "f0"})) |> group(columns: ["_measurement", "_field", "t0"]) |> testing.load() |> range(start: 2019-11-25T00:00:00Z, stop: 2019-11-25T00:01:00Z) - |> aggregateWindow(every, fn) + |> aggregateWindow(every: every, fn: fn, timeSrc: "_start") |> drop(columns: ["_start", "_stop"]) -testcase count_with_nulls { +testcase count_empty_windows { want = array.from( rows: [ { - _time: 2019-11-25T00:00:10Z, + _time: 2019-11-25T00:00:00Z, _measurement: "m0", _field: "f0", t0: "a-0", - _value: 1, + _value: 2, }, { - _time: 2019-11-25T00:00:20Z, + _time: 2019-11-25T00:00:10Z, _measurement: "m0", _field: "f0", t0: "a-0", _value: 1, }, { - _time: 2019-11-25T00:00:30Z, + _time: 2019-11-25T00:00:20Z, _measurement: "m0", _field: "f0", t0: "a-0", _value: 0, }, { - _time: 2019-11-25T00:00:40Z, + _time: 2019-11-25T00:00:30Z, _measurement: "m0", _field: "f0", t0: "a-0", _value: 1, }, { - _time: 2019-11-25T00:00:50Z, + _time: 2019-11-25T00:00:40Z, _measurement: "m0", _field: "f0", t0: "a-0", _value: 1, }, { - _time: 2019-11-25T00:01:00Z, + _time: 2019-11-25T00:00:50Z, _measurement: "m0", _field: "f0", t0: "a-0", _value: 0, }, { - _time: 2019-11-25T00:00:10Z, + _time: 2019-11-25T00:00:00Z, _measurement: "m0", _field: "f0", t0: "a-1", - _value: 1, + _value: 2, }, { - _time: 2019-11-25T00:00:20Z, + _time: 2019-11-25T00:00:10Z, _measurement: "m0", _field: "f0", t0: "a-1", _value: 1, }, { - _time: 2019-11-25T00:00:30Z, + _time: 2019-11-25T00:00:20Z, _measurement: "m0", _field: "f0", t0: "a-1", _value: 0, }, { - _time: 2019-11-25T00:00:40Z, + _time: 2019-11-25T00:00:30Z, _measurement: "m0", _field: "f0", t0: "a-1", _value: 1, }, { - _time: 2019-11-25T00:00:50Z, + _time: 2019-11-25T00:00:40Z, _measurement: "m0", _field: "f0", t0: "a-1", _value: 1, }, { - _time: 2019-11-25T00:01:00Z, + _time: 2019-11-25T00:00:50Z, _measurement: "m0", _field: "f0", t0: "a-1", _value: 0, }, { - _time: 2019-11-25T00:00:10Z, + _time: 2019-11-25T00:00:00Z, _measurement: "m0", _field: "f0", t0: "a-2", - _value: 1, + _value: 2, }, { - _time: 2019-11-25T00:00:20Z, + _time: 2019-11-25T00:00:10Z, _measurement: "m0", _field: "f0", t0: "a-2", _value: 1, }, { - _time: 2019-11-25T00:00:30Z, + _time: 2019-11-25T00:00:20Z, _measurement: "m0", _field: "f0", t0: "a-2", _value: 0, }, { - _time: 2019-11-25T00:00:40Z, + _time: 2019-11-25T00:00:30Z, _measurement: "m0", _field: "f0", t0: "a-2", _value: 1, }, { - _time: 2019-11-25T00:00:50Z, + _time: 2019-11-25T00:00:40Z, _measurement: "m0", _field: "f0", t0: "a-2", _value: 1, }, { - _time: 2019-11-25T00:01:00Z, + _time: 2019-11-25T00:00:50Z, _measurement: "m0", _field: "f0", t0: "a-2", @@ -235,9 +168,52 @@ testcase count_with_nulls { |> group(columns: ["_measurement", "_field", "t0"]) got = do_test(every: 10s, fn: count) - testing.diff(got, want) |> yield() + testing.diff(got, want) } -testcase min_with_nulls { + +testcase count_with_nulls { + want = + array.from( + rows: [ + {_time: 2019-11-25T00:00:00Z, t0: "a-0", _value: 6}, + {_time: 2019-11-25T00:00:00Z, t0: "a-1", _value: 6}, + {_time: 2019-11-25T00:00:00Z, t0: "a-2", _value: 6}, + ], + ) + |> map(fn: (r) => ({r with _measurement: "m0", _field: "f0"})) + |> group(columns: ["_measurement", "_field", "t0"]) + got = + do_test(every: 10s, fn: sum) + |> aggregateWindow(every: 1m, fn: count, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) +} + +testcase count_null_windows { + want = + array.from( + rows: [ + {_time: 2019-11-25T00:00:00Z, _value: 3}, + {_time: 2019-11-25T00:00:10Z, _value: 3}, + {_time: 2019-11-25T00:00:20Z, _value: 3}, + {_time: 2019-11-25T00:00:30Z, _value: 3}, + {_time: 2019-11-25T00:00:40Z, _value: 3}, + {_time: 2019-11-25T00:00:50Z, _value: 3}, + ], + ) + |> map(fn: (r) => ({r with _measurement: "m0", _field: "f0"})) + |> group(columns: ["_measurement", "_field"]) + got = + do_test(every: 10s, fn: sum) + |> group(columns: ["_measurement", "_field"]) + |> aggregateWindow(every: 10s, fn: count, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) +} + +testcase sum_empty_windows { want = csv.from( csv: @@ -245,31 +221,230 @@ testcase min_with_nulls { #group,false,false,false,true,true,true,false #default,_result,,,,,, ,result,table,_time,_measurement,_field,t0,_value -,,0,2019-11-25T00:00:10Z,m0,f0,a-0,1.0 -,,0,2019-11-25T00:00:20Z,m0,f0,a-0,2.0 -,,0,2019-11-25T00:00:30Z,m0,f0,a-0, -,,0,2019-11-25T00:00:40Z,m0,f0,a-0,3.0 -,,0,2019-11-25T00:00:50Z,m0,f0,a-0,4.0 -,,0,2019-11-25T00:01:00Z,m0,f0,a-0, -,,1,2019-11-25T00:00:10Z,m0,f0,a-1,1.0 -,,1,2019-11-25T00:00:20Z,m0,f0,a-1,2.0 -,,1,2019-11-25T00:00:30Z,m0,f0,a-1, -,,1,2019-11-25T00:00:40Z,m0,f0,a-1,3.0 -,,1,2019-11-25T00:00:50Z,m0,f0,a-1,4.0 -,,1,2019-11-25T00:01:00Z,m0,f0,a-1, -,,2,2019-11-25T00:00:10Z,m0,f0,a-2,1.0 -,,2,2019-11-25T00:00:20Z,m0,f0,a-2,2.0 -,,2,2019-11-25T00:00:30Z,m0,f0,a-2, -,,2,2019-11-25T00:00:40Z,m0,f0,a-2,3.0 -,,2,2019-11-25T00:00:50Z,m0,f0,a-2,4.0 -,,2,2019-11-25T00:01:00Z,m0,f0,a-2, +,,0,2019-11-25T00:00:00Z,m0,f0,a-0,6.0 +,,0,2019-11-25T00:00:10Z,m0,f0,a-0,2.0 +,,0,2019-11-25T00:00:20Z,m0,f0,a-0, +,,0,2019-11-25T00:00:30Z,m0,f0,a-0,3.0 +,,0,2019-11-25T00:00:40Z,m0,f0,a-0,4.0 +,,0,2019-11-25T00:00:50Z,m0,f0,a-0, +,,1,2019-11-25T00:00:00Z,m0,f0,a-1,6.0 +,,1,2019-11-25T00:00:10Z,m0,f0,a-1,2.0 +,,1,2019-11-25T00:00:20Z,m0,f0,a-1, +,,1,2019-11-25T00:00:30Z,m0,f0,a-1,3.0 +,,1,2019-11-25T00:00:40Z,m0,f0,a-1,4.0 +,,1,2019-11-25T00:00:50Z,m0,f0,a-1, +,,2,2019-11-25T00:00:00Z,m0,f0,a-2,6.0 +,,2,2019-11-25T00:00:10Z,m0,f0,a-2,2.0 +,,2,2019-11-25T00:00:20Z,m0,f0,a-2, +,,2,2019-11-25T00:00:30Z,m0,f0,a-2,3.0 +,,2,2019-11-25T00:00:40Z,m0,f0,a-2,4.0 +,,2,2019-11-25T00:00:50Z,m0,f0,a-2, +", + ) + got = do_test(every: 10s, fn: sum) + + testing.diff(got, want) +} + +testcase sum_with_nulls { + want = + array.from( + rows: [ + {_time: 2019-11-25T00:00:00Z, t0: "a-0", _value: 15.0}, + {_time: 2019-11-25T00:00:00Z, t0: "a-1", _value: 15.0}, + {_time: 2019-11-25T00:00:00Z, t0: "a-2", _value: 15.0}, + ], + ) + |> map(fn: (r) => ({r with _measurement: "m0", _field: "f0"})) + |> group(columns: ["_measurement", "_field", "t0"]) + got = + do_test(every: 10s, fn: sum) + |> aggregateWindow(every: 1m, fn: sum, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) +} + +testcase sum_null_windows { + want = + csv.from( + csv: + "#datatype,string,long,dateTime:RFC3339,string,string,double +#group,false,false,false,true,true,false +#default,_result,,,,, +,result,table,_time,_measurement,_field,_value +,,0,2019-11-25T00:00:00Z,m0,f0,18.0 +,,0,2019-11-25T00:00:10Z,m0,f0,6.0 +,,0,2019-11-25T00:00:20Z,m0,f0, +,,0,2019-11-25T00:00:30Z,m0,f0,9.0 +,,0,2019-11-25T00:00:40Z,m0,f0,12.0 +,,0,2019-11-25T00:00:50Z,m0,f0, +", + ) + got = + do_test(every: 10s, fn: sum) + |> group(columns: ["_measurement", "_field"]) + |> aggregateWindow(every: 10s, fn: sum, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) +} + +testcase mean_empty_windows { + want = + csv.from( + csv: + "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,_field,t0,_value +,,0,2019-11-25T00:00:00Z,m0,f0,a-0,3.0 +,,0,2019-11-25T00:00:10Z,m0,f0,a-0,2.0 +,,0,2019-11-25T00:00:20Z,m0,f0,a-0, +,,0,2019-11-25T00:00:30Z,m0,f0,a-0,3.0 +,,0,2019-11-25T00:00:40Z,m0,f0,a-0,4.0 +,,0,2019-11-25T00:00:50Z,m0,f0,a-0, +,,1,2019-11-25T00:00:00Z,m0,f0,a-1,3.0 +,,1,2019-11-25T00:00:10Z,m0,f0,a-1,2.0 +,,1,2019-11-25T00:00:20Z,m0,f0,a-1, +,,1,2019-11-25T00:00:30Z,m0,f0,a-1,3.0 +,,1,2019-11-25T00:00:40Z,m0,f0,a-1,4.0 +,,1,2019-11-25T00:00:50Z,m0,f0,a-1, +,,2,2019-11-25T00:00:00Z,m0,f0,a-2,3.0 +,,2,2019-11-25T00:00:10Z,m0,f0,a-2,2.0 +,,2,2019-11-25T00:00:20Z,m0,f0,a-2, +,,2,2019-11-25T00:00:30Z,m0,f0,a-2,3.0 +,,2,2019-11-25T00:00:40Z,m0,f0,a-2,4.0 +,,2,2019-11-25T00:00:50Z,m0,f0,a-2, +", + ) + got = do_test(every: 10s, fn: mean) + + testing.diff(got, want) +} + +testcase mean_with_nulls { + want = + array.from( + rows: [ + {_time: 2019-11-25T00:00:00Z, t0: "a-0", _value: 3.75}, + {_time: 2019-11-25T00:00:00Z, t0: "a-1", _value: 3.75}, + {_time: 2019-11-25T00:00:00Z, t0: "a-2", _value: 3.75}, + ], + ) + |> map(fn: (r) => ({r with _measurement: "m0", _field: "f0"})) + |> group(columns: ["_measurement", "_field", "t0"]) + got = + do_test(every: 10s, fn: sum) + |> aggregateWindow(every: 1m, fn: mean, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) +} + +testcase mean_null_windows { + want = + csv.from( + csv: + "#datatype,string,long,dateTime:RFC3339,string,string,double +#group,false,false,false,true,true,false +#default,_result,,,,, +,result,table,_time,_measurement,_field,_value +,,0,2019-11-25T00:00:00Z,m0,f0,6.0 +,,0,2019-11-25T00:00:10Z,m0,f0,2.0 +,,0,2019-11-25T00:00:20Z,m0,f0, +,,0,2019-11-25T00:00:30Z,m0,f0,3.0 +,,0,2019-11-25T00:00:40Z,m0,f0,4.0 +,,0,2019-11-25T00:00:50Z,m0,f0, +", + ) + got = + do_test(every: 10s, fn: sum) + |> group(columns: ["_measurement", "_field"]) + |> aggregateWindow(every: 10s, fn: mean, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) +} + +testcase min_empty_windows { + want = + csv.from( + csv: + "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,true,false +#default,_result,,,,,, +,result,table,_time,_measurement,_field,t0,_value +,,0,2019-11-25T00:00:00Z,m0,f0,a-0,1.0 +,,0,2019-11-25T00:00:10Z,m0,f0,a-0,2.0 +,,0,2019-11-25T00:00:20Z,m0,f0,a-0, +,,0,2019-11-25T00:00:30Z,m0,f0,a-0,3.0 +,,0,2019-11-25T00:00:40Z,m0,f0,a-0,4.0 +,,0,2019-11-25T00:00:50Z,m0,f0,a-0, +,,1,2019-11-25T00:00:00Z,m0,f0,a-1,1.0 +,,1,2019-11-25T00:00:10Z,m0,f0,a-1,2.0 +,,1,2019-11-25T00:00:20Z,m0,f0,a-1, +,,1,2019-11-25T00:00:30Z,m0,f0,a-1,3.0 +,,1,2019-11-25T00:00:40Z,m0,f0,a-1,4.0 +,,1,2019-11-25T00:00:50Z,m0,f0,a-1, +,,2,2019-11-25T00:00:00Z,m0,f0,a-2,1.0 +,,2,2019-11-25T00:00:10Z,m0,f0,a-2,2.0 +,,2,2019-11-25T00:00:20Z,m0,f0,a-2, +,,2,2019-11-25T00:00:30Z,m0,f0,a-2,3.0 +,,2,2019-11-25T00:00:40Z,m0,f0,a-2,4.0 +,,2,2019-11-25T00:00:50Z,m0,f0,a-2, ", ) got = do_test(every: 10s, fn: min) - testing.diff(got, want) |> yield() + testing.diff(got, want) } -testcase max_with_nulls { + +testcase min_with_nulls { + want = + array.from( + rows: [ + {_time: 2019-11-25T00:00:00Z, t0: "a-0", _value: 2.0}, + {_time: 2019-11-25T00:00:00Z, t0: "a-1", _value: 2.0}, + {_time: 2019-11-25T00:00:00Z, t0: "a-2", _value: 2.0}, + ], + ) + |> map(fn: (r) => ({r with _measurement: "m0", _field: "f0"})) + |> group(columns: ["_measurement", "_field", "t0"]) + got = + do_test(every: 10s, fn: sum) + |> aggregateWindow(every: 1m, fn: min, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) +} + +testcase min_null_windows { + want = + csv.from( + csv: + "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,false,false +#default,_result,,,,,, +,result,table,_time,_measurement,_field,t0,_value +,,0,2019-11-25T00:00:00Z,m0,f0,a-0,6.0 +,,0,2019-11-25T00:00:10Z,m0,f0,a-0,2.0 +,,0,2019-11-25T00:00:20Z,m0,f0,, +,,0,2019-11-25T00:00:30Z,m0,f0,a-0,3.0 +,,0,2019-11-25T00:00:40Z,m0,f0,a-0,4.0 +,,0,2019-11-25T00:00:50Z,m0,f0,, +", + ) + got = + do_test(every: 10s, fn: sum) + |> group(columns: ["_measurement", "_field"]) + |> aggregateWindow(every: 10s, fn: min, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) +} + +testcase max_empty_windows { want = csv.from( csv: @@ -277,27 +452,71 @@ testcase max_with_nulls { #group,false,false,false,true,true,true,false #default,_result,,,,,, ,result,table,_time,_measurement,_field,t0,_value -,,0,2019-11-25T00:00:10Z,m0,f0,a-0,1.0 -,,0,2019-11-25T00:00:20Z,m0,f0,a-0,2.0 -,,0,2019-11-25T00:00:30Z,m0,f0,a-0, -,,0,2019-11-25T00:00:40Z,m0,f0,a-0,3.0 -,,0,2019-11-25T00:00:50Z,m0,f0,a-0,4.0 -,,0,2019-11-25T00:01:00Z,m0,f0,a-0, -,,1,2019-11-25T00:00:10Z,m0,f0,a-1,1.0 -,,1,2019-11-25T00:00:20Z,m0,f0,a-1,2.0 -,,1,2019-11-25T00:00:30Z,m0,f0,a-1, -,,1,2019-11-25T00:00:40Z,m0,f0,a-1,3.0 -,,1,2019-11-25T00:00:50Z,m0,f0,a-1,4.0 -,,1,2019-11-25T00:01:00Z,m0,f0,a-1, -,,2,2019-11-25T00:00:10Z,m0,f0,a-2,1.0 -,,2,2019-11-25T00:00:20Z,m0,f0,a-2,2.0 -,,2,2019-11-25T00:00:30Z,m0,f0,a-2, -,,2,2019-11-25T00:00:40Z,m0,f0,a-2,3.0 -,,2,2019-11-25T00:00:50Z,m0,f0,a-2,4.0 -,,2,2019-11-25T00:01:00Z,m0,f0,a-2, +,,0,2019-11-25T00:00:00Z,m0,f0,a-0,5.0 +,,0,2019-11-25T00:00:10Z,m0,f0,a-0,2.0 +,,0,2019-11-25T00:00:20Z,m0,f0,a-0, +,,0,2019-11-25T00:00:30Z,m0,f0,a-0,3.0 +,,0,2019-11-25T00:00:40Z,m0,f0,a-0,4.0 +,,0,2019-11-25T00:00:50Z,m0,f0,a-0, +,,1,2019-11-25T00:00:00Z,m0,f0,a-1,5.0 +,,1,2019-11-25T00:00:10Z,m0,f0,a-1,2.0 +,,1,2019-11-25T00:00:20Z,m0,f0,a-1, +,,1,2019-11-25T00:00:30Z,m0,f0,a-1,3.0 +,,1,2019-11-25T00:00:40Z,m0,f0,a-1,4.0 +,,1,2019-11-25T00:00:50Z,m0,f0,a-1, +,,2,2019-11-25T00:00:00Z,m0,f0,a-2,5.0 +,,2,2019-11-25T00:00:10Z,m0,f0,a-2,2.0 +,,2,2019-11-25T00:00:20Z,m0,f0,a-2, +,,2,2019-11-25T00:00:30Z,m0,f0,a-2,3.0 +,,2,2019-11-25T00:00:40Z,m0,f0,a-2,4.0 +,,2,2019-11-25T00:00:50Z,m0,f0,a-2, ", ) got = do_test(every: 10s, fn: max) - testing.diff(got, want) |> yield() + testing.diff(got, want) +} + +testcase max_with_nulls { + want = + array.from( + rows: [ + {_time: 2019-11-25T00:00:00Z, t0: "a-0", _value: 6.0}, + {_time: 2019-11-25T00:00:00Z, t0: "a-1", _value: 6.0}, + {_time: 2019-11-25T00:00:00Z, t0: "a-2", _value: 6.0}, + ], + ) + |> map(fn: (r) => ({r with _measurement: "m0", _field: "f0"})) + |> group(columns: ["_measurement", "_field", "t0"]) + got = + do_test(every: 10s, fn: sum) + |> aggregateWindow(every: 1m, fn: max, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) +} + +testcase max_null_windows { + want = + csv.from( + csv: + "#datatype,string,long,dateTime:RFC3339,string,string,string,double +#group,false,false,false,true,true,false,false +#default,_result,,,,,, +,result,table,_time,_measurement,_field,t0,_value +,,0,2019-11-25T00:00:00Z,m0,f0,a-0,6.0 +,,0,2019-11-25T00:00:10Z,m0,f0,a-0,2.0 +,,0,2019-11-25T00:00:20Z,m0,f0,, +,,0,2019-11-25T00:00:30Z,m0,f0,a-0,3.0 +,,0,2019-11-25T00:00:40Z,m0,f0,a-0,4.0 +,,0,2019-11-25T00:00:50Z,m0,f0,, +", + ) + got = + do_test(every: 10s, fn: sum) + |> group(columns: ["_measurement", "_field"]) + |> aggregateWindow(every: 10s, fn: max, timeSrc: "_start") + |> drop(columns: ["_start", "_stop"]) + + testing.diff(got, want) }