diff --git a/CHANGELOG.md b/CHANGELOG.md index e4779231977b9..6aa18ce0064ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introduce a setting to disable download of full cluster state from remote on term mismatch([#16798](https://github.com/opensearch-project/OpenSearch/pull/16798/)) - Added ability to retrieve value from DocValues in a flat_object filed([#16802](https://github.com/opensearch-project/OpenSearch/pull/16802)) - Improve performace of NumericTermAggregation by avoiding unnecessary sorting([#17252](https://github.com/opensearch-project/OpenSearch/pull/17252)) +- Implemented computation of segment replication stats at shard level ([#17055](https://github.com/opensearch-project/OpenSearch/pull/17055)) - [Rule Based Auto-tagging] Add in-memory attribute value store ([#17342](https://github.com/opensearch-project/OpenSearch/pull/17342)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 7fd219a3dd9dc..2d0918ff6e89a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -114,6 +114,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; @@ -136,6 +137,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength; +import static org.mockito.Mockito.mock; public class IndexShardIT extends OpenSearchSingleNodeTestCase { @@ -716,7 +718,8 @@ public static final IndexShard newIndexShard( null, DefaultRemoteStoreSettings.INSTANCE, false, - IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting) + IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting), + mock(Function.class) ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index 89aef6f0be1a6..5d69799e32647 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -404,19 +404,17 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception { for (NodeStats nodeStats : nodesStatsResponse.getNodes()) { ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats(); - // primary node - should hold replication statistics + // primary node - do not have any replication statistics if (nodeStats.getNode().getName().equals(primaryNode)) { + assertTrue(replicationStats.getMaxBytesBehind() == 0); + assertTrue(replicationStats.getTotalBytesBehind() == 0); + assertTrue(replicationStats.getMaxReplicationLag() == 0); + } + // replica nodes - should hold replication statistics + if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) { assertTrue(replicationStats.getMaxBytesBehind() > 0); assertTrue(replicationStats.getTotalBytesBehind() > 0); assertTrue(replicationStats.getMaxReplicationLag() > 0); - // 2 replicas so total bytes should be double of max - assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind()); - } - // replica nodes - should hold empty replication statistics - if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) { - assertEquals(0, replicationStats.getMaxBytesBehind()); - assertEquals(0, replicationStats.getTotalBytesBehind()); - assertEquals(0, replicationStats.getMaxReplicationLag()); } } // get replication statistics at index level diff --git a/server/src/main/java/org/opensearch/common/io/IndexIOStreamHandlerFactory.java b/server/src/main/java/org/opensearch/common/io/IndexIOStreamHandlerFactory.java new file mode 100644 index 0000000000000..a4ad161d0ced3 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/io/IndexIOStreamHandlerFactory.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io; + +/** + * Interface for factory to provide handler implementation for type {@link T} + * @param The type of content to be read/written to stream + * + * @opensearch.internal + */ +public interface IndexIOStreamHandlerFactory { + + /** + * Implements logic to provide handler based on the stream versions + * @param version stream version + * @return Handler for reading/writing content streams to/from - {@link T} + */ + IndexIOStreamHandler getHandler(int version); +} diff --git a/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java b/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java index 8089d354a2480..b62ae1f1d3956 100644 --- a/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java +++ b/server/src/main/java/org/opensearch/common/io/VersionedCodecStreamWrapper.java @@ -28,18 +28,25 @@ public class VersionedCodecStreamWrapper { private static final Logger logger = LogManager.getLogger(VersionedCodecStreamWrapper.class); - // TODO This can be updated to hold a streamReadWriteHandlerFactory and get relevant handler based on the stream versions - private final IndexIOStreamHandler indexIOStreamHandler; + private final IndexIOStreamHandlerFactory indexIOStreamHandlerFactory; + private final int minVersion; private final int currentVersion; private final String codec; /** - * @param indexIOStreamHandler handler to read/write stream from T + * @param indexIOStreamHandlerFactory factory for providing handler to read/write stream from T + * @param minVersion earliest supported version of the stream * @param currentVersion latest supported version of the stream * @param codec: stream codec */ - public VersionedCodecStreamWrapper(IndexIOStreamHandler indexIOStreamHandler, int currentVersion, String codec) { - this.indexIOStreamHandler = indexIOStreamHandler; + public VersionedCodecStreamWrapper( + IndexIOStreamHandlerFactory indexIOStreamHandlerFactory, + int minVersion, + int currentVersion, + String codec + ) { + this.indexIOStreamHandlerFactory = indexIOStreamHandlerFactory; + this.minVersion = minVersion; this.currentVersion = currentVersion; this.codec = codec; } @@ -87,7 +94,7 @@ public void writeStream(IndexOutput indexOutput, T content) throws IOException { */ private int checkHeader(IndexInput indexInput) throws IOException { // TODO Once versioning strategy is decided we'll add support for min/max supported versions - return CodecUtil.checkHeader(indexInput, this.codec, this.currentVersion, this.currentVersion); + return CodecUtil.checkHeader(indexInput, this.codec, minVersion, this.currentVersion); } /** @@ -120,8 +127,6 @@ private void writeFooter(IndexOutput indexOutput) throws IOException { * @param version stream content version */ private IndexIOStreamHandler getHandlerForVersion(int version) { - // TODO implement factory and pick relevant handler based on version. - // It should also take into account min and max supported versions - return this.indexIOStreamHandler; + return this.indexIOStreamHandlerFactory.getHandler(version); } } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 52dd92f31d70b..7016ddb8e59b8 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -57,6 +57,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; @@ -652,7 +653,8 @@ public IndexService newIndexService( clusterDefaultRefreshIntervalSupplier, recoverySettings, remoteStoreSettings, - (s) -> {} + (s) -> {}, + shardId -> ReplicationStats.empty() ); } @@ -678,7 +680,8 @@ public IndexService newIndexService( Supplier clusterDefaultRefreshIntervalSupplier, RecoverySettings recoverySettings, RemoteStoreSettings remoteStoreSettings, - Consumer replicator + Consumer replicator, + Function segmentReplicationStatsProvider ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -740,7 +743,8 @@ public IndexService newIndexService( remoteStoreSettings, fileCache, compositeIndexSettings, - replicator + replicator, + segmentReplicationStatsProvider ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 72d723c7e1199..e265ce3590121 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -197,6 +197,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final FileCache fileCache; private final CompositeIndexSettings compositeIndexSettings; private final Consumer replicator; + private final Function segmentReplicationStatsProvider; public IndexService( IndexSettings indexSettings, @@ -235,7 +236,8 @@ public IndexService( RemoteStoreSettings remoteStoreSettings, FileCache fileCache, CompositeIndexSettings compositeIndexSettings, - Consumer replicator + Consumer replicator, + Function segmentReplicationStatsProvider ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -322,6 +324,7 @@ public IndexService( this.compositeIndexSettings = compositeIndexSettings; this.fileCache = fileCache; this.replicator = replicator; + this.segmentReplicationStatsProvider = segmentReplicationStatsProvider; updateFsyncTaskIfNecessary(); } @@ -398,7 +401,8 @@ public IndexService( remoteStoreSettings, null, null, - s -> {} + s -> {}, + (shardId) -> ReplicationStats.empty() ); } @@ -694,7 +698,8 @@ protected void closeInternal() { recoverySettings, remoteStoreSettings, seedRemote, - discoveryNodes + discoveryNodes, + segmentReplicationStatsProvider ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/ReplicationStats.java b/server/src/main/java/org/opensearch/index/ReplicationStats.java index 8987a492e9a90..22628e86d309f 100644 --- a/server/src/main/java/org/opensearch/index/ReplicationStats.java +++ b/server/src/main/java/org/opensearch/index/ReplicationStats.java @@ -42,6 +42,10 @@ public ReplicationStats(StreamInput in) throws IOException { this.maxReplicationLag = in.readVLong(); } + public static ReplicationStats empty() { + return new ReplicationStats(); + } + public ReplicationStats() { } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index df841dac4cf8e..f8ad3fc8cf866 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -361,6 +361,7 @@ Runnable getGlobalCheckpointSyncer() { */ private final ShardMigrationState shardMigrationState; private DiscoveryNodes discoveryNodes; + private final Function segmentReplicationStatsProvider; public IndexShard( final ShardRouting shardRouting, @@ -391,7 +392,8 @@ public IndexShard( final RecoverySettings recoverySettings, final RemoteStoreSettings remoteStoreSettings, boolean seedRemote, - final DiscoveryNodes discoveryNodes + final DiscoveryNodes discoveryNodes, + final Function segmentReplicationStatsProvider ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -493,6 +495,7 @@ public boolean shouldCache(Query query) { this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings); this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote); this.discoveryNodes = discoveryNodes; + this.segmentReplicationStatsProvider = segmentReplicationStatsProvider; } public ThreadPool getThreadPool() { @@ -3233,17 +3236,10 @@ public Set getReplicationStatsForTrackedReplicas() } public ReplicationStats getReplicationStats() { - if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary()) { - final Set stats = getReplicationStatsForTrackedReplicas(); - long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L); - long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum(); - long maxReplicationLag = stats.stream() - .mapToLong(SegmentReplicationShardStats::getCurrentReplicationLagMillis) - .max() - .orElse(0L); - return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag); - } - return new ReplicationStats(); + if (indexSettings.isSegRepEnabledOrRemoteNode() && !routingEntry().primary()) { + return segmentReplicationStatsProvider.apply(shardId); + } + return ReplicationStats.empty(); } /** diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 46a90da2a18b6..c18902b69d23c 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -38,7 +38,7 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; -import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandlerFactory; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.threadpool.ThreadPool; @@ -104,7 +104,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private Map segmentsUploadedToRemoteStore; private static final VersionedCodecStreamWrapper metadataStreamWrapper = new VersionedCodecStreamWrapper<>( - new RemoteSegmentMetadataHandler(), + new RemoteSegmentMetadataHandlerFactory(), + RemoteSegmentMetadata.VERSION_ONE, RemoteSegmentMetadata.CURRENT_VERSION, RemoteSegmentMetadata.METADATA_CODEC ); diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java index 41a145273e8ef..463e08918b3f7 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java +++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java @@ -30,10 +30,15 @@ */ @PublicApi(since = "2.6.0") public class RemoteSegmentMetadata { + + public static final int VERSION_ONE = 1; + + public static final int VERSION_TWO = 2; + /** * Latest supported version of metadata */ - public static final int CURRENT_VERSION = 1; + public static final int CURRENT_VERSION = VERSION_TWO; /** * Metadata codec */ @@ -106,6 +111,11 @@ public static Map f ); } + /** + * Write always writes with the latest version of the RemoteSegmentMetadata + * @param out file output stream which will store stream content + * @throws IOException in case there is a problem writing the file + */ public void write(IndexOutput out) throws IOException { out.writeMapOfStrings(toMapOfStrings()); writeCheckpointToIndexOutput(replicationCheckpoint, out); @@ -113,11 +123,18 @@ public void write(IndexOutput out) throws IOException { out.writeBytes(segmentInfosBytes, segmentInfosBytes.length); } - public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException { + /** + * Read can happen in the upgraded version of replica which needs to support all versions of RemoteSegmentMetadata + * @param indexInput file input stream + * @param version version of the RemoteSegmentMetadata + * @return {@code RemoteSegmentMetadata} + * @throws IOException in case there is a problem reading from the file input stream + */ + public static RemoteSegmentMetadata read(IndexInput indexInput, int version) throws IOException { Map metadata = indexInput.readMapOfStrings(); final Map uploadedSegmentMetadataMap = RemoteSegmentMetadata .fromMapOfStrings(metadata); - ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap); + ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap, version); int byteArraySize = (int) indexInput.readLong(); byte[] segmentInfosBytes = new byte[byteArraySize]; indexInput.readBytes(segmentInfosBytes, 0, byteArraySize); @@ -136,11 +153,13 @@ public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicatio out.writeLong(replicationCheckpoint.getSegmentInfosVersion()); out.writeLong(replicationCheckpoint.getLength()); out.writeString(replicationCheckpoint.getCodec()); + out.writeLong(replicationCheckpoint.getCreatedTimeStamp()); } private static ReplicationCheckpoint readCheckpointFromIndexInput( IndexInput in, - Map uploadedSegmentMetadataMap + Map uploadedSegmentMetadataMap, + int version ) throws IOException { return new ReplicationCheckpoint( new ShardId(new Index(in.readString(), in.readString()), in.readVInt()), @@ -149,7 +168,8 @@ private static ReplicationCheckpoint readCheckpointFromIndexInput( in.readLong(), in.readLong(), in.readString(), - toStoreFileMetadata(uploadedSegmentMetadataMap) + toStoreFileMetadata(uploadedSegmentMetadataMap), + version >= VERSION_TWO ? in.readLong() : 0 ); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java index 3077d8c76ddae..9fa76b38d2b07 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java +++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandler.java @@ -20,6 +20,13 @@ * @opensearch.internal */ public class RemoteSegmentMetadataHandler implements IndexIOStreamHandler { + + private final int version; + + public RemoteSegmentMetadataHandler(int version) { + this.version = version; + } + /** * Reads metadata content from metadata file input stream and parsed into {@link RemoteSegmentMetadata} * @param indexInput metadata file input stream with {@link IndexInput#getFilePointer()} pointing to metadata content @@ -27,7 +34,7 @@ public class RemoteSegmentMetadataHandler implements IndexIOStreamHandler { + private final AtomicReference> handlerRef = new AtomicReference<>(); + + @Override + public IndexIOStreamHandler getHandler(int version) { + IndexIOStreamHandler current = handlerRef.get(); + if (current != null) { + return current; + } + + IndexIOStreamHandler newHandler = createHandler(version); + handlerRef.compareAndSet(null, newHandler); + return handlerRef.get(); + } + + private IndexIOStreamHandler createHandler(int version) { + return switch (version) { + case RemoteSegmentMetadata.VERSION_ONE -> new RemoteSegmentMetadataHandler(RemoteSegmentMetadata.VERSION_ONE); + case RemoteSegmentMetadata.VERSION_TWO -> new RemoteSegmentMetadataHandler(RemoteSegmentMetadata.VERSION_TWO); + default -> throw new IllegalArgumentException("Unsupported RemoteSegmentMetadata version: " + version); + }; + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 1e621d6cb7688..d410f473c71f1 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -75,7 +75,8 @@ public class TranslogTransferManager { private final Logger logger; private static final VersionedCodecStreamWrapper metadataStreamWrapper = new VersionedCodecStreamWrapper<>( - new TranslogTransferMetadataHandler(), + new TranslogTransferMetadataHandlerFactory(), + TranslogTransferMetadata.CURRENT_VERSION, TranslogTransferMetadata.CURRENT_VERSION, TranslogTransferMetadata.METADATA_CODEC ); diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerFactory.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerFactory.java new file mode 100644 index 0000000000000..8f8e3e816d665 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerFactory.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.common.io.IndexIOStreamHandler; +import org.opensearch.common.io.IndexIOStreamHandlerFactory; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * {@link TranslogTransferMetadataHandlerFactory} is a factory class to create {@link TranslogTransferMetadataHandler} + * instances based on the {@link TranslogTransferMetadata} version + * + * @opensearch.internal + */ +public class TranslogTransferMetadataHandlerFactory implements IndexIOStreamHandlerFactory { + + private final ConcurrentHashMap> handlers = new ConcurrentHashMap<>(); + + @Override + public IndexIOStreamHandler getHandler(int version) { + return handlers.computeIfAbsent(version, this::createHandler); + } + + private IndexIOStreamHandler createHandler(int version) { + return switch (version) { + case TranslogTransferMetadata.CURRENT_VERSION -> new TranslogTransferMetadataHandler(); + default -> throw new IllegalArgumentException("Unsupported TranslogTransferMetadata version: " + version); + }; + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index d679240955a07..527c2c23ba6b1 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -105,6 +105,7 @@ import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.IngestionConsumerFactory; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.compositeindex.CompositeIndexSettings; @@ -365,6 +366,7 @@ public class IndicesService extends AbstractLifecycleComponent private final FileCache fileCache; private final CompositeIndexSettings compositeIndexSettings; private final Consumer replicator; + private final Function segmentReplicationStatsProvider; private volatile int maxSizeInRequestCache; @Override @@ -404,7 +406,8 @@ public IndicesService( RemoteStoreSettings remoteStoreSettings, FileCache fileCache, CompositeIndexSettings compositeIndexSettings, - Consumer replicator + Consumer replicator, + Function segmentReplicationStatsProvider ) { this.settings = settings; this.threadPool = threadPool; @@ -515,6 +518,7 @@ protected void closeInternal() { this.compositeIndexSettings = compositeIndexSettings; this.fileCache = fileCache; this.replicator = replicator; + this.segmentReplicationStatsProvider = segmentReplicationStatsProvider; this.maxSizeInRequestCache = INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING, this::setMaxSizeInRequestCache); @@ -581,6 +585,7 @@ public IndicesService( remoteStoreSettings, null, null, + null, null ); } @@ -998,7 +1003,8 @@ private synchronized IndexService createIndexService( this::getClusterDefaultRefreshInterval, this.recoverySettings, this.remoteStoreSettings, - replicator + replicator, + segmentReplicationStatsProvider ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index b06b3e0497cf7..30d9c362b6269 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -110,7 +110,6 @@ public void getSegmentFiles( return; } logger.debug("Downloading segment files from remote store {}", filesToFetch); - if (remoteMetadataExists()) { final Directory storeDirectory = indexShard.store().directory(); final Collection directoryFiles = List.of(storeDirectory.listAll()); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7131b49a41834..64bd73ebb4611 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Locale; import java.util.Set; +import java.util.function.BiConsumer; import java.util.stream.Collectors; /** @@ -161,7 +162,7 @@ public void writeFileChunk( * * @param listener {@link ActionListener} listener. */ - public void startReplication(ActionListener listener) { + public void startReplication(ActionListener listener, BiConsumer checkpointUpdater) { cancellableThreads.setOnCancel((reason, beforeCancelEx) -> { throw new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]"); }); @@ -177,6 +178,8 @@ public void startReplication(ActionListener listener) { source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); checkpointInfoListener.whenComplete(checkpointInfo -> { + checkpointUpdater.accept(checkpointInfo.getCheckpoint(), this.indexShard); + final List filesToFetch = getFiles(checkpointInfo); state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 8fee3f671ecc9..d57f35a5079fc 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -22,7 +22,6 @@ import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; -import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; @@ -49,7 +48,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -58,7 +56,6 @@ /** * Service class that handles incoming checkpoints to initiate replication events on replicas. - * * @opensearch.internal */ public class SegmentReplicationTargetService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { @@ -70,8 +67,6 @@ public class SegmentReplicationTargetService extends AbstractLifecycleComponent private final SegmentReplicationSourceFactory sourceFactory; - protected final Map latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap(); - private final IndicesService indicesService; private final ClusterService clusterService; private final TransportService transportService; @@ -216,7 +211,6 @@ public void clusterChanged(ClusterChangedEvent event) { public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()) { replicator.cancel(indexShard.shardId(), "Shard closing"); - latestReceivedCheckpoint.remove(shardId); } } @@ -227,6 +221,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh @Override public void afterIndexShardStarted(IndexShard indexShard) { if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() && indexShard.routingEntry().primary() == false) { + replicator.initializeStats(indexShard.shardId()); processLatestReceivedCheckpoint(indexShard, Thread.currentThread()); } } @@ -241,7 +236,6 @@ public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting ol && oldRouting.primary() == false && newRouting.primary()) { replicator.cancel(indexShard.shardId(), "Shard has been promoted to primary"); - latestReceivedCheckpoint.remove(indexShard.shardId()); } } @@ -468,7 +462,7 @@ private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) { // visible to tests protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) { - final ReplicationCheckpoint latestPublishedCheckpoint = latestReceivedCheckpoint.get(replicaShard.shardId()); + final ReplicationCheckpoint latestPublishedCheckpoint = replicator.getPrimaryCheckpoint(replicaShard.shardId()); if (latestPublishedCheckpoint != null) { logger.trace( () -> new ParameterizedMessage( @@ -481,7 +475,7 @@ protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Threa // if we retry ensure the shard is not in the process of being closed. // it will be removed from indexService's collection before the shard is actually marked as closed. if (indicesService.getShardOrNull(replicaShard.shardId()) != null) { - onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard); + onNewCheckpoint(replicator.getPrimaryCheckpoint(replicaShard.shardId()), replicaShard); } }; // Checks if we are using same thread and forks if necessary. @@ -497,13 +491,7 @@ protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Threa // visible to tests protected void updateLatestReceivedCheckpoint(ReplicationCheckpoint receivedCheckpoint, IndexShard replicaShard) { - if (latestReceivedCheckpoint.get(replicaShard.shardId()) != null) { - if (receivedCheckpoint.isAheadOf(latestReceivedCheckpoint.get(replicaShard.shardId()))) { - latestReceivedCheckpoint.replace(replicaShard.shardId(), receivedCheckpoint); - } - } else { - latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint); - } + replicator.updateReplicationCheckpointStats(receivedCheckpoint, replicaShard); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java index ad3bc1933208c..b8a5774c21c1f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java @@ -19,8 +19,10 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationFailedException; @@ -29,6 +31,10 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; /** * This class is responsible for managing segment replication events on replicas. @@ -43,8 +49,11 @@ public class SegmentReplicator { private final ReplicationCollection onGoingReplications; private final Map completedReplications = ConcurrentCollections.newConcurrentMap(); - private final ThreadPool threadPool; + private final ConcurrentMap> replicationCheckpointStats = + ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap primaryCheckpoint = ConcurrentCollections.newConcurrentMap(); + private final ThreadPool threadPool; private final SetOnce sourceFactory; public SegmentReplicator(ThreadPool threadPool) { @@ -102,6 +111,135 @@ SegmentReplicationTarget startReplication( return target; } + /** + * Retrieves segment replication statistics for a specific shard. + * Its computed based on the last and first entry in the replicationCheckpointStats map. + * The Last entry gives the Bytes behind, and the difference in the first and last entry provides the lag. + * + * @param shardId The shardId to get statistics for + * @return ReplicationStats containing bytes behind and replication lag information + */ + public ReplicationStats getSegmentReplicationStats(final ShardId shardId) { + final ConcurrentNavigableMap existingCheckpointStats = replicationCheckpointStats.get(shardId); + if (existingCheckpointStats == null || existingCheckpointStats.isEmpty()) { + return ReplicationStats.empty(); + } + + Map.Entry lowestEntry = existingCheckpointStats.firstEntry(); + Map.Entry highestEntry = existingCheckpointStats.lastEntry(); + + long bytesBehind = highestEntry.getValue().getBytesBehind(); + long replicationLag = bytesBehind > 0L + ? TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lowestEntry.getValue().getTimestamp()) + : 0; + + return new ReplicationStats(bytesBehind, bytesBehind, replicationLag); + } + + /** + * Updates the latest checkpoint of the primary for the replica shard and then + * calculates checkpoint statistics for the replica shard with the latest checkpoint information. + * This method maintains statistics about how far behind replica shards are from the primary. + * It calculates the bytes behind by comparing the latest-received and current checkpoint in the indexShard, + * and it maintains the bytes behind and timestamp for each segmentInfosVersion of latestCheckPoint. + *
+     * Example:
+     * {
+     *     [replica][0] : {
+     *                       7 : {bytesBehind=0, timestamp=1700220000000}
+     *                       8 : {bytesBehind=100, timestamp=1700330000000}
+     *                       9 : {bytesBehind=150, timestamp=1700440000000}
+     *                    }
+     * }
+     * 
+ * @param latestReceivedCheckPoint The most recent checkpoint from the primary + * @param indexShard The index shard where its updated + */ + public void updateReplicationCheckpointStats(final ReplicationCheckpoint latestReceivedCheckPoint, final IndexShard indexShard) { + ReplicationCheckpoint primaryCheckPoint = this.primaryCheckpoint.get(indexShard.shardId()); + if (primaryCheckPoint == null || latestReceivedCheckPoint.isAheadOf(primaryCheckPoint)) { + this.primaryCheckpoint.put(indexShard.shardId(), latestReceivedCheckPoint); + calculateReplicationCheckpointStats(latestReceivedCheckPoint, indexShard); + } + } + + /** + * Removes checkpoint statistics for all checkpoints up to and including the last successful sync + * and recalculates the bytes behind value for the last replicationCheckpointStats entry. + * This helps maintain only relevant checkpoint information and clean up old data. + * + * @param indexShard The index shard to prune checkpoints for + */ + protected void pruneCheckpointsUpToLastSync(final IndexShard indexShard) { + ReplicationCheckpoint latestCheckpoint = this.primaryCheckpoint.get(indexShard.shardId()); + if (latestCheckpoint != null) { + ReplicationCheckpoint indexReplicationCheckPoint = indexShard.getLatestReplicationCheckpoint(); + long segmentInfoVersion = indexReplicationCheckPoint.getSegmentInfosVersion(); + final ConcurrentNavigableMap existingCheckpointStats = replicationCheckpointStats.get( + indexShard.shardId() + ); + + if (existingCheckpointStats != null && !existingCheckpointStats.isEmpty()) { + existingCheckpointStats.keySet().removeIf(key -> key < segmentInfoVersion); + Map.Entry lastEntry = existingCheckpointStats.lastEntry(); + if (lastEntry != null) { + lastEntry.getValue().setBytesBehind(calculateBytesBehind(latestCheckpoint, indexReplicationCheckPoint)); + } + } + } + } + + private void calculateReplicationCheckpointStats(final ReplicationCheckpoint latestReceivedCheckPoint, final IndexShard indexShard) { + ReplicationCheckpoint indexShardReplicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); + if (indexShardReplicationCheckpoint != null) { + long segmentInfosVersion = latestReceivedCheckPoint.getSegmentInfosVersion(); + long bytesBehind = calculateBytesBehind(latestReceivedCheckPoint, indexShardReplicationCheckpoint); + if (bytesBehind > 0) { + ConcurrentNavigableMap existingCheckpointStats = replicationCheckpointStats.get( + indexShard.shardId() + ); + if (existingCheckpointStats != null) { + existingCheckpointStats.computeIfAbsent( + segmentInfosVersion, + k -> new ReplicationCheckpointStats(bytesBehind, latestReceivedCheckPoint.getCreatedTimeStamp()) + ); + } + } + } + } + + private long calculateBytesBehind(final ReplicationCheckpoint latestCheckPoint, final ReplicationCheckpoint replicationCheckpoint) { + Store.RecoveryDiff diff = Store.segmentReplicationDiff(latestCheckPoint.getMetadataMap(), replicationCheckpoint.getMetadataMap()); + + return diff.missing.stream().mapToLong(StoreFileMetadata::length).sum(); + } + + public void initializeStats(ShardId shardId) { + replicationCheckpointStats.computeIfAbsent(shardId, k -> new ConcurrentSkipListMap<>()); + } + + private static class ReplicationCheckpointStats { + private long bytesBehind; + private final long timestamp; + + public ReplicationCheckpointStats(long bytesBehind, long timestamp) { + this.bytesBehind = bytesBehind; + this.timestamp = timestamp; + } + + public long getBytesBehind() { + return bytesBehind; + } + + public void setBytesBehind(long bytesBehind) { + this.bytesBehind = bytesBehind; + } + + public long getTimestamp() { + return timestamp; + } + } + /** * Runnable implementation to trigger a replication event. */ @@ -138,6 +276,7 @@ private void start(final long replicationId) { @Override public void onResponse(Void o) { logger.debug(() -> new ParameterizedMessage("Finished replicating {} marking as done.", target.description())); + pruneCheckpointsUpToLastSync(target.indexShard()); onGoingReplications.markAsDone(replicationId); if (target.state().getIndex().recoveredFileCount() != 0 && target.state().getIndex().recoveredBytes() != 0) { completedReplications.put(target.shardId(), target.state()); @@ -153,7 +292,7 @@ public void onFailure(Exception e) { } onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false); } - }); + }, this::updateReplicationCheckpointStats); } // pkg-private for integration tests @@ -197,12 +336,18 @@ int size() { void cancel(ShardId shardId, String reason) { onGoingReplications.cancelForShard(shardId, reason); + replicationCheckpointStats.remove(shardId); + primaryCheckpoint.remove(shardId); } SegmentReplicationTarget get(ShardId shardId) { return onGoingReplications.getOngoingReplicationTarget(shardId); } + ReplicationCheckpoint getPrimaryCheckpoint(ShardId shardId) { + return primaryCheckpoint.getOrDefault(shardId, ReplicationCheckpoint.empty(shardId)); + } + ReplicationCollection.ReplicationRef get(long id) { return onGoingReplications.get(id); } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 29410159a4955..8380187a288ba 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -38,6 +38,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable metadataMap; + private final long createdTimeStamp; public static ReplicationCheckpoint empty(ShardId shardId) { return empty(shardId, ""); @@ -55,10 +56,11 @@ private ReplicationCheckpoint(ShardId shardId, String codec) { length = 0L; this.codec = codec; this.metadataMap = Collections.emptyMap(); + this.createdTimeStamp = System.nanoTime(); } public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) { - this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap()); + this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap(), System.nanoTime()); } public ReplicationCheckpoint( @@ -77,6 +79,27 @@ public ReplicationCheckpoint( this.length = length; this.codec = codec; this.metadataMap = metadataMap; + this.createdTimeStamp = System.nanoTime(); + } + + public ReplicationCheckpoint( + ShardId shardId, + long primaryTerm, + long segmentsGen, + long segmentInfosVersion, + long length, + String codec, + Map metadataMap, + long createdTimeStamp + ) { + this.shardId = shardId; + this.primaryTerm = primaryTerm; + this.segmentsGen = segmentsGen; + this.segmentInfosVersion = segmentInfosVersion; + this.length = length; + this.codec = codec; + this.metadataMap = metadataMap; + this.createdTimeStamp = createdTimeStamp; } public ReplicationCheckpoint(StreamInput in) throws IOException { @@ -96,6 +119,11 @@ public ReplicationCheckpoint(StreamInput in) throws IOException { } else { this.metadataMap = Collections.emptyMap(); } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.createdTimeStamp = in.readLong(); + } else { + this.createdTimeStamp = 0; + } } /** @@ -159,6 +187,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeLong(createdTimeStamp); + } } @Override @@ -197,6 +228,10 @@ public Map getMetadataMap() { return metadataMap; } + public long getCreatedTimeStamp() { + return createdTimeStamp; + } + @Override public String toString() { return "ReplicationCheckpoint{" @@ -212,6 +247,8 @@ public String toString() { + length + ", codec=" + codec + + ", timestamp=" + + createdTimeStamp + '}'; } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e1e5e4a3b455e..222c6e8ba36c4 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -957,7 +957,8 @@ protected Node( remoteStoreSettings, fileCache, compositeIndexSettings, - segmentReplicator::startReplication + segmentReplicator::startReplication, + segmentReplicator::getSegmentReplicationStats ); final IngestService ingestService = new IngestService( diff --git a/server/src/test/java/org/opensearch/common/io/VersionedCodecStreamWrapperTests.java b/server/src/test/java/org/opensearch/common/io/VersionedCodecStreamWrapperTests.java index 938337fc5146e..a88df528bcb86 100644 --- a/server/src/test/java/org/opensearch/common/io/VersionedCodecStreamWrapperTests.java +++ b/server/src/test/java/org/opensearch/common/io/VersionedCodecStreamWrapperTests.java @@ -38,16 +38,19 @@ public class VersionedCodecStreamWrapperTests extends OpenSearchTestCase { private static final int VERSION = 1; IndexIOStreamHandler ioStreamHandler; + IndexIOStreamHandlerFactory ioStreamHandlerFactory; VersionedCodecStreamWrapper versionedCodecStreamWrapper; @Before public void setup() throws IOException { + ioStreamHandlerFactory = mock(IndexIOStreamHandlerFactory.class); ioStreamHandler = mock(IndexIOStreamHandler.class); - versionedCodecStreamWrapper = new VersionedCodecStreamWrapper(ioStreamHandler, VERSION, CODEC); + versionedCodecStreamWrapper = new VersionedCodecStreamWrapper(ioStreamHandlerFactory, VERSION, VERSION, CODEC); } public void testReadStream() throws IOException { DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandlerFactory.getHandler(VERSION)).thenReturn(ioStreamHandler); when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); DummyObject readData = versionedCodecStreamWrapper.readStream(createHeaderFooterBytes(CODEC, VERSION, true, true)); assertEquals(readData, expectedObject); @@ -55,6 +58,7 @@ public void testReadStream() throws IOException { public void testReadWithOldVersionThrowsException() throws IOException { DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandlerFactory.getHandler(VERSION)).thenReturn(ioStreamHandler); when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); assertThrows( IndexFormatTooOldException.class, @@ -64,6 +68,7 @@ public void testReadWithOldVersionThrowsException() throws IOException { public void testReadWithNewVersionThrowsException() throws IOException { DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandlerFactory.getHandler(VERSION)).thenReturn(ioStreamHandler); when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); assertThrows( IndexFormatTooNewException.class, @@ -73,6 +78,7 @@ public void testReadWithNewVersionThrowsException() throws IOException { public void testReadWithUnexpectedCodecThrowsException() throws IOException { DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandlerFactory.getHandler(VERSION)).thenReturn(ioStreamHandler); when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); assertThrows( CorruptIndexException.class, @@ -82,6 +88,7 @@ public void testReadWithUnexpectedCodecThrowsException() throws IOException { public void testReadWithNoHeaderThrowsException() throws IOException { DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandlerFactory.getHandler(VERSION)).thenReturn(ioStreamHandler); when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); assertThrows( CorruptIndexException.class, @@ -91,6 +98,7 @@ public void testReadWithNoHeaderThrowsException() throws IOException { public void testReadWithNoFooterThrowsException() throws IOException { DummyObject expectedObject = new DummyObject("test read"); + when(ioStreamHandlerFactory.getHandler(VERSION)).thenReturn(ioStreamHandler); when(ioStreamHandler.readContent(any())).thenReturn(expectedObject); assertThrows( CorruptIndexException.class, @@ -102,6 +110,7 @@ public void testWriteStream() throws IOException { DummyObject expectedObject = new DummyObject("test read"); BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("dummy bytes", "dummy stream", output, 4096); + when(ioStreamHandlerFactory.getHandler(VERSION)).thenReturn(ioStreamHandler); doAnswer(invocation -> { IndexOutput io = invocation.getArgument(0); io.writeString("test write"); diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index bd86d3d396987..90f2b0b21cc8a 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -265,7 +265,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { () -> IndexSettings.DEFAULT_REFRESH_INTERVAL, DefaultRecoverySettings.INSTANCE, DefaultRemoteStoreSettings.INSTANCE, - s -> {} + s -> {}, + null ); } diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java index 233a99cbe4a73..899e80965e4fd 100644 --- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java @@ -1844,7 +1844,8 @@ public void testSegmentReplicationCheckpointTracking() { 1, 1L, Codec.getDefault().getName(), - Map.of("segment_1", segment_1) + Map.of("segment_1", segment_1), + 0L ); final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1853,7 +1854,8 @@ public void testSegmentReplicationCheckpointTracking() { 2, 51L, Codec.getDefault().getName(), - Map.of("segment_1", segment_1, "segment_2", segment_2) + Map.of("segment_1", segment_1, "segment_2", segment_2), + 0L ); final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint( tracker.shardId(), @@ -1862,7 +1864,8 @@ public void testSegmentReplicationCheckpointTracking() { 3, 151L, Codec.getDefault().getName(), - Map.of("segment_1", segment_1, "segment_2", segment_2, "segment_3", segment_3) + Map.of("segment_1", segment_1, "segment_2", segment_2, "segment_3", segment_3), + 0L ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); @@ -1974,7 +1977,8 @@ public void testSegmentReplicationCheckpointForRelocatingPrimary() { 1, 5L, Codec.getDefault().getName(), - Map.of("segment_1", segment_1) + Map.of("segment_1", segment_1), + 0L ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); tracker.startReplicationLagTimers(initialCheckpoint); @@ -2033,7 +2037,8 @@ public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() { 1, 1L, Codec.getDefault().getName(), - Collections.emptyMap() + Collections.emptyMap(), + 0L ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); tracker.startReplicationLagTimers(initialCheckpoint); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index df3df81361a12..d673eb49be581 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -37,7 +37,7 @@ import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; -import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandlerFactory; import org.opensearch.test.MockLogAppender; import org.opensearch.test.junit.annotations.TestLogging; import org.opensearch.threadpool.ThreadPool; @@ -696,7 +696,8 @@ public void testUploadMetadataNonEmpty() throws IOException { eq(IOContext.DEFAULT) ); VersionedCodecStreamWrapper streamWrapper = new VersionedCodecStreamWrapper<>( - new RemoteSegmentMetadataHandler(), + new RemoteSegmentMetadataHandlerFactory(), + RemoteSegmentMetadata.CURRENT_VERSION, RemoteSegmentMetadata.CURRENT_VERSION, RemoteSegmentMetadata.METADATA_CODEC ); @@ -840,7 +841,7 @@ public void testHeaderMaxVersionCorruptIndexException() throws IOException { BytesStreamOutput output = new BytesStreamOutput(); OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput("segment metadata", "metadata output stream", output, 4096); - CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, 2); + CodecUtil.writeHeader(indexOutput, RemoteSegmentMetadata.METADATA_CODEC, 3); indexOutput.writeMapOfStrings(metadata); CodecUtil.writeFooter(indexOutput); indexOutput.close(); @@ -1115,7 +1116,7 @@ public void testSegmentMetadataCurrentVersion() { If author doesn't want to support old metadata files. Then this can be ignored. After taking appropriate action, fix this test by setting the correct version here */ - assertEquals(RemoteSegmentMetadata.CURRENT_VERSION, 1); + assertEquals(RemoteSegmentMetadata.CURRENT_VERSION, 2); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerFactoryTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerFactoryTests.java new file mode 100644 index 0000000000000..6911b84c58e4d --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerFactoryTests.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.remote.metadata; + +import org.opensearch.common.io.IndexIOStreamHandler; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +/** + * Unit tests for {@link org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandlerFactoryTests}. + */ +public class RemoteSegmentMetadataHandlerFactoryTests extends OpenSearchTestCase { + + private RemoteSegmentMetadataHandlerFactory segmentMetadataHandlerFactory; + + @Before + public void setup() { + segmentMetadataHandlerFactory = new RemoteSegmentMetadataHandlerFactory(); + } + + public void testGetHandlerReturnsBasedOnVersion() { + IndexIOStreamHandler versionOneHandler = segmentMetadataHandlerFactory.getHandler(1); + assertTrue(versionOneHandler instanceof RemoteSegmentMetadataHandler); + IndexIOStreamHandler versionTwoHandler = segmentMetadataHandlerFactory.getHandler(2); + assertTrue(versionTwoHandler instanceof RemoteSegmentMetadataHandler); + } + + public void testGetHandlerWhenCalledMultipleTimesReturnsCachedHandler() { + IndexIOStreamHandler versionTwoHandlerOne = segmentMetadataHandlerFactory.getHandler(2); + IndexIOStreamHandler versionTwoHandlerTwo = segmentMetadataHandlerFactory.getHandler(2); + assertEquals(versionTwoHandlerOne, versionTwoHandlerTwo); + } + + public void testGetHandlerWhenHandlerNotProvidedThrowsException() { + Throwable throwable = assertThrows(IllegalArgumentException.class, () -> { segmentMetadataHandlerFactory.getHandler(3); }); + assertEquals("Unsupported RemoteSegmentMetadata version: 3", throwable.getMessage()); + } +} diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java index f5d54dc790e76..0a668bba28c74 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java @@ -44,7 +44,7 @@ public class RemoteSegmentMetadataHandlerTests extends IndexShardTestCase { @Before public void setup() throws IOException { - remoteSegmentMetadataHandler = new RemoteSegmentMetadataHandler(); + remoteSegmentMetadataHandler = new RemoteSegmentMetadataHandler(2); Settings indexSettings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerFactoryTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerFactoryTests.java new file mode 100644 index 0000000000000..767037160980e --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferMetadataHandlerFactoryTests.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.common.io.IndexIOStreamHandler; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +/** + * Unit tests for {@link org.opensearch.index.translog.transfer.TranslogTransferMetadataHandlerFactoryTests}. + */ +public class TranslogTransferMetadataHandlerFactoryTests extends OpenSearchTestCase { + + private TranslogTransferMetadataHandlerFactory translogTransferMetadataHandlerFactory; + + @Before + public void setup() { + translogTransferMetadataHandlerFactory = new TranslogTransferMetadataHandlerFactory(); + } + + public void testGetHandlerReturnsBasedOnVersion() { + IndexIOStreamHandler versionOneHandler = translogTransferMetadataHandlerFactory.getHandler(1); + assertTrue(versionOneHandler instanceof TranslogTransferMetadataHandler); + } + + public void testGetHandlerWhenCalledMultipleTimesReturnsCachedHandler() { + IndexIOStreamHandler versionTwoHandlerOne = translogTransferMetadataHandlerFactory.getHandler(1); + IndexIOStreamHandler versionTwoHandlerTwo = translogTransferMetadataHandlerFactory.getHandler(1); + assertEquals(versionTwoHandlerOne, versionTwoHandlerTwo); + } + + public void testGetHandlerWhenHandlerNotProvidedThrowsException() { + Throwable throwable = assertThrows(IllegalArgumentException.class, () -> { translogTransferMetadataHandlerFactory.getHandler(2); }); + assertEquals("Unsupported TranslogTransferMetadata version: 2", throwable.getMessage()); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1faaa16ce5628..8a47b87b09f30 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -100,8 +100,6 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private SegmentReplicationState state; private ReplicationCheckpoint initialCheckpoint; - private ClusterState clusterState; - private static final long TRANSPORT_TIMEOUT = 30000;// 30sec @Override @@ -140,13 +138,14 @@ public void setUp() throws Exception { indicesService = mock(IndicesService.class); ClusterService clusterService = mock(ClusterService.class); - clusterState = mock(ClusterState.class); + ClusterState clusterState = mock(ClusterState.class); RoutingTable mockRoutingTable = mock(RoutingTable.class); when(clusterService.state()).thenReturn(clusterState); when(clusterState.routingTable()).thenReturn(mockRoutingTable); when(mockRoutingTable.shardRoutingTable(any())).thenReturn(primaryShard.getReplicationGroup().getRoutingTable()); when(clusterState.nodes()).thenReturn(DiscoveryNodes.builder().add(localNode).build()); + sut = prepareForReplication(primaryShard, replicaShard, transportService, indicesService, clusterService); initialCheckpoint = primaryShard.getLatestReplicationCheckpoint(); aheadCheckpoint = new ReplicationCheckpoint( @@ -596,13 +595,6 @@ public void testShardRoutingChanged_DoesNothingForDocRepIndex() throws IOExcepti closeShards(shard); } - public void testUpdateLatestReceivedCheckpoint() { - final SegmentReplicationTargetService spy = spy(sut); - sut.updateLatestReceivedCheckpoint(checkpoint, replicaShard); - sut.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard); - assertEquals(sut.latestReceivedCheckpoint.get(replicaShard.shardId()), aheadCheckpoint); - } - public void testForceSegmentSyncHandler() throws Exception { ForceSyncRequest forceSyncRequest = new ForceSyncRequest(1L, 1L, replicaShard.shardId()); when(indicesService.getShardOrNull(forceSyncRequest.getShardId())).thenReturn(replicaShard); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 8b4b3aff701b4..52cb39bebd2b7 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -177,6 +177,9 @@ public void onFailure(Exception e) { logger.error("Unexpected onFailure", e); Assert.fail(); } + }, (ReplicationCheckpoint checkpoint, IndexShard indexShard) -> { + assertEquals(repCheckpoint, checkpoint); + assertEquals(indexShard, spyIndexShard); }); } @@ -230,7 +233,7 @@ public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } public void testFailureResponse_getSegmentFiles() { @@ -283,7 +286,7 @@ public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } public void testFailure_finalizeReplication_NonCorruptionException() throws IOException { @@ -330,7 +333,7 @@ public void onFailure(Exception e) { assertEquals(exception, e.getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } public void testFailure_finalizeReplication_IndexFormatException() throws IOException { @@ -376,7 +379,7 @@ public void onFailure(Exception e) { assertEquals(exception, e.getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } public void testFailure_differentSegmentFiles() throws IOException { @@ -429,7 +432,7 @@ public void onFailure(Exception e) { assertTrue(e.getMessage().contains("has local copies of segments that differ from the primary")); segrepTarget.fail(new ReplicationFailedException(e), false); } - }); + }, mock(BiConsumer.class)); } /** @@ -483,7 +486,7 @@ public void onFailure(Exception e) { logger.error("Unexpected onFailure", e); Assert.fail(); } - }); + }, mock(BiConsumer.class)); } /** diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java index 81ea16c80dd79..38f1c59bd5b68 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java @@ -9,6 +9,8 @@ package org.opensearch.indices.replication; import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.Version; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -20,6 +22,8 @@ import org.opensearch.common.lucene.Lucene; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; @@ -35,9 +39,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -200,6 +206,173 @@ public void getSegmentFiles( closeShards(primary, replica); } + public void testGetSegmentReplicationStats_WhenNoReplication() { + SegmentReplicator segmentReplicator = new SegmentReplicator(threadPool); + ShardId shardId = new ShardId("index", "uuid", 0); + ReplicationStats replicationStats = segmentReplicator.getSegmentReplicationStats(shardId); + assertEquals(0, replicationStats.maxReplicationLag); + assertEquals(0, replicationStats.totalBytesBehind); + assertEquals(0, replicationStats.maxBytesBehind); + } + + public void testGetSegmentReplicationStats_WhileOnGoingReplicationAndPrimaryRefreshedToNewCheckPoint() { + ShardId shardId = new ShardId("index", "uuid", 0); + ReplicationCheckpoint firstReplicationCheckpoint = ReplicationCheckpoint.empty(shardId); + + StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("test-1", 500, "1", Version.LATEST, new BytesRef(500)); + StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("test-2", 500, "1", Version.LATEST, new BytesRef(500)); + Map stringStoreFileMetadataMapOne = new HashMap<>(); + stringStoreFileMetadataMapOne.put("test-1", storeFileMetadata1); + stringStoreFileMetadataMapOne.put("test-2", storeFileMetadata2); + ReplicationCheckpoint secondReplicationCheckpoint = new ReplicationCheckpoint( + shardId, + 2, + 2, + 2, + 1000, + "", + stringStoreFileMetadataMapOne, + System.nanoTime() - TimeUnit.MINUTES.toNanos(1) + ); + + IndexShard replicaShard = mock(IndexShard.class); + when(replicaShard.shardId()).thenReturn(shardId); + when(replicaShard.getLatestReplicationCheckpoint()).thenReturn(firstReplicationCheckpoint) + .thenReturn(firstReplicationCheckpoint) + .thenReturn(firstReplicationCheckpoint) + .thenReturn(secondReplicationCheckpoint); + + SegmentReplicator segmentReplicator = new SegmentReplicator(threadPool); + segmentReplicator.initializeStats(shardId); + segmentReplicator.updateReplicationCheckpointStats(firstReplicationCheckpoint, replicaShard); + segmentReplicator.updateReplicationCheckpointStats(secondReplicationCheckpoint, replicaShard); + + Map stringStoreFileMetadataMapTwo = new HashMap<>(); + StoreFileMetadata storeFileMetadata3 = new StoreFileMetadata("test-3", 200, "1", Version.LATEST, new BytesRef(200)); + stringStoreFileMetadataMapTwo.put("test-1", storeFileMetadata1); + stringStoreFileMetadataMapTwo.put("test-2", storeFileMetadata2); + stringStoreFileMetadataMapTwo.put("test-3", storeFileMetadata3); + ReplicationCheckpoint thirdReplicationCheckpoint = new ReplicationCheckpoint( + shardId, + 3, + 3, + 3, + 200, + "", + stringStoreFileMetadataMapTwo, + System.nanoTime() - TimeUnit.MINUTES.toNanos(1) + ); + + segmentReplicator.updateReplicationCheckpointStats(thirdReplicationCheckpoint, replicaShard); + + ReplicationStats replicationStatsFirst = segmentReplicator.getSegmentReplicationStats(shardId); + assertEquals(1200, replicationStatsFirst.totalBytesBehind); + assertEquals(1200, replicationStatsFirst.maxBytesBehind); + assertTrue(replicationStatsFirst.maxReplicationLag > 0); + + segmentReplicator.pruneCheckpointsUpToLastSync(replicaShard); + + ReplicationStats replicationStatsSecond = segmentReplicator.getSegmentReplicationStats(shardId); + assertEquals(200, replicationStatsSecond.totalBytesBehind); + assertEquals(200, replicationStatsSecond.maxBytesBehind); + assertTrue(replicationStatsSecond.maxReplicationLag > 0); + } + + public void testGetSegmentReplicationStats_WhenCheckPointReceivedOutOfOrder() { + ShardId shardId = new ShardId("index", "uuid", 0); + ReplicationCheckpoint firstReplicationCheckpoint = ReplicationCheckpoint.empty(shardId); + + StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("test-1", 500, "1", Version.LATEST, new BytesRef(500)); + StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("test-2", 500, "1", Version.LATEST, new BytesRef(500)); + Map stringStoreFileMetadataMapOne = new HashMap<>(); + stringStoreFileMetadataMapOne.put("test-1", storeFileMetadata1); + stringStoreFileMetadataMapOne.put("test-2", storeFileMetadata2); + ReplicationCheckpoint secondReplicationCheckpoint = new ReplicationCheckpoint( + shardId, + 2, + 2, + 2, + 1000, + "", + stringStoreFileMetadataMapOne, + System.nanoTime() - TimeUnit.MINUTES.toNanos(1) + ); + + IndexShard replicaShard = mock(IndexShard.class); + when(replicaShard.shardId()).thenReturn(shardId); + when(replicaShard.getLatestReplicationCheckpoint()).thenReturn(firstReplicationCheckpoint) + .thenReturn(firstReplicationCheckpoint) + .thenReturn(firstReplicationCheckpoint); + + SegmentReplicator segmentReplicator = new SegmentReplicator(threadPool); + segmentReplicator.initializeStats(shardId); + segmentReplicator.updateReplicationCheckpointStats(firstReplicationCheckpoint, replicaShard); + + Map stringStoreFileMetadataMapTwo = new HashMap<>(); + StoreFileMetadata storeFileMetadata3 = new StoreFileMetadata("test-3", 200, "1", Version.LATEST, new BytesRef(200)); + stringStoreFileMetadataMapTwo.put("test-1", storeFileMetadata1); + stringStoreFileMetadataMapTwo.put("test-2", storeFileMetadata2); + stringStoreFileMetadataMapTwo.put("test-3", storeFileMetadata3); + ReplicationCheckpoint thirdReplicationCheckpoint = new ReplicationCheckpoint( + shardId, + 3, + 3, + 3, + 200, + "", + stringStoreFileMetadataMapTwo, + System.nanoTime() - TimeUnit.MINUTES.toNanos(1) + ); + + segmentReplicator.updateReplicationCheckpointStats(thirdReplicationCheckpoint, replicaShard); + + ReplicationStats replicationStatsFirst = segmentReplicator.getSegmentReplicationStats(shardId); + assertEquals(1200, replicationStatsFirst.totalBytesBehind); + assertEquals(1200, replicationStatsFirst.maxBytesBehind); + assertTrue(replicationStatsFirst.maxReplicationLag > 0); + + segmentReplicator.updateReplicationCheckpointStats(secondReplicationCheckpoint, replicaShard); + ReplicationStats replicationStatsSecond = segmentReplicator.getSegmentReplicationStats(shardId); + assertEquals(1200, replicationStatsSecond.totalBytesBehind); + assertEquals(1200, replicationStatsSecond.maxBytesBehind); + assertTrue(replicationStatsSecond.maxReplicationLag > 0); + } + + public void testUpdateReplicationCheckpointStatsIgnoresWhenOutOfOrderCheckPointReceived() { + ShardId shardId = new ShardId("index", "uuid", 0); + IndexShard replicaShard = mock(IndexShard.class); + when(replicaShard.shardId()).thenReturn(shardId); + + SegmentReplicator segmentReplicator = new SegmentReplicator(threadPool); + ReplicationCheckpoint replicationCheckpoint = new ReplicationCheckpoint( + shardId, + 2, + 2, + 2, + 1000, + "", + new HashMap<>(), + System.nanoTime() - TimeUnit.MINUTES.toNanos(1) + ); + segmentReplicator.updateReplicationCheckpointStats(replicationCheckpoint, replicaShard); + + assertEquals(replicationCheckpoint, segmentReplicator.getPrimaryCheckpoint(shardId)); + + ReplicationCheckpoint oldReplicationCheckpoint = new ReplicationCheckpoint( + shardId, + 1, + 1, + 1, + 500, + "", + new HashMap<>(), + System.nanoTime() - TimeUnit.MINUTES.toNanos(1) + ); + segmentReplicator.updateReplicationCheckpointStats(oldReplicationCheckpoint, replicaShard); + + assertEquals(replicationCheckpoint, segmentReplicator.getPrimaryCheckpoint(shardId)); + } + protected void resolveCheckpointListener(ActionListener listener, IndexShard primary) { try (final CopyState copyState = new CopyState(primary)) { listener.onResponse( @@ -209,5 +382,4 @@ protected void resolveCheckpointListener(ActionListener throw new UncheckedIOException(e); } } - } diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index 0b30486038e3a..3b7c5560f89fb 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -81,7 +81,8 @@ public static IndexShard createMockIndexShard() throws IOException { 0L, 0L, Codec.getDefault().getName(), - SI_SNAPSHOT.asMap() + SI_SNAPSHOT.asMap(), + 0L ); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 655a9eb7d5d38..bdd4b40e398d5 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -86,6 +86,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.MapperTestUtils; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; import org.opensearch.index.cache.query.DisabledQueryCache; @@ -688,6 +689,9 @@ protected IndexShard newShard( } return new InternalTranslogFactory(); }; + // This is fine since we are not testing the node stats now + Function mockReplicationStatsProvider = mock(Function.class); + when(mockReplicationStatsProvider.apply(any())).thenReturn(new ReplicationStats(800, 800, 500)); indexShard = new IndexShard( routing, indexSettings, @@ -717,7 +721,8 @@ protected IndexShard newShard( DefaultRecoverySettings.INSTANCE, DefaultRemoteStoreSettings.INSTANCE, false, - discoveryNodes + discoveryNodes, + mockReplicationStatsProvider ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) {