Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fix flaky test MoreExpressionIT.testSpecialValueVariable in concurrent search path #11261

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,6 @@ public void testInvalidFieldMember() {
}

public void testSpecialValueVariable() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/10079",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
// i.e. _value for aggregations
createIndex("test");
ensureGreen("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ class ExpressionAggregationScript implements AggregationScript.LeafFactory {
final SimpleBindings bindings;
final DoubleValuesSource source;
final boolean needsScore;
final ReplaceableConstDoubleValueSource specialValue; // _value
final PerThreadReplaceableConstDoubleValueSource specialValue; // _value

ExpressionAggregationScript(Expression e, SimpleBindings b, boolean n, ReplaceableConstDoubleValueSource v) {
ExpressionAggregationScript(Expression e, SimpleBindings b, boolean n, PerThreadReplaceableConstDoubleValueSource v) {
exprScript = e;
bindings = b;
source = exprScript.getDoubleValuesSource(bindings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,14 @@ private static AggregationScript.LeafFactory newAggregationScript(
// instead of complicating SimpleBindings (which should stay simple)
SimpleBindings bindings = new SimpleBindings();
boolean needsScores = false;
ReplaceableConstDoubleValueSource specialValue = null;
PerThreadReplaceableConstDoubleValueSource specialValue = null;
for (String variable : expr.variables) {
try {
if (variable.equals("_score")) {
bindings.add("_score", DoubleValuesSource.SCORES);
needsScores = true;
} else if (variable.equals("_value")) {
specialValue = new ReplaceableConstDoubleValueSource();
specialValue = new PerThreadReplaceableConstDoubleValueSource();
bindings.add("_value", specialValue);
// noop: _value is special for aggregations, and is handled in ExpressionScriptBindings
// TODO: if some uses it in a scoring expression, they will get a nasty failure when evaluating...need a
Expand Down Expand Up @@ -388,15 +388,15 @@ private static ScoreScript.LeafFactory newScoreScript(Expression expr, SearchLoo
// NOTE: if we need to do anything complicated with bindings in the future, we can just extend Bindings,
// instead of complicating SimpleBindings (which should stay simple)
SimpleBindings bindings = new SimpleBindings();
ReplaceableConstDoubleValueSource specialValue = null;
PerThreadReplaceableConstDoubleValueSource specialValue = null;
boolean needsScores = false;
for (String variable : expr.variables) {
try {
if (variable.equals("_score")) {
bindings.add("_score", DoubleValuesSource.SCORES);
needsScores = true;
} else if (variable.equals("_value")) {
specialValue = new ReplaceableConstDoubleValueSource();
specialValue = new PerThreadReplaceableConstDoubleValueSource();
bindings.add("_value", specialValue);
// noop: _value is special for aggregations, and is handled in ExpressionScriptBindings
// TODO: if some uses it in a scoring expression, they will get a nasty failure when evaluating...need a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,25 @@
import org.apache.lucene.search.IndexSearcher;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double.
* A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double. This is made
* thread-safe for concurrent segment search use case by keeping the {@link DoubleValues} per thread. Any update to the value happens in
* thread specific {@link DoubleValuesSource} instance.
*/
final class ReplaceableConstDoubleValueSource extends DoubleValuesSource {
final ReplaceableConstDoubleValues fv;
final class PerThreadReplaceableConstDoubleValueSource extends DoubleValuesSource {
// Multiple slices can be processed by same thread but that will be sequential, so keeping per thread is fine
final Map<Long, ReplaceableConstDoubleValues> perThreadDoubleValues;

ReplaceableConstDoubleValueSource() {
fv = new ReplaceableConstDoubleValues();
PerThreadReplaceableConstDoubleValueSource() {
perThreadDoubleValues = new ConcurrentHashMap<>();
}

@Override
public DoubleValues getValues(LeafReaderContext ctx, DoubleValues scores) throws IOException {
return fv;
return perThreadDoubleValues.computeIfAbsent(Thread.currentThread().getId(), threadId -> new ReplaceableConstDoubleValues());
}

@Override
Expand All @@ -62,7 +67,11 @@ public boolean needsScores() {

@Override
public Explanation explain(LeafReaderContext ctx, int docId, Explanation scoreExplanation) throws IOException {
if (fv.advanceExact(docId)) return Explanation.match((float) fv.doubleValue(), "ReplaceableConstDoubleValues");
final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent(
Thread.currentThread().getId(),
threadId -> new ReplaceableConstDoubleValues()
);
if (currentFv.advanceExact(docId)) return Explanation.match((float) currentFv.doubleValue(), "ReplaceableConstDoubleValues");
else return Explanation.noMatch("ReplaceableConstDoubleValues");
}

Expand All @@ -77,7 +86,11 @@ public int hashCode() {
}

public void setValue(double v) {
fv.setValue(v);
final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent(
Thread.currentThread().getId(),
threadId -> new ReplaceableConstDoubleValues()
);
currentFv.setValue(v);
}

@Override
Expand Down