Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
kkewwei committed Jan 18, 2025
1 parent faa0bd4 commit d03b6a0
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,8 @@ private static Engine.Result performOpOnReplica(
primaryResponse.getVersion(),
indexRequest.getAutoGeneratedTimestamp(),
indexRequest.isRetry(),
sourceToParse
sourceToParse,
primaryResponse.indexingStrategy()
);
break;
case DELETE:
Expand Down
23 changes: 16 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.ReadOnlyEngine;
import org.opensearch.index.engine.RefreshFailedEngineException;
Expand Down Expand Up @@ -1054,6 +1055,7 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
isRetry,
Engine.Operation.Origin.PRIMARY,
sourceToParse,
null,
null
);
}
Expand All @@ -1065,7 +1067,8 @@ public Engine.IndexResult applyIndexOperationOnReplica(
long version,
long autoGeneratedTimeStamp,
boolean isRetry,
SourceToParse sourceToParse
SourceToParse sourceToParse,
InternalEngine.IndexingStrategy indexingStrategy
) throws IOException {
return applyIndexOperation(
getEngine(),
Expand All @@ -1079,7 +1082,8 @@ public Engine.IndexResult applyIndexOperationOnReplica(
isRetry,
Engine.Operation.Origin.REPLICA,
sourceToParse,
id
id,
indexingStrategy
);
}

Expand All @@ -1095,7 +1099,8 @@ private Engine.IndexResult applyIndexOperation(
boolean isRetry,
Engine.Operation.Origin origin,
SourceToParse sourceToParse,
String id
String id,
InternalEngine.IndexingStrategy indexingStrategy
) throws IOException {

// For Segment Replication enabled replica shards we can be skip parsing the documents as we directly copy segments from primary
Expand Down Expand Up @@ -1136,7 +1141,8 @@ private Engine.IndexResult applyIndexOperation(
autoGeneratedTimeStamp,
isRetry,
ifSeqNo,
ifPrimaryTerm
ifPrimaryTerm,
indexingStrategy
);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
Expand Down Expand Up @@ -1165,7 +1171,8 @@ public static Engine.Index prepareIndex(
long autoGeneratedIdTimestamp,
boolean isRetry,
long ifSeqNo,
long ifPrimaryTerm
long ifPrimaryTerm,
InternalEngine.IndexingStrategy indexingStrategy
) {
long startTime = System.nanoTime();
ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
Expand All @@ -1185,7 +1192,8 @@ public static Engine.Index prepareIndex(
autoGeneratedIdTimestamp,
isRetry,
ifSeqNo,
ifPrimaryTerm
ifPrimaryTerm,
indexingStrategy
);
}

Expand Down Expand Up @@ -2417,7 +2425,8 @@ private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation o
MediaTypeRegistry.xContentType(index.source()),
index.routing()
),
index.id()
index.id(),
null
);
break;
case DELETE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception {
1,
randomNonNegativeLong(),
false,
new SourceToParse("index", "replica", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse("index", "replica", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
shards.promoteReplicaToPrimary(promotedReplica).get();
oldPrimary.close("demoted", randomBoolean(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2344,7 +2344,8 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(shard.shardId().getIndexName(), "id", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(shard.shardId().getIndexName(), "id", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
Expand All @@ -2353,7 +2354,8 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
3,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(shard.shardId().getIndexName(), "id-3", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(shard.shardId().getIndexName(), "id-3", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
// Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery.
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
Expand All @@ -2364,7 +2366,8 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
3,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(shard.shardId().getIndexName(), "id-2", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(shard.shardId().getIndexName(), "id-2", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
shard.applyIndexOperationOnReplica(
UUID.randomUUID().toString(),
Expand All @@ -2373,7 +2376,8 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(shard.shardId().getIndexName(), "id-5", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(shard.shardId().getIndexName(), "id-5", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
shard.sync(); // advance local checkpoint

Expand Down Expand Up @@ -2521,7 +2525,8 @@ public void testRecoverFromStoreWithNoOps() throws IOException {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
sourceToParse
sourceToParse,
null
);

final ShardRouting primaryShardRouting = shard.routingEntry();
Expand Down Expand Up @@ -2649,7 +2654,8 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "doc-0", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(indexName, "doc-0", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
flushShard(shard);
shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here.
Expand All @@ -2660,7 +2666,8 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "doc-1", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(indexName, "doc-1", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
Expand All @@ -2673,7 +2680,8 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "doc-2", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(indexName, "doc-2", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2"));
Expand Down Expand Up @@ -4155,7 +4163,8 @@ private Result indexOnReplicaWithGaps(final IndexShard indexShard, final int ope
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
sourceToParse
sourceToParse,
null
);
if (!gap && i == localCheckpoint + 1) {
localCheckpoint++;
Expand Down Expand Up @@ -4780,7 +4789,8 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(shard.shardId.getIndexName(), Long.toString(i), new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(shard.shardId.getIndexName(), Long.toString(i), new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
shard.updateGlobalCheckpointOnReplica(shard.getLocalCheckpoint(), "test");
if (randomInt(100) < 10) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ private SeqNoStats populateRandomData(IndexShard shard) throws IOException {
shard.getOperationPrimaryTerm(),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(shard.shardId().getIndexName(), UUIDs.randomBase64UUID(), new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(shard.shardId().getIndexName(), UUIDs.randomBase64UUID(), new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
if (randomInt(100) < 5) {
shard.flush(new FlushRequest().waitIfOngoing(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "id", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(indexName, "id", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
// index #3
orgReplica.applyIndexOperationOnReplica(
Expand All @@ -200,7 +201,8 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "id-3", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(indexName, "id-3", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
// Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1.
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
Expand All @@ -212,7 +214,8 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "id-2", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(indexName, "id-2", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);
orgReplica.sync(); // advance local checkpoint
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
Expand All @@ -224,7 +227,8 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
1,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
new SourceToParse(indexName, "id-5", new BytesArray("{}"), MediaTypeRegistry.JSON)
new SourceToParse(indexName, "id-5", new BytesArray("{}"), MediaTypeRegistry.JSON),
null
);

if (randomBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1335,7 +1335,8 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source
0,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
false,
sourceToParse
sourceToParse,
null
);
shard.sync(); // advance local checkpoint
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
Expand Down

0 comments on commit d03b6a0

Please sign in to comment.