Skip to content

Commit

Permalink
Use lastRefreshedCheckpoint instead of processedCheckpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Dec 15, 2022
1 parent 499a425 commit 9562461
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2748,7 +2748,7 @@ public long tryDeleteDocument(IndexReader readerIn, int docID) {
/**
* Returns the last local checkpoint value that has been refreshed internally.
*/
final long lastRefreshedCheckpoint() {
public final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

Expand Down Expand Up @@ -96,10 +97,12 @@ public void afterRefresh(boolean didRefresh) {
try {
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
// Ideally, we want this to be done in async flow. (GitHub issue #4315)
if (isRefreshAfterCommit()) {
deleteStaleCommits();
}

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Expand Down Expand Up @@ -173,11 +176,20 @@ private boolean isRefreshAfterCommit() throws IOException {
}

String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
long processedLocalCheckpoint = indexShard.getEngine().getProcessedLocalCheckpoint();
// We use lastRefreshedCheckpoint as the local checkpoint for the SegmentInfosSnapshot. This is better than using
// getProcessedLocalCheckpoint(), as processedCheckpoint can advance between reading the value and setting it
// in SegmentInfos.userData. That could lead to data loss because, during recovery, the translog is replayed based on
// LOCAL_CHECKPOINT_KEY.
// lastRefreshedCheckpoint is updated after refresh listeners are executed. This means that
// InternalEngine.lastRefreshedCheckpoint() will return the checkpoint of the last refresh, but that does not impact
// correctness, as duplicate sequence numbers will not be replayed.
assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: "
+ indexShard.getEngine().getClass();
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();

Map<String, String> userData = segmentInfosSnapshot.getUserData();
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(processedLocalCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(processedLocalCheckpoint));
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
segmentInfosSnapshot.setUserData(userData, false);

long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);
Expand Down

0 comments on commit 9562461

Please sign in to comment.