Skip to content

Commit

Permalink
Eliminate unnecessary native histogram copies
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Jan 10, 2025
1 parent 4cf1e27 commit 0ac7576
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (v *VectorScalarBinaryOperation) NextSeries(ctx context.Context) (types.Ins
v.vectorIterator.Reset(series)

for {
t, vectorF, vectorH, keep := v.vectorIterator.Next()
t, vectorF, vectorH, vectorHIndex, keep := v.vectorIterator.Next()

if !keep {
// We are done.
Expand Down Expand Up @@ -215,6 +215,11 @@ func (v *VectorScalarBinaryOperation) NextSeries(ctx context.Context) (types.Ins
}
}

if h == vectorH {
// We're reusing the FloatHistogram from the vector. Remove it from the vector so that it is not modified when the slice is reused.
series.Histograms[vectorHIndex].H = nil
}

hPoints = append(hPoints, promql.HPoint{T: t, H: h})
} else {
// We have a float value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,8 @@ func (b *VectorVectorBinaryOperation) computeResult(left types.InstantVectorSeri
b.rightIterator.Reset(right)

// Get first sample from left and right
lT, lF, lH, lOk := b.leftIterator.Next()
rT, rF, rH, rOk := b.rightIterator.Next()
lT, lF, lH, lHIndex, lOk := b.leftIterator.Next()
rT, rF, rH, rHIndex, rOk := b.rightIterator.Next()
// Continue iterating until we exhaust either the LHS or RHS
// denoted by lOk or rOk being false.
for lOk && rOk {
Expand Down Expand Up @@ -586,6 +586,17 @@ func (b *VectorVectorBinaryOperation) computeResult(left types.InstantVectorSeri
return types.InstantVectorSeriesData{}, err
}
}

// Check if we're reusing the FloatHistogram from either side.
// If so, remove it so that it is not modified when the slice is reused.
if resultHist == lH {
left.Histograms[lHIndex].H = nil
}

if resultHist == rH {
right.Histograms[rHIndex].H = nil
}

hPoints = append(hPoints, promql.HPoint{
H: resultHist,
T: lT,
Expand All @@ -606,12 +617,12 @@ func (b *VectorVectorBinaryOperation) computeResult(left types.InstantVectorSeri

// Advance the iterator with the lower timestamp, or both if equal
if lT == rT {
lT, lF, lH, lOk = b.leftIterator.Next()
rT, rF, rH, rOk = b.rightIterator.Next()
lT, lF, lH, lHIndex, lOk = b.leftIterator.Next()
rT, rF, rH, rHIndex, rOk = b.rightIterator.Next()
} else if lT < rT {
lT, lF, lH, lOk = b.leftIterator.Next()
lT, lF, lH, lHIndex, lOk = b.leftIterator.Next()
} else {
rT, rF, rH, rOk = b.rightIterator.Next()
rT, rF, rH, rHIndex, rOk = b.rightIterator.Next()
}
}

Expand Down Expand Up @@ -662,16 +673,14 @@ func (b *VectorVectorBinaryOperation) emitAnnotation(generator types.AnnotationG

type binaryOperationFunc func(lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram) (f float64, h *histogram.FloatHistogram, keep bool, valid bool, err error)

// FIXME(jhesketh): Investigate avoiding copying histograms for binary ops.
// We would need nil-out the retained FloatHistogram instances in their original HPoint slices, to avoid them being modified when the slice is returned to the pool.
var arithmeticAndComparisonOperationFuncs = map[parser.ItemType]binaryOperationFunc{
parser.ADD: func(lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool, bool, error) {
if hlhs == nil && hrhs == nil {
return lhs + rhs, nil, true, true, nil
}

if hlhs != nil && hrhs != nil {
res, err := hlhs.Copy().Add(hrhs)
res, err := hlhs.Add(hrhs)
if err != nil {
return 0, nil, false, true, err
}
Expand All @@ -686,7 +695,7 @@ var arithmeticAndComparisonOperationFuncs = map[parser.ItemType]binaryOperationF
}

if hlhs != nil && hrhs != nil {
res, err := hlhs.Copy().Sub(hrhs)
res, err := hlhs.Sub(hrhs)
if err != nil {
return 0, nil, false, true, err
}
Expand All @@ -701,11 +710,11 @@ var arithmeticAndComparisonOperationFuncs = map[parser.ItemType]binaryOperationF
}

if hlhs != nil && hrhs == nil {
return 0, hlhs.Copy().Mul(rhs), true, true, nil
return 0, hlhs.Mul(rhs), true, true, nil
}

if hlhs == nil && hrhs != nil {
return 0, hrhs.Copy().Mul(lhs), true, true, nil
return 0, hrhs.Mul(lhs), true, true, nil
}

return 0, nil, false, false, nil
Expand All @@ -716,7 +725,7 @@ var arithmeticAndComparisonOperationFuncs = map[parser.ItemType]binaryOperationF
}

if hlhs != nil && hrhs == nil {
return 0, hlhs.Copy().Div(rhs), true, true, nil
return 0, hlhs.Div(rhs), true, true, nil
}

return 0, nil, false, false, nil
Expand Down Expand Up @@ -753,7 +762,7 @@ var arithmeticAndComparisonOperationFuncs = map[parser.ItemType]binaryOperationF

if hlhs != nil && hrhs != nil {
if hlhs.Equals(hrhs) {
return 0, hlhs.Copy(), true, true, nil
return 0, hlhs, true, true, nil
}

return 0, nil, false, true, nil
Expand All @@ -772,7 +781,7 @@ var arithmeticAndComparisonOperationFuncs = map[parser.ItemType]binaryOperationF

if hlhs != nil && hrhs != nil {
if !hlhs.Equals(hrhs) {
return 0, hlhs.Copy(), true, true, nil
return 0, hlhs, true, true, nil
}

return 0, nil, false, true, nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/streamingpromql/types/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func (i *InstantVectorSeriesDataIterator) Reset(data InstantVectorSeriesData) {
// It returns the next point with the lowest timestamp.
// If h is not nil, the value is a histogram, otherwise it is a float.
// If no more values exist ok is false.
func (i *InstantVectorSeriesDataIterator) Next() (t int64, f float64, h *histogram.FloatHistogram, ok bool) {
func (i *InstantVectorSeriesDataIterator) Next() (t int64, f float64, h *histogram.FloatHistogram, hIndex int, ok bool) {
if i.fIndex >= len(i.data.Floats) && i.hIndex >= len(i.data.Histograms) {
return 0, 0, nil, false
return 0, 0, nil, -1, false
}

exhaustedFloats := i.fIndex >= len(i.data.Floats)
Expand All @@ -58,13 +58,13 @@ func (i *InstantVectorSeriesDataIterator) Next() (t int64, f float64, h *histogr
// Return the next float
point := i.data.Floats[i.fIndex]
i.fIndex++
return point.T, point.F, nil, true
return point.T, point.F, nil, -1, true
}

// Return the next histogram
point := i.data.Histograms[i.hIndex]
i.hIndex++
return point.T, 0, point.H, true
return point.T, 0, point.H, i.hIndex - 1, true
}

// RangeVectorStepData contains the data and timestamps associated with a single time step produced by a
Expand Down
72 changes: 37 additions & 35 deletions pkg/streamingpromql/types/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestInstantVectorSeriesDataIterator(t *testing.T) {
T int64
F float64
H *histogram.FloatHistogram
HIndex int
HasNext bool
}
type testCase struct {
Expand All @@ -37,10 +38,10 @@ func TestInstantVectorSeriesDataIterator(t *testing.T) {
},
},
expected: []expected{
{1000, 1.1, nil, true},
{2000, 2.2, nil, true},
{3000, 3.3, nil, true},
{0, 0, nil, false},
{1000, 1.1, nil, -1, true},
{2000, 2.2, nil, -1, true},
{3000, 3.3, nil, -1, true},
{0, 0, nil, -1, false},
},
},
{
Expand All @@ -52,9 +53,9 @@ func TestInstantVectorSeriesDataIterator(t *testing.T) {
},
},
expected: []expected{
{1500, 0, &histogram.FloatHistogram{Sum: 1500}, true},
{2500, 0, &histogram.FloatHistogram{Sum: 2500}, true},
{0, 0, nil, false},
{1500, 0, &histogram.FloatHistogram{Sum: 1500}, 0, true},
{2500, 0, &histogram.FloatHistogram{Sum: 2500}, 1, true},
{0, 0, nil, -1, false},
},
},
{
Expand All @@ -75,16 +76,16 @@ func TestInstantVectorSeriesDataIterator(t *testing.T) {
},
},
expected: []expected{
{1000, 1.1, nil, true},
{1500, 0, &histogram.FloatHistogram{Sum: 1500}, true},
{2000, 2.2, nil, true},
{2500, 0, &histogram.FloatHistogram{Sum: 2500}, true},
{3000, 3.3, nil, true},
{4000, 4.4, nil, true},
{5000, 5.5, nil, true},
{5500, 0, &histogram.FloatHistogram{Sum: 5500}, true},
{6000, 6.5, nil, true},
{0, 0, nil, false},
{1000, 1.1, nil, -1, true},
{1500, 0, &histogram.FloatHistogram{Sum: 1500}, 0, true},
{2000, 2.2, nil, -1, true},
{2500, 0, &histogram.FloatHistogram{Sum: 2500}, 1, true},
{3000, 3.3, nil, -1, true},
{4000, 4.4, nil, -1, true},
{5000, 5.5, nil, -1, true},
{5500, 0, &histogram.FloatHistogram{Sum: 5500}, 2, true},
{6000, 6.5, nil, -1, true},
{0, 0, nil, -1, false},
},
},
{
Expand All @@ -104,22 +105,22 @@ func TestInstantVectorSeriesDataIterator(t *testing.T) {
},
},
expected: []expected{
{1000, 1.1, nil, true},
{1500, 0, &histogram.FloatHistogram{Sum: 1500}, true},
{2000, 2.2, nil, true},
{2500, 0, &histogram.FloatHistogram{Sum: 2500}, true},
{3000, 3.3, nil, true},
{4000, 4.4, nil, true},
{5000, 5.5, nil, true},
{5500, 0, &histogram.FloatHistogram{Sum: 5500}, true},
{0, 0, nil, false},
{1000, 1.1, nil, -1, true},
{1500, 0, &histogram.FloatHistogram{Sum: 1500}, 0, true},
{2000, 2.2, nil, -1, true},
{2500, 0, &histogram.FloatHistogram{Sum: 2500}, 1, true},
{3000, 3.3, nil, -1, true},
{4000, 4.4, nil, -1, true},
{5000, 5.5, nil, -1, true},
{5500, 0, &histogram.FloatHistogram{Sum: 5500}, 2, true},
{0, 0, nil, -1, false},
},
},
{
name: "empty data",
data: InstantVectorSeriesData{},
expected: []expected{
{0, 0, nil, false},
{0, 0, nil, -1, false},
},
},
{
Expand All @@ -130,10 +131,10 @@ func TestInstantVectorSeriesDataIterator(t *testing.T) {
},
},
expected: []expected{
{1000, 1.1, nil, true},
{0, 0, nil, false},
{0, 0, nil, false},
{0, 0, nil, false},
{1000, 1.1, nil, -1, true},
{0, 0, nil, -1, false},
{0, 0, nil, -1, false},
{0, 0, nil, -1, false},
},
},
}
Expand All @@ -144,10 +145,11 @@ func TestInstantVectorSeriesDataIterator(t *testing.T) {
iter.Reset(tc.data)

for _, exp := range tc.expected {
timestamp, floatVal, hist, hasNext := iter.Next()
require.Equal(t, exp.T, timestamp)
require.Equal(t, exp.F, floatVal)
require.Equal(t, exp.H, hist)
ts, f, h, hIndex, hasNext := iter.Next()
require.Equal(t, exp.T, ts)
require.Equal(t, exp.F, f)
require.Equal(t, exp.H, h)
require.Equal(t, exp.HIndex, hIndex)
require.Equal(t, exp.HasNext, hasNext)
}
})
Expand Down

0 comments on commit 0ac7576

Please sign in to comment.