Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
kkewwei committed Jan 18, 2025
1 parent 68c2918 commit 30b1961
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 25 deletions.
35 changes: 31 additions & 4 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1518,15 +1518,17 @@ public String getLowercase() {
private final VersionType versionType;
private final Origin origin;
private final long startTime;
private final InternalEngine.IndexingStrategy indexingStrategy;

public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime) {
public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime, InternalEngine.IndexingStrategy indexingStrategy) {
this.uid = uid;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.versionType = versionType;
this.origin = origin;
this.startTime = startTime;
this.indexingStrategy = indexingStrategy;
}

/**
Expand Down Expand Up @@ -1587,6 +1589,10 @@ public long startTime() {
abstract String id();

public abstract TYPE operationType();

public InternalEngine.IndexingStrategy indexingStrategy() {
return indexingStrategy;
};
}

/**
Expand Down Expand Up @@ -1617,7 +1623,25 @@ public Index(
long ifSeqNo,
long ifPrimaryTerm
) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
this(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry, ifSeqNo, ifPrimaryTerm, null);
}

public Index(
Term uid,
ParsedDocument doc,
long seqNo,
long primaryTerm,
long version,
VersionType versionType,
Origin origin,
long startTime,
long autoGeneratedIdTimestamp,
boolean isRetry,
long ifSeqNo,
long ifPrimaryTerm,
InternalEngine.IndexingStrategy indexingStrategy
) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime, indexingStrategy);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
Expand All @@ -1630,6 +1654,7 @@ public Index(
this.ifPrimaryTerm = ifPrimaryTerm;
}


public Index(Term uid, long primaryTerm, ParsedDocument doc) {
this(uid, primaryTerm, doc, Versions.MATCH_ANY);
} // TEST ONLY
Expand Down Expand Up @@ -1706,6 +1731,8 @@ public long getIfSeqNo() {
public long getIfPrimaryTerm() {
return ifPrimaryTerm;
}


}

/**
Expand All @@ -1732,7 +1759,7 @@ public Delete(
long ifSeqNo,
long ifPrimaryTerm
) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime, null);
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
Expand Down Expand Up @@ -1812,7 +1839,7 @@ public String reason() {
}

public NoOp(final long seqNo, final long primaryTerm, final Origin origin, final long startTime, final String reason) {
super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime);
super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime, null);
this.reason = reason;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,16 +684,16 @@ enum OpVsLuceneDocStatus {
/** the op is more recent than the one that last modified the doc found in lucene*/
OP_NEWER,
/** the op is older or the same as the one that last modified the doc found in lucene*/
OP_STALE_OR_EQUAL,
OP_STALE_OR_EQUAL,// 一样,或者更旧的
/** no doc was found in lucene */
LUCENE_DOC_NOT_FOUND
}

private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) {
Objects.requireNonNull(versionValue);
if (seqNo > versionValue.seqNo) {
return OpVsLuceneDocStatus.OP_NEWER;
} else if (seqNo == versionValue.seqNo) {
return OpVsLuceneDocStatus.OP_NEWER;// 新的
} else if (seqNo == versionValue.seqNo) {// 一样
assert versionValue.term == primaryTerm : "primary term not matched; id="
+ id
+ " seq_no="
Expand All @@ -711,12 +711,13 @@ private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
final OpVsLuceneDocStatus status;
VersionValue versionValue = getVersionFromMap(op.uid().bytes());

VersionValue versionValue = getVersionFromMap(op.uid().bytes());// 从maps中找下,看可以找到吗
assert incrementVersionLookup();
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
if (versionValue != null) {
if (versionValue != null) {// maps中找到了
status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
} else {
} else {// 没找到
// load from index
assert incrementIndexVersionLookup();
try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
Expand Down Expand Up @@ -906,10 +907,11 @@ public IndexResult index(Index index) throws IOException {
index.getAutoGeneratedIdTimestamp(),
index.isRetry(),
index.getIfSeqNo(),
index.getIfPrimaryTerm()
index.getIfPrimaryTerm(),
plan
);

final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;// 不带主键的写入
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
}
Expand All @@ -922,6 +924,9 @@ public IndexResult index(Index index) throws IOException {
if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
indexResult = indexIntoLucene(index, plan);
} else {
if (plan.versionForIndexing != index.indexingStrategy().versionForIndexing || plan.currentNotFoundOrDeleted != index.indexingStrategy().currentNotFoundOrDeleted) {
throw new RuntimeException("plan.versionForIndexing != index.indexingStrategy().versionForIndexing || plan.currentNotFoundOrDeleted != index.indexingStrategy().currentNotFoundOrDeleted");
}
indexResult = new IndexResult(
plan.versionForIndexing,
index.primaryTerm(),
Expand Down Expand Up @@ -956,6 +961,9 @@ public IndexResult index(Index index) throws IOException {
index.uid().bytes(),
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
);
if (plan.versionForIndexing != index.indexingStrategy().versionForIndexing) {
System.out.println("wrong");
}
}
localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
if (indexResult.getTranslogLocation() == null) {
Expand Down Expand Up @@ -987,7 +995,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
assert assertNonPrimaryOrigin(index);
// needs to maintain the auto_id timestamp in case this replica becomes primary
if (canOptimizeAddDocument(index)) {// 如果不带主键的写入,肯定可以优化
mayHaveBeenIndexedBefore(index);
mayHaveBeenIndexedBefore(index);// 可能更新maxUnsafeAutoIdTimestamp和必须更新maxSeenAutoIdTimestamp
}
final IndexingStrategy plan;
// unlike the primary, replicas don't really care to about creation status of documents
Expand All @@ -1010,19 +1018,24 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
} else {
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
versionMap.enforceSafeAccess();
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
if (segRepEnabled) {
// For segrep based indices, we can't completely rely on localCheckpointTracker
// as the preserved checkpoint may not have all the operations present in lucene
// we don't need to index it again as stale op as it would create multiple documents for same seq no
plan = IndexingStrategy.processButSkipLucene(false, index.version());
} else {
plan = IndexingStrategy.processAsStaleOp(index.version());

// if (index.IndexingStrategy() != null && segRepEnabled == false) {
// return index.IndexingStrategy();
// } else {
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {// seqNo小于等于当前已经写入的
if (segRepEnabled) {
// For segrep based indices, we can't completely rely on localCheckpointTracker
// as the preserved checkpoint may not have all the operations present in lucene
// we don't need to index it again as stale op as it would create multiple documents for same seq no
plan = IndexingStrategy.processButSkipLucene(false, index.version());
} else {
plan = IndexingStrategy.processAsStaleOp(index.version());
}
} else {// 更新的,或者没找到
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0);
}
} else {
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0);
}
// }
}
return plan;
}
Expand Down Expand Up @@ -1111,6 +1124,11 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I
assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
assert plan.indexIntoLucene || plan.addStaleOpToLucene;
InternalEngine.IndexingStrategy indexingStrategy = index.indexingStrategy();
if (indexingStrategy.versionForIndexing != plan.versionForIndexing || (indexingStrategy.indexIntoLucene || indexingStrategy.addStaleOpToLucene) == false) {
throw new RuntimeException("dddddd 0");
}

/* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
Expand All @@ -1119,15 +1137,24 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I
index.parsedDoc().version().setLongValue(plan.versionForIndexing);
try {
if (plan.addStaleOpToLucene) {
if (indexingStrategy.addStaleOpToLucene == false) {
throw new RuntimeException("indexingStrategy.addStaleOpToLucene == false");
}
addStaleDocs(index.docs(), indexWriter);
} else if (plan.useLuceneUpdateDocument) {
if (indexingStrategy.useLuceneUpdateDocument == false) {
throw new RuntimeException("indexingStrategy.useLuceneUpdateDocument == false");
}
assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
updateDocs(index.uid(), index.docs(), indexWriter);
} else {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
addDocs(index.docs(), indexWriter);
}
if (plan.versionForIndexing != indexingStrategy.versionForIndexing || plan.currentNotFoundOrDeleted != indexingStrategy.currentNotFoundOrDeleted) {
throw new RuntimeException("plan.versionForIndexing != indexingStrategy.versionForIndexing || plan.currentNotFoundOrDeleted != indexingStrategy.currentNotFoundOrDeleted");
}
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
if (ex instanceof AlreadyClosedException == false
Expand Down

0 comments on commit 30b1961

Please sign in to comment.