-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Block older operations on primary term transition #24779
Conversation
5110598
to
bf5ab75
Compare
Today a replica learns of a new primary term via a cluster state update and there is not a clean transition between the older primary term and the newer primary term. This commit modifies this situation so that: - a replica shard learns of a new primary term via replication operations executed under the mandate of the new primary - when a replica shard learns of a new primary term, it blocks operations on older terms from reaching the engine, with a clear transition point between the operations on the older term and the operations on the newer term This work paves the way for a primary/replica sync on primary promotion. Future work will also ensure a clean transition point on a promoted primary, and prepare a replica shard for a sync with the promoted primary.
64bc8c3
to
857fcdb
Compare
throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", | ||
shardId, opPrimaryTerm, primaryTerm)); | ||
if (operationPrimaryTerm > primaryTerm | ||
&& pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are many incoming operations with higher term, each one of them will go into this branch and invoke blockOperations (until one completes). This can create additional contention when the first blockOperations is completed and subsequent operations unnecessarily call blockOperations. I've adapted your code in 7ff4a7c so that only the first operation with higher term calls blockOperations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This solution has the problem that if, primaryTerm == 0
, an operation comes in with operationPrimaryTerm == 1
then another operation comes in with operationPrimaryTerm == 2
and then another ops comes in with operationPrimaryTerm == 1
, it maybe be that the last op is processed before the primaryTerm was incremented to 1 (or 2). This can happen if the first ops passed the check but didn't submit it's block. The 2 op incremented pendingPrimaryTerm
but didn't submit the block and then the 3rd op just passes this along without waiting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and now I see that we guard against it with if (operationPrimaryTerm == currentPrimaryTerm)
later on, so the third operation will be failed but with the wrong message (we will say it's too old and give a currentPrimaryTerm
of 0 while the ops term is 1). I think is all just too complex and isn't worth it given how rare primary promotions are.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great. I think the main discussion point is the concurrency control. I will reach out to discuss in another channel.
@@ -180,7 +178,7 @@ protected void resolveRequest(final IndexMetaData indexMetaData, final Request r | |||
|
|||
/** | |||
* Synchronous replica operation on nodes with replica copies. This is done under the lock form |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this is done while having (under?) a permit
&& pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) { | ||
try { | ||
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { | ||
if (operationPrimaryTerm > primaryTerm) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a comment as to how it's possible that the term will not be higher (i.e. race condition between checking pendingPrimaryTerm and submitting the blockOperations
throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", | ||
shardId, opPrimaryTerm, primaryTerm)); | ||
if (operationPrimaryTerm > primaryTerm | ||
&& pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This solution has the problem that if, primaryTerm == 0
, an operation comes in with operationPrimaryTerm == 1
then another operation comes in with operationPrimaryTerm == 2
and then another ops comes in with operationPrimaryTerm == 1
, it maybe be that the last op is processed before the primaryTerm was incremented to 1 (or 2). This can happen if the first ops passed the check but didn't submit it's block. The 2 op incremented pendingPrimaryTerm
but didn't submit the block and then the 3rd op just passes this along without waiting.
throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", | ||
shardId, opPrimaryTerm, primaryTerm)); | ||
if (operationPrimaryTerm > primaryTerm | ||
&& pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and now I see that we guard against it with if (operationPrimaryTerm == currentPrimaryTerm)
later on, so the third operation will be failed but with the wrong message (we will say it's too old and give a currentPrimaryTerm
of 0 while the ops term is 1). I think is all just too complex and isn't worth it given how rare primary promotions are.
public void onResponse(final Releasable releasable) { | ||
assert operationPrimaryTerm <= primaryTerm | ||
: "operation primary term [" + operationPrimaryTerm + "] should be at most [" + primaryTerm + "]"; | ||
if (operationPrimaryTerm < primaryTerm) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a comment please on how this can happen...
*/ | ||
public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException { | ||
if (closed) { | ||
throw new IndexShardClosedException(shardId); | ||
} | ||
try { | ||
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { | ||
assert semaphore.availablePermits() == 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
ensureYellow(); | ||
|
||
// this forces the primary term to propagate to the replicas | ||
client().index(new IndexRequest("test", "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON)).get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do we make sure we change it/only do it sometimes once we can?
@@ -561,6 +561,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard | |||
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); | |||
final Set<String> initializingIds = | |||
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); | |||
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we should move this if clause to before update the routing entry.. @ywelsch this class is your baby, any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's discuss this separately and proceed in a follow-up if needed.
ThreadPool.Names.INDEX); | ||
}; | ||
|
||
final Thread first = new Thread(function.apply(randomBoolean())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we lock down the expected end term based on these booleans and assert for that?
@Override | ||
public void onResponse(Releasable releasable) { | ||
counter.incrementAndGet(); | ||
latch.countDown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a check on the term here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @jasontedor
With #24779 in place, we can now guaranteed that a single translog generation file will never have a sequence number conflict that needs to be resolved by looking at primary terms. These conflicts can a occur when a replica contains an operation which isn't part of the history of a newly promoted primary. That primary can then assign a different operation to the same slot and replicate it to the replica. PS. Knowing that each generation file is conflict free will simplifying repairing these conflicts when we read from the translog. PPS. This PR also fixes some bugs in the piping of primary terms in the bulk shard action. These bugs are a result of the legacy of IndexRequest/DeleteRequest being a ReplicationRequest. We need to change that as a follow up. Relates to #10708
Today a replica learns of a new primary term via a cluster state update and there is not a clean transition between the older primary term and the newer primary term. This commit modifies this situation so that:
This work paves the way for a primary/replica sync on primary promotion. Future work will also ensure a clean transition point on a promoted primary, and prepare a replica shard for a sync with the promoted primary.
Relates #10708