-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open engine should keep only starting commit #28228
Changes from 5 commits
d7f6ef9
f157ed5
11daf09
5c2853d
bd544cf
64e692a
a252d0b
97d3825
26df28a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,37 +45,72 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { | |
private final TranslogDeletionPolicy translogDeletionPolicy; | ||
private final EngineConfig.OpenMode openMode; | ||
private final LongSupplier globalCheckpointSupplier; | ||
private final IndexCommit startingCommit; | ||
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point. | ||
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. | ||
private IndexCommit lastCommit; // the most recent commit point | ||
|
||
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy, | ||
LongSupplier globalCheckpointSupplier) { | ||
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) { | ||
this.openMode = openMode; | ||
this.translogDeletionPolicy = translogDeletionPolicy; | ||
this.globalCheckpointSupplier = globalCheckpointSupplier; | ||
this.startingCommit = startingCommit; | ||
this.snapshottedCommits = new ObjectIntHashMap<>(); | ||
} | ||
|
||
@Override | ||
public void onInit(List<? extends IndexCommit> commits) throws IOException { | ||
switch (openMode) { | ||
case CREATE_INDEX_AND_TRANSLOG: | ||
assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]"; | ||
break; | ||
case OPEN_INDEX_CREATE_TRANSLOG: | ||
assert commits.isEmpty() == false : "index is opened, but we have no commits"; | ||
// When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately. | ||
// We therefore can simply skip processing here as `onCommit` will be called right after with a new commit. | ||
break; | ||
case OPEN_INDEX_AND_TRANSLOG: | ||
assert commits.isEmpty() == false : "index is opened, but we have no commits"; | ||
onCommit(commits); | ||
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; " | ||
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]"; | ||
keepOnlyStartingCommitOnInit(commits); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you leave a big comment explaining why this is needed? |
||
break; | ||
default: | ||
throw new IllegalArgumentException("unknown openMode [" + openMode + "]"); | ||
} | ||
} | ||
|
||
/** | ||
* Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe | ||
* at the recovering time but they can suddenly become safe in the future. | ||
* The following issues can happen if unsafe commits are kept on init. | ||
* <p> | ||
* 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1) | ||
* and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2) | ||
* is added without flushing, the global checkpoint advances to 2, and the replica then recovers again, it will use | ||
* the unsafe commit c2 (max_seqno=2, at most gcp=2) as the starting commit for sequence-based recovery even though | ||
* commit c2 contains a stale operation, so the document (with seqno=2) will not be replicated to the replica. | ||
* <p> | ||
* 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when a replica has a safe commit | ||
* c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2). | ||
* The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new | ||
* commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery | ||
* translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1 | ||
* while the local checkpoint of c2 is 2. | ||
* <p> | ||
* 3. Commit without translog can be used in recovery. An old index, which was created before multiple commits were introduced | ||
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, | ||
* the policy can consider the snapshotted commit as a safe commit for recovery even though the commit does not have translog. | ||
*/ | ||
private synchronized void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) throws IOException { | ||
commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete); | ||
assert startingCommit.isDeleted() == false : "Starting commit must not be deleted"; | ||
lastCommit = startingCommit; | ||
safeCommit = startingCommit; | ||
// OPEN_INDEX_CREATE_TRANSLOG can open an index commit from other shard with a different translog history, | ||
// We therefore should not use that index commit to update the translog deletion policy. | ||
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This feels strange here. Move it to the constructor? |
||
updateTranslogDeletionPolicy(); | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException { | ||
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -185,7 +185,7 @@ public InternalEngine(EngineConfig engineConfig) { | |
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]"; | ||
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit); | ||
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy, | ||
translog::getLastSyncedGlobalCheckpoint); | ||
translog::getLastSyncedGlobalCheckpoint, startingCommit); | ||
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit); | ||
updateMaxUnsafeAutoIdTimestampFromWriter(writer); | ||
assert engineConfig.getForceNewHistoryUUID() == false | ||
|
@@ -411,28 +411,44 @@ public void skipTranslogRecovery() { | |
} | ||
|
||
private IndexCommit getStartingCommitPoint() throws IOException { | ||
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { | ||
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); | ||
final long minRetainedTranslogGen = translog.getMinFileGeneration(); | ||
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory()); | ||
// We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose full translog | ||
// files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. | ||
// To avoid this issue, we only select index commits whose translog files are fully retained. | ||
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { | ||
final List<IndexCommit> recoverableCommits = new ArrayList<>(); | ||
for (IndexCommit commit : existingCommits) { | ||
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { | ||
recoverableCommits.add(commit); | ||
final IndexCommit startingIndexCommit; | ||
final List<IndexCommit> existingCommits; | ||
switch (openMode) { | ||
case CREATE_INDEX_AND_TRANSLOG: | ||
startingIndexCommit = null; | ||
break; | ||
case OPEN_INDEX_CREATE_TRANSLOG: | ||
// Use the last commit | ||
existingCommits = DirectoryReader.listCommits(store.directory()); | ||
startingIndexCommit = existingCommits.get(existingCommits.size() - 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we assert there is only one? I believe this is the case. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I added an assert but moved to |
||
break; | ||
case OPEN_INDEX_AND_TRANSLOG: | ||
// Use the safe commit | ||
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); | ||
final long minRetainedTranslogGen = translog.getMinFileGeneration(); | ||
existingCommits = DirectoryReader.listCommits(store.directory()); | ||
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose translog | ||
// are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. | ||
// To avoid this issue, we only select index commits whose translog are fully retained. | ||
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) { | ||
final List<IndexCommit> recoverableCommits = new ArrayList<>(); | ||
for (IndexCommit commit : existingCommits) { | ||
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { | ||
recoverableCommits.add(commit); | ||
} | ||
} | ||
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " + | ||
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; | ||
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); | ||
} else { | ||
// TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint. | ||
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); | ||
} | ||
assert recoverableCommits.isEmpty() == false : "No commit point with full translog found; " + | ||
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]"; | ||
return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint); | ||
} else { | ||
return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint); | ||
} | ||
break; | ||
default: | ||
throw new IllegalArgumentException("unknown mode: " + openMode); | ||
} | ||
return null; | ||
return startingIndexCommit; | ||
} | ||
|
||
private void recoverFromTranslogInternal() throws IOException { | ||
|
@@ -557,9 +573,7 @@ private ExternalSearcherManager createSearcherManager(SearchFactory externalSear | |
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); | ||
internalSearcherManager = new SearcherManager(directoryReader, | ||
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService())); | ||
// The index commit from IndexWriterConfig is null if the engine is open with other modes | ||
// rather than CREATE_INDEX_AND_TRANSLOG. In those cases lastCommittedSegmentInfos will be retrieved from the last commit. | ||
lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit()); | ||
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I know we now trim things and this is OK, but it feels risky - maybe we'll change our mind in the future and clean less heavily. It will be hard to find this place and change it. Maybe just keep it as is? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The starting commit will be deleted before this line is executed if the open mode is |
||
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager, | ||
externalSearcherFactory); | ||
success = true; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,7 +54,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase { | |
public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { | ||
final AtomicLong globalCheckpoint = new AtomicLong(); | ||
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); | ||
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); | ||
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( | ||
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); | ||
|
||
final LongArrayList maxSeqNoList = new LongArrayList(); | ||
final LongArrayList translogGenList = new LongArrayList(); | ||
|
@@ -93,7 +94,8 @@ public void testAcquireIndexCommit() throws Exception { | |
final AtomicLong globalCheckpoint = new AtomicLong(); | ||
final UUID translogUUID = UUID.randomUUID(); | ||
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); | ||
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); | ||
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( | ||
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); | ||
long lastMaxSeqNo = between(1, 1000); | ||
long lastTranslogGen = between(1, 20); | ||
int safeIndex = 0; | ||
|
@@ -156,11 +158,12 @@ public void testLegacyIndex() throws Exception { | |
final UUID translogUUID = UUID.randomUUID(); | ||
|
||
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); | ||
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); | ||
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( | ||
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); | ||
|
||
long legacyTranslogGen = randomNonNegativeLong(); | ||
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen); | ||
indexPolicy.onInit(singletonList(legacyCommit)); | ||
indexPolicy.onCommit(singletonList(legacyCommit)); | ||
verify(legacyCommit, never()).delete(); | ||
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen)); | ||
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen)); | ||
|
@@ -188,7 +191,8 @@ public void testLegacyIndex() throws Exception { | |
public void testDeleteInvalidCommits() throws Exception { | ||
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); | ||
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); | ||
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get); | ||
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( | ||
OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get, null); | ||
|
||
final int invalidCommits = between(1, 10); | ||
final List<IndexCommit> commitList = new ArrayList<>(); | ||
|
@@ -211,6 +215,35 @@ public void testDeleteInvalidCommits() throws Exception { | |
} | ||
} | ||
|
||
/** | ||
* Keeping existing unsafe commits can be problematic because these commits are not safe at the recovering time | ||
* but they can suddenly become safe in the future. See {@link CombinedDeletionPolicy#keepOnlyStartingCommitOnInit(List)} | ||
*/ | ||
public void testKeepOnlyStartingCommitOnInit() throws Exception { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you add Javadocs explaining why this behavior is needed? I think it will be hard to figure out. |
||
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); | ||
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); | ||
final UUID translogUUID = UUID.randomUUID(); | ||
final List<IndexCommit> commitList = new ArrayList<>(); | ||
int totalCommits = between(2, 20); | ||
for (int i = 0; i < totalCommits; i++) { | ||
commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong())); | ||
} | ||
final IndexCommit startingCommit = randomFrom(commitList); | ||
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( | ||
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, startingCommit); | ||
indexPolicy.onInit(commitList); | ||
for (IndexCommit commit : commitList) { | ||
if (commit.equals(startingCommit) == false) { | ||
verify(commit, times(1)).delete(); | ||
} | ||
} | ||
verify(startingCommit, never()).delete(); | ||
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), | ||
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); | ||
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), | ||
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); | ||
} | ||
|
||
IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { | ||
final Map<String, String> userData = new HashMap<>(); | ||
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're changing the logic of `OPEN_INDEX_AND_TRANSLOG`, I think we can also consolidate the code paths for `OPEN_INDEX_CREATE_TRANSLOG` & `OPEN_INDEX_AND_TRANSLOG` to be the same whenever we open an index - fewer exceptions and more unity. To do so we need to set `startingCommit` to a valid value in `InternalEngine#getStartingCommitPoint` whenever we are actually opening something.