diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index d345578c7de6e..2a7e8e58b2d03 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -875,69 +875,84 @@ public void testPressureServiceStats() throws Exception { waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); + // get shard references. + final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); + logger.info("Replica aid {}", replicaShard.routingEntry().allocationId()); + logger.info("former primary aid {}", primaryShard.routingEntry().allocationId()); + + // fetch pressure stats from the Primary's Node. SegmentReplicationPressureService pressureService = internalCluster().getInstance( SegmentReplicationPressureService.class, primaryNode ); - final Map shardStats = pressureService.nodeStats().getShardStats(); - assertEquals(1, shardStats.size()); - final IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); - IndexShard replica = getIndexShard(replicaNode, INDEX_NAME); - SegmentReplicationPerGroupStats groupStats = shardStats.get(primaryShard.shardId()); - Set replicaStats = groupStats.getReplicaStats(); - assertEquals(1, replicaStats.size()); - - // assert replica node returns nothing. + // Fetch pressure stats from the Replica's Node we will assert replica node returns nothing until it is promoted. SegmentReplicationPressureService replicaNode_service = internalCluster().getInstance( SegmentReplicationPressureService.class, replicaNode ); + + final Map shardStats = pressureService.nodeStats().getShardStats(); + assertEquals("We should have stats returned for the replication group", 1, shardStats.size()); + + SegmentReplicationPerGroupStats groupStats = shardStats.get(primaryShard.shardId()); + Set replicaStats = groupStats.getReplicaStats(); + assertAllocationIdsInReplicaShardStats(Set.of(replicaShard.routingEntry().allocationId().getId()), replicaStats); + assertTrue(replicaNode_service.nodeStats().getShardStats().isEmpty()); - // drop the primary, this won't hand off SR state. + // drop the primary, this won't hand off pressure stats between old/new primary. internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); ensureYellowAndNoInitializingShards(INDEX_NAME); - replicaNode_service = internalCluster().getInstance(SegmentReplicationPressureService.class, replicaNode); - replica = getIndexShard(replicaNode, INDEX_NAME); - assertTrue("replica should be promoted as a primary", replica.routingEntry().primary()); - assertEquals(1, replicaNode_service.nodeStats().getShardStats().size()); - // we don't have a replica assigned yet, so this should be 0. - assertEquals(0, replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats().size()); + + assertTrue("replica should be promoted as a primary", replicaShard.routingEntry().primary()); + assertEquals( + "We should have stats returned for the replication group", + 1, + replicaNode_service.nodeStats().getShardStats().size() + ); + // after the primary is dropped and replica is promoted we won't have a replica assigned yet, so stats per replica should return + // empty. + replicaStats = replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats(); + assertTrue(replicaStats.isEmpty()); // start another replica. String replicaNode_2 = internalCluster().startDataOnlyNode(); ensureGreen(INDEX_NAME); - String docId = String.valueOf(initialDocCount + 1); - client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get(); - refresh(INDEX_NAME); - waitForSearchableDocs(initialDocCount + 1, replicaNode_2); + final IndexShard secondReplicaShard = getIndexShard(replicaNode_2, INDEX_NAME); + final String second_replica_aid = secondReplicaShard.routingEntry().allocationId().getId(); + waitForSearchableDocs(initialDocCount, replicaNode_2); - replicaNode_service = internalCluster().getInstance(SegmentReplicationPressureService.class, replicaNode); - replica = getIndexShard(replicaNode_2, INDEX_NAME); - assertEquals(1, replicaNode_service.nodeStats().getShardStats().size()); - replicaStats = replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats(); - assertEquals(1, replicaStats.size()); + assertEquals( + "We should have stats returned for the replication group", + 1, + replicaNode_service.nodeStats().getShardStats().size() + ); + replicaStats = replicaNode_service.nodeStats().getShardStats().get(replicaShard.shardId()).getReplicaStats(); + assertAllocationIdsInReplicaShardStats(Set.of(second_replica_aid), replicaStats); + final SegmentReplicationShardStats replica_entry = replicaStats.stream().findFirst().get(); + assertEquals(replica_entry.getCheckpointsBehindCount(), 0); // test a checkpoint without any new segments flush(INDEX_NAME); assertBusy(() -> { - final SegmentReplicationPressureService service = internalCluster().getInstance( - SegmentReplicationPressureService.class, - replicaNode - ); - assertEquals(1, service.nodeStats().getShardStats().size()); - final Set shardStatsSet = service.nodeStats() + assertEquals(1, replicaNode_service.nodeStats().getShardStats().size()); + final Set shardStatsSet = replicaNode_service.nodeStats() .getShardStats() - .get(primaryShard.shardId()) + .get(replicaShard.shardId()) .getReplicaStats(); - assertEquals(1, shardStatsSet.size()); + assertAllocationIdsInReplicaShardStats(Set.of(second_replica_aid), shardStatsSet); final SegmentReplicationShardStats stats = shardStatsSet.stream().findFirst().get(); assertEquals(0, stats.getCheckpointsBehindCount()); }); } } + private void assertAllocationIdsInReplicaShardStats(Set expected, Set replicaStats) { + assertEquals(expected, replicaStats.stream().map(SegmentReplicationShardStats::getAllocationId).collect(Collectors.toSet())); + } + /** * Tests a scroll query on the replica * @throws Exception when issue is encountered diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java index 7426ca2f13f84..7cf7e5148dd4a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -22,6 +22,8 @@ import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.SegmentReplicationShardStats; +import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -29,8 +31,10 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -528,13 +532,27 @@ public void testFlushAfterRelocation() throws Exception { client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); } - // Verify segment replication event never happened on replica shard + final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); + + // Verify segment replication event never happened on replica shard other than recovery. + assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), 0); + assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), 0); + SegmentReplicationStatsResponse segmentReplicationStatsResponse = client().admin() .indices() .prepareSegmentReplicationStats(INDEX_NAME) .execute() .actionGet(); - assertTrue(segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0).getReplicaStats().isEmpty()); + final Set replicaStats = segmentReplicationStatsResponse.getReplicationStats() + .get(INDEX_NAME) + .get(0) + .getReplicaStats(); + assertEquals( + Set.of(replicaShard.routingEntry().allocationId().getId()), + replicaStats.stream().map(SegmentReplicationShardStats::getAllocationId).collect(Collectors.toSet()) + ); + // the primary still has not refreshed to update its checkpoint, so our replica is not yet behind. + assertEquals(0, replicaStats.stream().findFirst().get().getCheckpointsBehindCount()); // Relocate primary to new primary. When new primary starts it does perform a flush. logger.info("--> relocate the shard from primary to newPrimary"); diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 4d6cffa58510f..19fe9ee97cd2f 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -247,7 +247,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final Consumer onReplicationGroupUpdated; - private volatile ReplicationCheckpoint lastPublishedReplicationCheckpoint; + private volatile ReplicationCheckpoint latestReplicationCheckpoint; /** * Get all retention leases tracked on this shard. @@ -1054,6 +1054,7 @@ public ReplicationTracker( this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; this.onReplicationGroupUpdated = onReplicationGroupUpdated; + this.latestReplicationCheckpoint = indexSettings.isSegRepEnabled() ? ReplicationCheckpoint.empty(shardId) : null; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } @@ -1212,26 +1213,42 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation */ public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) { assert indexSettings.isSegRepEnabled(); - assert handoffInProgress == false; - if (checkpoint.equals(lastPublishedReplicationCheckpoint) == false) { - this.lastPublishedReplicationCheckpoint = checkpoint; - for (Map.Entry entry : checkpoints.entrySet()) { - if (entry.getKey().equals(this.shardAllocationId) == false) { - final CheckpointState cps = entry.getValue(); - if (cps.inSync) { - cps.checkpointTimers.computeIfAbsent(checkpoint, ignored -> { - final ReplicationTimer replicationTimer = new ReplicationTimer(); - replicationTimer.start(); - return replicationTimer; - }); - logger.trace( - () -> new ParameterizedMessage( - "updated last published checkpoint to {} - timers [{}]", - checkpoint, - cps.checkpointTimers.keySet() - ) - ); - } + if (checkpoint.equals(latestReplicationCheckpoint) == false) { + this.latestReplicationCheckpoint = checkpoint; + } + if (primaryMode) { + startReplicationLagTimers(); + } + } + + public ReplicationCheckpoint getLatestReplicationCheckpoint() { + return this.latestReplicationCheckpoint; + } + + private void startReplicationLagTimers() { + for (Map.Entry entry : checkpoints.entrySet()) { + final String allocationId = entry.getKey(); + if (allocationId.equals(this.shardAllocationId) == false) { + final CheckpointState cps = entry.getValue(); + // if the shard is in checkpoints but is unavailable or out of sync we will not track its replication state. + // it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event. + if (cps.inSync + && replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false + && latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)) { + cps.checkpointTimers.computeIfAbsent(latestReplicationCheckpoint, ignored -> { + final ReplicationTimer replicationTimer = new ReplicationTimer(); + replicationTimer.start(); + return replicationTimer; + }); + logger.trace( + () -> new ParameterizedMessage( + "updated last published checkpoint for {} at visible cp {} to {} - timers [{}]", + allocationId, + cps.visibleReplicationCheckpoint, + latestReplicationCheckpoint, + cps.checkpointTimers.keySet() + ) + ); } } } @@ -1244,12 +1261,17 @@ public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint ch */ public synchronized Set getSegmentReplicationStats() { assert indexSettings.isSegRepEnabled(); - final ReplicationCheckpoint lastPublishedCheckpoint = this.lastPublishedReplicationCheckpoint; - if (primaryMode && lastPublishedCheckpoint != null) { + if (primaryMode) { return this.checkpoints.entrySet() .stream() - .filter(entry -> entry.getKey().equals(this.shardAllocationId) == false && entry.getValue().inSync) - .map(entry -> buildShardStats(lastPublishedCheckpoint.getLength(), entry.getKey(), entry.getValue())) + // filter out this shard's allocation id, any shards that are out of sync or unavailable (shard marked in-sync but has not + // been assigned to a node). + .filter( + entry -> entry.getKey().equals(this.shardAllocationId) == false + && entry.getValue().inSync + && replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false + ) + .map(entry -> buildShardStats(latestReplicationCheckpoint.getLength(), entry.getKey(), entry.getValue())) .collect(Collectors.toUnmodifiableSet()); } return Collections.emptySet(); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index da01023ace47c..7549f3450e7f2 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -33,6 +33,10 @@ public class ReplicationCheckpoint implements Writeable, Comparable tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync)); - // get insync ids, filter out the primary. - final Set inSyncAllocationIds = tracker.getReplicationGroup() - .getInSyncAllocationIds() - .stream() - .filter(id -> tracker.shardAllocationId.equals(id) == false) - .collect(Collectors.toSet()); + + initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1831,8 +1827,10 @@ public void testSegmentReplicationCheckpointTracking() { tracker.setLatestReplicationCheckpoint(secondCheckpoint); tracker.setLatestReplicationCheckpoint(thirdCheckpoint); + final Set expectedIds = ids(initializingIds); + Set groupStats = tracker.getSegmentReplicationStats(); - assertEquals(inSyncAllocationIds.size(), groupStats.size()); + assertEquals(expectedIds.size(), groupStats.size()); for (SegmentReplicationShardStats shardStat : groupStats) { assertEquals(3, shardStat.getCheckpointsBehindCount()); assertEquals(100L, shardStat.getBytesBehindCount()); @@ -1840,7 +1838,7 @@ public void testSegmentReplicationCheckpointTracking() { // simulate replicas moved up to date. final Map checkpoints = tracker.checkpoints; - for (String id : inSyncAllocationIds) { + for (String id : expectedIds) { final ReplicationTracker.CheckpointState checkpointState = checkpoints.get(id); assertEquals(3, checkpointState.checkpointTimers.size()); tracker.updateVisibleCheckpointForShard(id, initialCheckpoint); @@ -1848,13 +1846,13 @@ public void testSegmentReplicationCheckpointTracking() { } groupStats = tracker.getSegmentReplicationStats(); - assertEquals(inSyncAllocationIds.size(), groupStats.size()); + assertEquals(expectedIds.size(), groupStats.size()); for (SegmentReplicationShardStats shardStat : groupStats) { assertEquals(2, shardStat.getCheckpointsBehindCount()); assertEquals(99L, shardStat.getBytesBehindCount()); } - for (String id : inSyncAllocationIds) { + for (String id : expectedIds) { final ReplicationTracker.CheckpointState checkpointState = checkpoints.get(id); assertEquals(2, checkpointState.checkpointTimers.size()); tracker.updateVisibleCheckpointForShard(id, thirdCheckpoint); @@ -1862,7 +1860,7 @@ public void testSegmentReplicationCheckpointTracking() { } groupStats = tracker.getSegmentReplicationStats(); - assertEquals(inSyncAllocationIds.size(), groupStats.size()); + assertEquals(expectedIds.size(), groupStats.size()); for (SegmentReplicationShardStats shardStat : groupStats) { assertEquals(0, shardStat.getCheckpointsBehindCount()); assertEquals(0L, shardStat.getBytesBehindCount()); @@ -1883,19 +1881,24 @@ public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() { AllocationId primaryId = activeAllocationIds.iterator().next(); IndexShardRoutingTable routingTable = routingTable(initializingIds, primaryId); final ReplicationTracker tracker = newTracker(primaryId, settings); - tracker.updateFromClusterManager(initialClusterStateVersion, ids(activeAllocationIds), routingTable); tracker.activatePrimaryMode(NO_OPS_PERFORMED); - assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds))); - assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable)); - assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync)); - // get insync ids, filter out the primary. - final Set inSyncAllocationIds = tracker.getReplicationGroup() - .getInSyncAllocationIds() - .stream() - .filter(id -> tracker.shardAllocationId.equals(id) == false) - .collect(Collectors.toSet()); + initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); + + assertEquals(tracker.getReplicationGroup().getRoutingTable(), routingTable); + assertEquals( + "All active & initializing ids are now marked in-sync", + Sets.union(ids(activeAllocationIds), ids(initializingIds)), + tracker.getReplicationGroup().getInSyncAllocationIds() + ); + + assertEquals( + "Active ids are in-sync but still unavailable", + tracker.getReplicationGroup().getUnavailableInSyncShards(), + Sets.difference(ids(activeAllocationIds), Set.of(primaryId.getId())) + ); + assertTrue(activeAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync)); final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1907,15 +1910,20 @@ public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() { ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); + // we expect that the only returned ids from getSegmentReplicationStats will be the initializing ids we marked with + // markAsTrackingAndInSyncQuietly. + // This is because the ids marked active initially are still unavailable (don't have an associated routing entry). + final Set expectedIds = ids(initializingIds); Set groupStats = tracker.getSegmentReplicationStats(); - assertEquals(inSyncAllocationIds.size(), groupStats.size()); + final Set actualIds = groupStats.stream().map(SegmentReplicationShardStats::getAllocationId).collect(Collectors.toSet()); + assertEquals(expectedIds, actualIds); for (SegmentReplicationShardStats shardStat : groupStats) { assertEquals(1, shardStat.getCheckpointsBehindCount()); } // simulate replicas moved up to date. final Map checkpoints = tracker.checkpoints; - for (String id : inSyncAllocationIds) { + for (String id : expectedIds) { final ReplicationTracker.CheckpointState checkpointState = checkpoints.get(id); assertEquals(1, checkpointState.checkpointTimers.size()); tracker.updateVisibleCheckpointForShard(id, initialCheckpoint);