Introduce sequence-number-based recovery #22484
Conversation
Force-pushed from 6265447 to ac1d630
retest this please
1 similar comment
retest this please
Force-pushed from ac1d630 to 54e8224
This commit introduces sequence-number-based recovery. When a replica has fallen out of sync, rather than performing a file-based recovery, we first attempt to replay operations since the last local checkpoint on the replica. To do this, at the start of recovery the replica tells the primary what its local checkpoint is. The primary will then wait for all operations between that local checkpoint and the current maximum sequence number to complete; this is to ensure that there are no gaps in the operations that will be replayed from the primary to the replica. This is a best-effort attempt as we currently have no guarantees on the primary that these operations will be available; if we are not able to replay all operations in the desired range, we just fall back to file-based recovery. Later work will strengthen the guarantees.
This commit simplifies sequence-number-based recovery. Rather than executing a dance between the replica and the primary in which the replica requests a sequence-number-based recovery, that recovery fails if it is not possible, and the replica then requests a second file-based recovery, we simply check on the primary side whether a sequence-number-based recovery is possible and immediately fall back to file-based recovery if not.
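To make the flow above concrete, here is a minimal, self-contained sketch of the primary-side decision: wait for in-flight operations up to the max sequence number observed at the start of recovery, then check whether every operation since the replica's local checkpoint can still be replayed, and otherwise fall back to file-based recovery. All names here (SequenceRecoveryPlanner, TranslogView, and so on) are hypothetical illustrations, not the actual Elasticsearch classes:

import java.util.function.LongSupplier;

final class SequenceRecoveryPlanner {

    /** Either replay operations from the translog, or copy segment files. */
    enum Plan { OPS_BASED, FILE_BASED }

    /** Hypothetical view over the operations the primary can still replay. */
    interface TranslogView {
        boolean hasAllOperationsBetween(long fromSeqNo, long toSeqNo);
    }

    /**
     * Decide how to recover a replica whose last local checkpoint is {@code replicaLocalCheckpoint}.
     * The primary first waits until its own local checkpoint has advanced to the max sequence
     * number it observed when recovery started, so the replayed range contains no gaps.
     */
    Plan plan(final long replicaLocalCheckpoint,
              final long maxSeqNoAtRecoveryStart,
              final LongSupplier primaryLocalCheckpoint,
              final TranslogView translog) throws InterruptedException {
        // best effort: wait (bounded in a real implementation) for in-flight operations to complete
        while (primaryLocalCheckpoint.getAsLong() < maxSeqNoAtRecoveryStart) {
            Thread.sleep(10);
        }
        // replay is only possible if the translog still holds every operation in
        // (replicaLocalCheckpoint, maxSeqNoAtRecoveryStart]; there is no guarantee of that yet
        if (translog.hasAllOperationsBetween(replicaLocalCheckpoint + 1, maxSeqNoAtRecoveryStart)) {
            return Plan.OPS_BASED;
        }
        return Plan.FILE_BASED; // fall back to copying segment files
    }
}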
Force-pushed from 54e8224 to d360a23
@@ -16,6 +16,7 @@
 * specific language governing permissions and limitations
 * under the License.
 */
Looks like a typo here
@@ -379,6 +379,7 @@ void setTook(long took) {
    void freeze() {
        freeze.set(true);
    }
Same here
        } else {
            // no version conflict
            if (index.origin() == Operation.Origin.PRIMARY) {
                seqNo = seqNoService().generateSeqNo();
            }
-           /**
+           /*
?
This was a Javadoc-style comment inside a method where it has no impact on javadoc. While javac will treat it as a block comment either way, my IDE formats it as a Javadoc-style comment instead of as a block comment and it annoys me.
Force-pushed from b6a32d1 to b9b816f
* master:
  [TEST] Fixed the incorrect indentation for the `skip` clauses in the REST tests
  Fix primary relocation for shadow replicas (elastic#22474)
Force-pushed from b9b816f to 6ec0ef6
This commit removes a field that was left behind in a previous refactoring that rendered the field obsolete.
w00t. I'll review as soon as I catch up with everything.
Force-pushed from 40f5c2e to 8b5ea52
If a file-based recovery completes phase one successfully, but a network partition happens before the translog is opened, during the retry loop the recovery target will proceed to attempt a sequence-number-based recovery as the index files are present. However, as the translog was never opened it will be missing on disk leading to a no such file exception while preparing for a sequence-number-based recovery. We should not let this fail the recovery, but instead proceed to attempt another file-based recovery.
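As a rough illustration of that fallback (the class and method names here are invented for this sketch, not the real recovery code), the target can treat a missing translog as "no usable starting sequence number" rather than a fatal error, which pushes the source toward a file-based recovery:

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;

final class StartingSeqNoResolver {

    // assumed sentinel meaning "no sequence number / cannot do an ops-based recovery"
    static final long UNASSIGNED_SEQ_NO = -2;

    /** Reads the local checkpoint from the on-disk translog, if one exists. */
    interface TranslogOnDisk {
        long readLocalCheckpoint() throws IOException;
    }

    /**
     * Returns the sequence number a peer recovery may start from, or UNASSIGNED_SEQ_NO if the
     * translog is missing or unreadable, in which case the recovery should be file-based.
     */
    long startingSeqNo(final TranslogOnDisk translog) {
        try {
            return translog.readLocalCheckpoint() + 1;
        } catch (NoSuchFileException | FileNotFoundException e) {
            // phase one copied the index files, but the partition hit before the translog was
            // created; do not fail the recovery, just signal that ops-based replay is impossible
            return UNASSIGNED_SEQ_NO;
        } catch (IOException e) {
            return UNASSIGNED_SEQ_NO; // any other read problem: also fall back
        }
    }
}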
Force-pushed from 8b5ea52 to 91e1ff0
A version conflict exception can happen during recovery. If the operation is from an old primary, a sequence number will not have been assigned to the operation. In this case, we should skip adding a no-op to the translog.
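A tiny sketch of that guard, with illustrative names rather than the real engine code: only record a translog no-op for the failed operation when the operation actually carries a sequence number, since operations replayed from an old (pre-sequence-number) primary never had one assigned:

final class ReplicaVersionConflictHandler {

    // assumed sentinel meaning "no sequence number was assigned to this operation"
    static final long UNASSIGNED_SEQ_NO = -2;

    /** Minimal stand-in for the translog operations used in this sketch. */
    interface Translog {
        void addNoOp(long seqNo, String reason);
    }

    void onVersionConflict(final long seqNo, final Translog translog) {
        if (seqNo == UNASSIGNED_SEQ_NO) {
            // the operation came from an old primary during recovery; there is no gap in the
            // local checkpoint to fill, so adding a no-op would be wrong
            return;
        }
        // the operation had a sequence number, so mark it as a no-op to keep the checkpoint moving
        translog.addNoOp(seqNo, "version conflict on replica");
    }
}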
This looks great. I left some minor comments and suggestions.
@@ -361,7 +361,7 @@ public long getTook() {
    void setTranslogLocation(Translog.Location translogLocation) {
        if (freeze.get() == null) {
-           assert failure == null : "failure has to be null to set translog location";
+           assert failure == null || translogLocation == null : "failure has to be null to set translog location";
wondering - was this required for this PR or is it preparation for the future?
This change will no longer be necessary after #22626.
I pushed 8b0e501.
I also pushed cea70f4.
        }

-       return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
+       return SequenceNumbers.loadSeqNoStatsFromLuceneCommit(globalCheckpoint, indexWriter.getLiveCommitData());
Is this all worth it? I wonder if we should just load from store in the constructor and save on this method:
switch (openMode) {
    case OPEN_INDEX_AND_TRANSLOG:
        seqNoStats = store.loadSeqNoStats(Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()));
        writer = createWriter(false);
        break;
    case OPEN_INDEX_CREATE_TRANSLOG:
        seqNoStats = store.loadSeqNoStats(SequenceNumbersService.UNASSIGNED_SEQ_NO);
        writer = createWriter(false);
        break;
I agree; I pushed 1929c03 (I think that this will also fit better with future developments).
@@ -57,4 +57,5 @@ public int totalOperations() {
        }
        return null;
    }
nit: a shame to touch this file?
I pushed dac513a.
            return;
        }
        final Optional<StartRecoveryRequest> maybeRequest = getStartRecoveryRequest(recoveryTarget);
        if (!maybeRequest.isPresent()) return;
I really think that just having the try/catch for errors here, potentially as part of the try-with-resources block ([1]), will be much cleaner and easier to read than the optional song and dance.
[1] starting at try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
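For illustration, the try-with-resources shape being suggested would look roughly like the following; the type and method names are invented for this sketch rather than copied from the PR:

final class DoRecoveryExample {

    interface StartRecoveryRequest {}

    /** Closing the ref releases the reference taken on the ongoing recovery. */
    interface RecoveryRef extends AutoCloseable {
        StartRecoveryRequest buildStartRequest() throws Exception;
        @Override void close();
    }

    interface OngoingRecoveries {
        RecoveryRef getRecovery(long recoveryId);
    }

    void doRecovery(final long recoveryId, final OngoingRecoveries onGoingRecoveries) {
        try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {
            if (recoveryRef == null) {
                return; // the recovery was cancelled in the meantime
            }
            final StartRecoveryRequest request = recoveryRef.buildStartRequest();
            // ... send the start-recovery request to the source shard ...
        } catch (Exception e) {
            // a single place to decide between failing the recovery and scheduling a retry,
            // instead of returning Optional.empty() at each intermediate step
        }
    }
}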
I'm not sure I agree; having tried to rewrite it, I find it easier to reason about as-is. Let's discuss if you feel strongly.
OK. Here's a patch with what I meant. Talk tomorrow :)
https://gist.github.com/bleskes/9177f512ebe803dc07dbfdda6f8ef2b7
hehe. yeah. I think the error in the cause exception should be enough to give us the information we need. No need to be heroic about the extra info.
I pushed cc2002c.
        }

        logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
        recoveryTarget.indexShard().prepareForIndexRecovery();
nit - this seems like a weird side effect to have here. Can we move it back to the main method?
I pushed 34fbb37.
        replicas.add(replica);
        updateAllocationIDsOnPrimary();
        return replica;
    }

    public synchronized IndexShard addReplica(IndexShard replica) throws IOException {
can we name this something like closeAndAddAsInitializingReplica? (feel free to shorten, but I think we should make it clear what it does, at the expense of length if need be)
maybe we should call this addReplicaWithExistingPath and give it two parameters - ShardPath and node Id? (leaving all the shard wrangling to the caller).
Okay, I pushed c0169c2.
        shards.recoverReplica(recoveredReplica);
        if (flushPrimary && replicaHasDocsSinceLastFlushedCheckpoint) {
            // replica has something to catch up with, but since we flushed the primary, we should fall back to full recovery
            assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty()));
can we also assert the number of translog ops recovered?
I pushed 8960522; please review this one carefully. 😉
@@ -57,11 +61,71 @@ public void testIndexingDuringFileRecovery() throws Exception {
        }
    }

    public void testRecoveryOfDisconnectedReplica() throws Exception {
this is a great simple test. I wonder how much effort it will be to add "ops in flight while starting recovery" to it (or another test). I'm thinking of how we test that we wait for the translog to have a complete continuous section.
I pushed 7281b75. 😇
-       localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint);
-       globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint);
+       localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint);
+       globalCheckpointTracker = new GlobalCheckpointTracker(indexSettings, globalCheckpoint, logger);
doesn't this violate the logger per component policy we have?
I pushed b6e6cc3.
@@ -684,13 +670,20 @@ private IndexResult innerIndex(Index index) throws IOException {
        final IndexResult indexResult;
        if (checkVersionConflictResult.isPresent()) {
            indexResult = checkVersionConflictResult.get();
            // norelease: this is not correct as this does not force an fsync, and we need to handle failures including replication
            if (indexResult.hasFailure() || seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
We discussed another solution for this problem in another channel. The gist was to change the way we deal with version conflicts on replicas. The idea was to do that as a separate change first and then base this PR on it.
I opened #22626 for this.
This commit reverts adding no-ops to the translog when a version conflict exception arises on a replica. Instead, we will treat these as normal operations on a replica, but this will happen in another commit.
This commit reverts a whitespace change in MultiSnapshot.java.
When reading the translog on the source during peer recovery, if an I/O exception occurs it is wrapped in an unchecked exception. This is unnecessary as we can just let the I/O exception bubble all the way up. This commit does that.
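A tiny illustration of that change in generic terms (the names below are invented for this sketch): declare the checked exception instead of wrapping it in an unchecked one.

import java.io.IOException;

final class TranslogReadExample {

    interface TranslogSnapshot {
        long next() throws IOException;
    }

    // before: callers see an unchecked wrapper and lose the checked-exception contract
    long readBeforeStyle(final TranslogSnapshot snapshot) {
        try {
            return snapshot.next();
        } catch (IOException e) {
            throw new RuntimeException("failed to read translog", e);
        }
    }

    // after: just let the IOException bubble up to the recovery code that can handle it
    long readAfterStyle(final TranslogSnapshot snapshot) throws IOException {
        return snapshot.next();
    }
}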
This reverts commit adafa21.
* master: (47 commits)
  Remove non needed import
  use expectThrows instead of manually testing exception
  Fix checkstyle and a test
  Update after review
  Read ec2 discovery address from aws instance tags
  Invalidate cached query results if query timed out (elastic#22807)
  Add remaining generated painless API
  Generate reference links for painless API (elastic#22775)
  [TEST] Fix ElasticsearchExceptionTests
  Add parsing method for ElasticsearchException.generateThrowableXContent() (elastic#22783)
  Improve connection closing in `RemoteClusterConnection` (elastic#22804)
  Docs: Cluster allocation explain should be on one page
  Remove DFS_QUERY_AND_FETCH as a search type (elastic#22787)
  Add repository-url module and move URLRepository (elastic#22752)
  fix date-processor to a new default year for every new pipeline execution. (elastic#22601)
  Add tests for top_hits aggregation (elastic#22754)
  [TEST] Added this for 93a28b0 submitted via elastic#22772
  Fix typo in comment in OsProbe.java
  Add new ruby search library to community clients doc (elastic#22765)
  RangeQuery WITHIN case now normalises query (elastic#22431)
  ...
retest this please
retest this please
3 similar comments
retest this please
retest this please
retest this please
Thanks @bleskes. 😄
The seq#-based recovery logic relies on rolling back Lucene to remove any operations above the global checkpoint. This part of the plan is not implemented yet, but we have to have these guarantees. Instead, we should make the seq# logic validate that the last commit point (and the only one we have) maintains the invariant and, if not, fall back to file-based recovery. This commit adds a test that creates a situation where rollback is needed (primary failover with ops in flight) and fixes another issue that was surfaced by it: if a primary can't serve a seq#-based recovery request and does a file copy, it still used the incoming `startSeqNo` as a filter. Relates to elastic#22484 & elastic#10708
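Sketched very roughly (with invented names, not the code from that change), the invariant check amounts to: only offer a starting sequence number for an ops-based recovery if the last commit contains no operations above the global checkpoint, because rolling such operations back is not implemented yet.

final class StartingSeqNoFromLastCommit {

    // assumed sentinel meaning "ops-based recovery is not possible"
    static final long UNASSIGNED_SEQ_NO = -2;

    /** Sequence-number information recovered from the last (and only) commit point. */
    static final class CommitSeqNoStats {
        final long maxSeqNo;
        final long globalCheckpoint;

        CommitSeqNoStats(final long maxSeqNo, final long globalCheckpoint) {
            this.maxSeqNo = maxSeqNo;
            this.globalCheckpoint = globalCheckpoint;
        }
    }

    long startingSeqNo(final CommitSeqNoStats stats) {
        if (stats.maxSeqNo <= stats.globalCheckpoint) {
            // every operation in the commit is at or below the global checkpoint, so replaying
            // from globalCheckpoint + 1 cannot conflict with anything already in the index
            return stats.globalCheckpoint + 1;
        }
        // the commit may hold operations that would need to be rolled back; without rollback
        // support the only safe option is a full file-based recovery
        return UNASSIGNED_SEQ_NO;
    }
}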
…sts (#22900) EvillPeerRecoveryIT checks a scenario where recovery happens while there are ongoing indexing operations that have already been assigned a seq#. This is fairly hard to achieve, and the test goes through a couple of hoops via the plugin infra to achieve that. This PR extends the unit test infra to allow for those hoops to happen in unit tests, which allows the test to be moved to RecoveryDuringReplicationTests. Relates to #22484
This commit introduces sequence-number-based recovery. When a replica has fallen out of sync, rather than performing a file-based recovery, we first attempt to replay operations since the last local checkpoint on the replica. To do this, at the start of recovery the replica tells the primary what its local checkpoint is. The primary will then wait for all operations between that local checkpoint and the current maximum sequence number to complete; this is to ensure that there are no gaps in the operations that will be replayed from the primary to the replica. This is a best-effort attempt as we currently have no guarantees on the primary that these operations will be available; if we are not able to replay all operations in the desired range, we just fall back to file-based recovery. Later work will strengthen the guarantees.
Relates #10708