From 836cc311eaf557b9324d3f64e04199c4fa54c622 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Sun, 9 Jul 2023 11:39:34 -0700 Subject: [PATCH] Fix flakiness in RemoteStoreRefreshListenerIT (#8547) (#8551) --------- (cherry picked from commit fa3412131280f2e3943a08f06d82aaaccc063df4) Signed-off-by: Ashish Singh <ssashish@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> --- .../remotestore/RemoteStoreRefreshListenerIT.java | 1 - .../index/shard/RemoteStoreRefreshListener.java | 14 ++++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 13f76adc8a5a7..4005e6359a2f7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -27,7 +27,6 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7703") public void testRemoteRefreshRetryOnFailure() throws Exception { Path location = randomRepoPath().toAbsolutePath(); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 93c0aad8c41c5..002c42064aef0 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ 
b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -46,7 +46,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -95,9 +94,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private long primaryTerm; /** - * Semaphore that ensures there is only 1 retry scheduled at any time. + * This boolean is used to ensure that there is only 1 retry scheduled/running at any time. */ - private final Semaphore SCHEDULE_RETRY_PERMITS = new Semaphore(1); + private final AtomicBoolean retryScheduled = new AtomicBoolean(false); private volatile Iterator<TimeValue> backoffDelayIterator; @@ -311,6 +310,9 @@ private void onSuccessfulSegmentsSync( private void cancelAndResetScheduledCancellableRetry() { if (scheduledCancellableRetry != null && scheduledCancellableRetry.getDelay(TimeUnit.NANOSECONDS) > 0) { scheduledCancellableRetry.cancel(); + // Since we are cancelling the retry attempt as an internal/external refresh happened already before the retry job could be + // started and the current run successfully uploaded the segments. + retryScheduled.set(false); } scheduledCancellableRetry = null; } @@ -323,14 +325,14 @@ private void resetBackOffDelayIterator() { } private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) { - // If this was a retry attempt, then we release the semaphore at the end so that further retries can be scheduled + // If this was a retry attempt, then we set the retryScheduled to false so that the next retry (if needed) can be scheduled if (isRetry) { - SCHEDULE_RETRY_PERMITS.release(); + retryScheduled.set(false); } // If there are failures in uploading segments, then we should retry as search idle can lead to // refresh not occurring until write happens. 
- if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && SCHEDULE_RETRY_PERMITS.tryAcquire()) { + if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retryScheduled.compareAndSet(false, true)) { scheduledCancellableRetry = indexShard.getThreadPool() .schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH); }