-
Notifications
You must be signed in to change notification settings - Fork 155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: make histogramQuantile handle case of zero samples #5419
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -251,51 +251,97 @@ func (t histogramQuantileTransformation) Process(id execute.DatasetID, tbl flux. | |
}) | ||
} | ||
|
||
q, ok, err := t.computeQuantile(cdf) | ||
result, err := t.computeQuantile(cdf) | ||
if err != nil { | ||
return err | ||
} | ||
if !ok { | ||
if result.action == drop { | ||
return nil | ||
} | ||
if err := execute.AppendKeyValues(tbl.Key(), builder); err != nil { | ||
return err | ||
} | ||
if err := builder.AppendFloat(valueIdx, q); err != nil { | ||
return err | ||
if result.action == appendValue { | ||
if err := builder.AppendFloat(valueIdx, result.v); err != nil { | ||
return err | ||
} | ||
} else { | ||
// action is appendNil | ||
if err := builder.AppendNil(valueIdx); err != nil { | ||
return err | ||
} | ||
|
||
} | ||
return nil | ||
} | ||
|
||
func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (float64, bool, error) { | ||
type quantileAction int | ||
|
||
const ( | ||
appendValue quantileAction = iota | ||
appendNil | ||
drop | ||
) | ||
|
||
type quantileResult struct { | ||
action quantileAction | ||
v float64 | ||
} | ||
|
||
// isMonotonic will check if the buckets are monotonic and | ||
// return true if so. | ||
// | ||
// If force is set, it will force them to be monotonic | ||
// by assuming no increase from the previous bucket. | ||
// When force is set, this function will always return true. | ||
func isMonotonic(force bool, cdf []bucket) bool { | ||
prevCount := 0.0 | ||
for i := range cdf { | ||
if cdf[i].count < prevCount { | ||
if force { | ||
cdf[i].count = prevCount | ||
} else { | ||
return false | ||
} | ||
} else { | ||
prevCount = cdf[i].count | ||
} | ||
} | ||
return true | ||
} | ||
|
||
func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (quantileResult, error) { | ||
if len(cdf) == 0 { | ||
return 0, false, errors.New(codes.FailedPrecondition, "histogram is empty") | ||
return quantileResult{}, errors.New(codes.FailedPrecondition, "histogram is empty") | ||
} | ||
|
||
if !isMonotonic(t.spec.OnNonmonotonic == onNonmonotonicForce, cdf) { | ||
switch t.spec.OnNonmonotonic { | ||
case onNonmonotonicError: | ||
return quantileResult{}, errors.New(codes.FailedPrecondition, "histogram records counts are not monotonic") | ||
case onNonmonotonicDrop: | ||
return quantileResult{action: drop}, nil | ||
default: | ||
// "force" is not possible because isMonotonic will fix the buckets | ||
return quantileResult{}, errors.Newf(codes.Internal, "unknown or unexpected value for onNonmonotonic: %q", t.spec.OnNonmonotonic) | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sometimes the histogram buckets are not monotonic (which they should be if they are cumulative) due to late-arriving data on the edge. Checking for monotonicity first (and fixing if needed and requested by the user) avoids a bug that occurred when the total observation count was pulled from the last bucket before it was "fixed" in the case of forcing monotonicity. This is not really related to the issue the user found but I saw it here and fixed it. The test case |
||
|
||
// Find rank index and check counts are monotonic | ||
prevCount := 0.0 | ||
totalCount := cdf[len(cdf)-1].count | ||
if totalCount == 0 { | ||
// Produce a null value if there were no samples | ||
return quantileResult{action: appendNil}, nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here is where we bail and produce a null value for the case of zero observations. |
||
|
||
rank := t.spec.Quantile * totalCount | ||
rankIdx := -1 | ||
for i, b := range cdf { | ||
if b.count < prevCount { | ||
switch t.spec.OnNonmonotonic { | ||
case onNonmonotonicError: | ||
return 0, false, errors.New(codes.FailedPrecondition, "histogram records counts are not monotonic") | ||
case onNonmonotonicForce: | ||
b.count = prevCount | ||
case onNonmonotonicDrop: | ||
return 0, false, nil | ||
default: | ||
return 0, false, errors.Newf(codes.Internal, "unknown value for onNonmonotonic: %q", t.spec.OnNonmonotonic) | ||
} | ||
} else { | ||
prevCount = b.count | ||
} | ||
|
||
if rank >= b.count { | ||
rankIdx = i | ||
} | ||
} | ||
|
||
var ( | ||
lowerCount, | ||
lowerBound, | ||
|
@@ -311,7 +357,7 @@ func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (float64 | |
upperBound = cdf[0].upperBound | ||
case len(cdf) - 1: | ||
// Quantile is above the highest upper bound, simply return it as it must be finite | ||
return cdf[len(cdf)-1].upperBound, true, nil | ||
return quantileResult{action: appendValue, v: cdf[len(cdf)-1].upperBound}, nil | ||
default: | ||
lowerCount = cdf[rankIdx].count | ||
lowerBound = cdf[rankIdx].upperBound | ||
|
@@ -320,19 +366,19 @@ func (t *histogramQuantileTransformation) computeQuantile(cdf []bucket) (float64 | |
} | ||
if rank == lowerCount { | ||
// No need to interpolate | ||
return lowerBound, true, nil | ||
return quantileResult{action: appendValue, v: lowerBound}, nil | ||
} | ||
if math.IsInf(lowerBound, -1) { | ||
// We cannot interpolate with infinity | ||
return upperBound, true, nil | ||
return quantileResult{action: appendValue, v: upperBound}, nil | ||
} | ||
if math.IsInf(upperBound, 1) { | ||
// We cannot interpolate with infinity | ||
return lowerBound, true, nil | ||
return quantileResult{action: appendValue, v: lowerBound}, nil | ||
} | ||
// Compute quantile using linear interpolation | ||
scale := (rank - lowerCount) / (upperCount - lowerCount) | ||
return lowerBound + (upperBound-lowerBound)*scale, true, nil | ||
return quantileResult{action: appendValue, v: lowerBound + (upperBound-lowerBound)*scale}, nil | ||
} | ||
|
||
func (t histogramQuantileTransformation) UpdateWatermark(id execute.DatasetID, mark execute.Time) error { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue here was that the Flux stdlib function `histogram_quantile` gets a wrong answer for some input data.
This function accepts a cumulative distribution function (a cumulative histogram produced from the input table data) and produces the requested quantile.
When the cdf contains all zeroes, this function would return the bound of the last histogram bucket, which is incorrect. The right thing to do for that case is to return a null value, since we can't compute a quantile if we didn't actually receive any observations.