-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce primary/replica mode for GlobalCheckPointTracker #25468
Introduce primary/replica mode for GlobalCheckPointTracker #25468
Conversation
…at block GCP advancement
…balcheckpoint-tracker
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a bunch of nits around assertion messages. There is one important test coverage ask for GlobalCheckpointTracker#initializeWithPrimaryContext
, which caused to mark it as request changes. All the rest LGTM. Thanks @ywelsch
/** | ||
* during relocation handoff there are no entries blocking global checkpoint advancement | ||
*/ | ||
assert !handOffInProgress || pendingInSync.isEmpty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a message that says what the pending in sync aIds are?
/** | ||
* the computed global checkpoint is always up-to-date | ||
*/ | ||
assert !primaryMode || globalCheckpoint == computeGlobalCheckPoint(pendingInSync, localCheckpoints.values(), globalCheckpoint); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a message with the globalCheckpoint and the result of the computation?
/** | ||
* blocking global checkpoint advancement only happens for shards that are not in-sync | ||
*/ | ||
assert !pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add message that indicates which aID it is?
* Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed. | ||
* Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}. Called on primary activation or promotion. | ||
*/ | ||
public synchronized void initializeAsPrimary(final String allocationId, final long localCheckpoint) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how would you feel about naming this method (and it's counterpart) activatePrimaryMode
? I was confused a couple of times as initialize and primary terms already used in the IndexShard context (a primary relocation target is a primary shard and is already initializing long before the method is called) .
public synchronized void initializeAsPrimary(final String allocationId, final long localCheckpoint) { | ||
assert invariant(); | ||
assert primaryMode == false; | ||
assert localCheckpoints.get(allocationId) != null && localCheckpoints.get(allocationId).inSync && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a message with localCheckpoints.get(allocationId) and allocationId ?
persistMetadata(path, indexSettings, newRouting, currentRouting, logger); | ||
|
||
if (shardRouting.primary()) { | ||
assert Thread.holdsLock(mutex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can go away, since you inlined the method
@@ -171,6 +168,8 @@ public RecoveryResponse recoverToTarget() throws IOException { | |||
} | |||
} | |||
|
|||
cancellableThreads.execute(() -> runUnderOperationPermit(() -> shard.initiateTracking(request.targetAllocationId()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question, why did you choose to do it after phase1 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It only becomes relevant that we're properly tracking the target shard when starting the engine on the target shard. With this we ensure that we don't miss any local checkpoint updates from the target shard.
@@ -164,9 +165,10 @@ public void testCheckpointsAdvance() throws Exception { | |||
*/ | |||
final Matcher<Long> globalCheckpointMatcher; | |||
if (shardRouting.primary()) { | |||
globalCheckpointMatcher = numDocs == 0 ? equalTo(unassignedSeqNo) : equalTo(numDocs - 1L); | |||
globalCheckpointMatcher = numDocs == 0 ? equalTo(SequenceNumbersService.NO_OPS_PERFORMED) : equalTo(numDocs - 1L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++. It is a good that this is fixed and we start with no ops performed.
|
||
/* | ||
* Now we will add an allocation ID to each of active and initializing and ensure they propagate through. Using different lengths | ||
* than we have been using above ensures that we can not collide with a previous allocation ID | ||
*/ | ||
newActiveAllocationIds.add(randomAlphaOfLength(32)); | ||
// TODO: fix this: newActiveAllocationIds.add(initializingIds.iterator().next()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed this line, as it was an illegal operation (adding a fresh in-sync allocation id while the tracker was in primary mode).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clear. thx.
@@ -523,200 +540,21 @@ public void testRaceUpdatingGlobalCheckpoint() throws InterruptedException, Brok | |||
assertThat(tracker.getGlobalCheckpoint(), equalTo((long) nextActiveLocalCheckpoint)); | |||
} | |||
|
|||
public void testPrimaryContextOlderThanAppliedClusterState() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need equivalent tests of initializeWithPrimaryContext
and it's relation to appliedClusterStateVersion
?
@bleskes I've addressed all comments. Have another look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the thorough test.
|
||
activatePrimary(clusterState, oldPrimary); | ||
|
||
for (int i = 0; i < randomInt(10); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this samples the randomInt again and again.. much less likely to get to 10.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
This PR refactors the GlobalCheckPointTracker to make it more resilient. The main idea is to make it more explicit what state is actually captured and how that state is updated through replication / cluster state updates etc. It also fixes the issue where the local checkpoint information is not being updated when a shard becomes primary. The primary relocation handoff becomes very simple too, we can just verbatim copy over the internal state.
The PR still misses some tests, which I will address soon. The main reason for opening it as is to get initial feedback.