From 9c5c0810e6b824a4cb37b92af74d9deaa21ec7f6 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 26 Jul 2023 10:23:15 -0700 Subject: [PATCH] Remove unnecessary refresh listeners from NRTReplicationReaderManager. (#8859) * Remove unnecessary refresh listeners from NRTReplicationReaderManager. This change removes RefreshListeners used by InternalEngine to provide waitFor functionality. These listeners were previously registered onto NRT replicas only to be force released on the next refresh cycle without actually refreshing the reader. This change also removes the unnecessary blocking refresh from NRTReaderManager because we no longer have conflicting refresh invocations from scheduledRefresh. Signed-off-by: Marc Handalian * Reduce the amount of docs ingested with testPrimaryRelocation and testPrimaryRelocationWithSegRepFailure. These tests were ingesting 100-1k docs and randomly selecting a refresh policy. Wtih the IMMEDIATE refresh policy a blocking refresh is performed that increase the time required for the primary to block operations for relocation. On my machine this change reduces the test time with max docs from 1m to 5-6s. Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian Signed-off-by: Shivansh Arora --- .../SegmentReplicationRelocationIT.java | 4 ++-- .../index/engine/NRTReplicationEngine.java | 21 ++++++------------- .../engine/NRTReplicationReaderManager.java | 2 +- .../opensearch/index/shard/IndexShard.java | 3 ++- .../SegmentReplicationIndexShardTests.java | 6 ++++++ 5 files changed, 17 insertions(+), 19 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 7cf7e5148dd4a..3024eeb798b48 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -60,7 +60,7 @@ public void testPrimaryRelocation() throws Exception { createIndex(1); final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(100, 1000); + final int initialDocCount = scaledRandomIntBetween(10, 100); final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); final List> pendingIndexResponses = new ArrayList<>(); for (int i = 0; i < initialDocCount; i++) { @@ -137,7 +137,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception { createIndex(1); final String replica = internalCluster().startNode(); ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(100, 1000); + final int initialDocCount = scaledRandomIntBetween(10, 100); final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); final List> pendingIndexResponses = new ArrayList<>(); for (int i = 0; i < initialDocCount; i++) { diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index a9f7a2e70884c..b55508b7facd3 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -77,9 +77,10 @@ public NRTReplicationEngine(EngineConfig engineConfig) { this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); this.readerManager = readerManager; this.readerManager.addListener(completionStatsCache); - for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) { - this.readerManager.addListener(listener); - } + // NRT Replicas do not have a concept of Internal vs External reader managers. + // We also do not want to wire up refresh listeners for waitFor & pending refresh location. + // which are the current external listeners set from IndexShard. + // Only wire up the internal listeners. for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { this.readerManager.addListener(listener); } @@ -322,22 +323,12 @@ public List segments(boolean verbose) { @Override public void refresh(String source) throws EngineException { - maybeRefresh(source); + // Refresh on this engine should only ever happen in the reader after new segments arrive. } @Override public boolean maybeRefresh(String source) throws EngineException { - ensureOpen(); - try { - return readerManager.maybeRefresh(); - } catch (IOException e) { - try { - failEngine("refresh failed source[" + source + "]", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new RefreshFailedEngineException(shardId, e); - } + return false; } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 35409437f605a..7b4c93c7235fe 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -103,7 +103,7 @@ public void updateSegments(SegmentInfos infos) throws IOException { // is always increased. infos.updateGeneration(currentInfos); currentInfos = infos; - maybeRefreshBlocking(); + maybeRefresh(); } public SegmentInfos getSegmentInfos() { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 8b6d083379fe1..e43b9773cc1e0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4423,7 +4423,8 @@ public void addRefreshListener(Translog.Location location, Consumer lis readAllowed = isReadAllowed(); } } - if (readAllowed) { + // NRT Replicas will not accept refresh listeners. + if (readAllowed && isSegmentReplicationAllowed() == false) { refreshListeners.addOrNotify(location, listener); } else { // we're not yet ready fo ready for reads, just ignore refresh cycles diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index d988e34ef18dc..0c68512f93ea6 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -120,6 +120,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException { closeShards(indexShard); } + public void testNRTReplicasDoNotAcceptRefreshListeners() throws IOException { + final IndexShard indexShard = newStartedShard(false, settings, new NRTReplicationEngineFactory()); + indexShard.addRefreshListener(mock(Translog.Location.class), Assert::assertFalse); + closeShards(indexShard); + } + public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { shards.startAll();