Skip to content

Commit

Permalink
Use Atomic vars in multithreaded env. ++num and num++ operations aren…
Browse files Browse the repository at this point in the history
…'t atomic, can't use them with volatile vars

Signed-off-by: Dmitry Kryukov <dk2k@ya.ru>
  • Loading branch information
dk2k committed Oct 16, 2024
1 parent ec7b652 commit 20ea733
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.openjdk.jmh.annotations.Warmup;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@Fork(3)
@Warmup(iterations = 10)
Expand All @@ -28,7 +29,7 @@
@State(Scope.Benchmark)
@SuppressWarnings("unused") // invoked by benchmarking framework
public class NanoTimeVsCurrentTimeMillisBenchmark {
private volatile long var = 0;
private final AtomicLong var = new AtomicLong(0);

@Benchmark
public long currentTimeMillis() {
Expand All @@ -45,6 +46,6 @@ public long nanoTime() {
* */
@Benchmark
public long accessLongVar() {
return var++;
return var.getAndIncrement();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static org.opensearch.action.support.TransportActions.isShardNotAvailableException;
Expand Down Expand Up @@ -226,7 +227,7 @@ class AsyncShardsAction {
private final ActionListener<FieldCapabilitiesIndexResponse> listener;
private final GroupShardsIterator<ShardIterator> shardsIt;

private volatile int shardIndex = 0;
private final AtomicInteger shardIndex = new AtomicInteger(0);

private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<FieldCapabilitiesIndexResponse> listener) {
this.listener = listener;
Expand Down Expand Up @@ -263,11 +264,11 @@ private void onFailure(ShardRouting shardRouting, Exception e) {
}

private ShardRouting nextRoutingOrNull(Exception failure) {
if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) {
if (shardsIt.size() == 0 || shardIndex.get() >= shardsIt.size()) {
return null;
}
ShardRouting next = FailAwareWeightedRouting.getInstance()
.findNext(shardsIt.get(shardIndex), clusterService.state(), failure, this::moveToNextShard);
.findNext(shardsIt.get(shardIndex.get()), clusterService.state(), failure, this::moveToNextShard);

if (next != null) {
return next;
Expand All @@ -277,7 +278,7 @@ private ShardRouting nextRoutingOrNull(Exception failure) {
}

private void moveToNextShard() {
++shardIndex;
shardIndex.incrementAndGet();

Check warning on line 281 in server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java#L281

Added line #L281 was not covered by tests
}

private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand Down Expand Up @@ -155,7 +156,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
aggsList,
topDocsList,
topDocsStats,
pendingMerges.numReducePhases,
pendingMerges.numReducePhases.get(),
false,
aggReduceContextBuilder,
performFinalReduce
Expand Down Expand Up @@ -239,7 +240,7 @@ private MergeResult partialReduce(
}

public int getNumReducePhases() {
return pendingMerges.numReducePhases;
return pendingMerges.numReducePhases.get();
}

/**
Expand All @@ -264,7 +265,7 @@ private class PendingMerges implements Releasable {
private final SearchPhaseController.TopDocsStats topDocsStats;
private volatile MergeResult mergeResult;
private volatile boolean hasPartialReduce;
private volatile int numReducePhases;
private final AtomicInteger numReducePhases = new AtomicInteger();

PendingMerges(int batchReduceSize, int trackTotalHitsUpTo) {
this.batchReduceSize = batchReduceSize;
Expand Down Expand Up @@ -448,8 +449,8 @@ protected void doRun() {
long estimatedMergeSize = estimateRamBytesUsedForReduce(estimatedTotalSize);
addEstimateAndMaybeBreak(estimatedMergeSize);
estimatedTotalSize += estimatedMergeSize;
++numReducePhases;
newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases);
numReducePhases.incrementAndGet();
newMerge = partialReduce(toConsume, task.emptyResults, topDocsStats, thisMergeResult, numReducePhases.get());
} catch (Exception t) {
onMergeFailure(t);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A plain iterator
Expand All @@ -44,29 +45,26 @@
public class PlainIterator<T> implements Iterable<T>, Countable {
private final List<T> elements;

// Calls to nextOrNull might be performed on different threads in the transport actions so we need the volatile
// keyword in order to ensure visibility. Note that it is fine to use `volatile` for a counter in that case given
// that although nextOrNull might be called from different threads, it can never happen concurrently.
private volatile int index;
private AtomicInteger index = new AtomicInteger();

public PlainIterator(List<T> elements) {
this.elements = elements;
reset();
}

public void reset() {
index = 0;
index = new AtomicInteger(0);
}

public int remaining() {
return elements.size() - index;
return elements.size() - index.get();
}

public T nextOrNull() {
if (index == elements.size()) {
if (index.get() == elements.size()) {
return null;
} else {
return elements.get(index++);
return elements.get(index.getAndIncrement());
}
}

Expand Down

0 comments on commit 20ea733

Please sign in to comment.