Guarantee that translog generations are seqNo conflict free #24825
Left some minor comments; LGTM in general.
@@ -1260,9 +1263,16 @@ public int hashCode() {
return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode();
this hashcode looks odd....
agreed. I generated a new one.
@@ -90,6 +96,13 @@ private TranslogWriter(
assert initialCheckpoint.maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo;
this.maxSeqNo = initialCheckpoint.maxSeqNo;
this.globalCheckpointSupplier = globalCheckpointSupplier;
boolean assertionsEnabled = false;
assert (seenSequenceNumbers = new HashMap<>()) != null;
if you wanna have a one-liner?
Then I think I can't make seenSequenceNumbers final, which is a shame...
Please integrate #24834 and use it here.
integrated. Nice work.
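The trade-off discussed in this thread can be sketched as follows. This is a minimal illustration, not the code from this PR or from #24834: the class `AssertionOnlyState` and its methods are hypothetical names. It shows the local-flag idiom (`boolean assertionsEnabled = false;` followed by an assert with a side effect), which lets the field stay final, whereas assigning the field directly inside an `assert` would not compile for a final field because the compiler cannot prove it is definitely assigned.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a writer that tracks sequence numbers only when assertions are on.
final class AssertionOnlyState {
    // Stays null when assertions are disabled, so production runs pay nothing for the map.
    private final Map<Long, String> seenSequenceNumbers;

    AssertionOnlyState() {
        // The assignment inside the assert only executes when the JVM runs with -ea.
        boolean assertionsEnabled = false;
        assert assertionsEnabled = true;
        this.seenSequenceNumbers = assertionsEnabled ? new HashMap<>() : null;
    }

    // True when the tracking map was allocated.
    boolean tracksSequenceNumbers() {
        return seenSequenceNumbers != null;
    }

    // Independent probe of the assertion status for this class.
    static boolean assertionsEnabled() {
        return AssertionOnlyState.class.desiredAssertionStatus();
    }
}
```

With `-ea` the map is allocated; without it the field is null and every use must itself sit behind an `assert`.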
DocWriteResponse primaryResponse,
IndexRequest request,
IndexShard replica) throws IOException {
DocWriteResponse primaryResponse,
can you maybe un-indent this into a single line?
try {
verifyReplicationTarget();
assert primaryTerm == this.primaryTerm : "op term [ " + primaryTerm + " ] != shard term [" + this.primaryTerm + "]";
should this be a hard exception? something is bloody wrong here if this happens? maybe IllegalStateException?
This one is an interesting one - yes, things are totally fucked up. I'm torn as to whether to change it: with assertions, nodes die and tests fail with the right exception, as we never catch them. An IllegalStateException will cause the shard to be failed, and then the failure may cascade further. I'm tempted to keep it as is.
I think this should be kept as is exactly for the reason that @bleskes mentions regarding the consequences in production code of changing this to a hard failure and the possible loss of visibility when tests are running. This really should never happen.
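The difference the reviewers are weighing can be sketched like this. It is only an illustration under stated assumptions: `TermCheckDemo` and `runOperation` are hypothetical, and the tripped assertion is simulated by throwing `AssertionError` directly so the example does not depend on `-ea`. The point is that an `AssertionError` is an `Error` and passes straight through a generic `catch (Exception)` handler, while an `IllegalStateException` is caught and handled as an ordinary operation failure.

```java
// Hypothetical sketch: how the two failure styles travel through a generic handler.
final class TermCheckDemo {
    // Simulates a tripped `assert opTerm == shardTerm : ...` without requiring -ea.
    static void checkWithAssertion(long opTerm, long shardTerm) {
        if (opTerm != shardTerm) {
            throw new AssertionError("op term [" + opTerm + "] != shard term [" + shardTerm + "]");
        }
    }

    // The "hard exception" alternative suggested in the review.
    static void checkWithException(long opTerm, long shardTerm) {
        if (opTerm != shardTerm) {
            throw new IllegalStateException("op term [" + opTerm + "] != shard term [" + shardTerm + "]");
        }
    }

    // Stand-in for a wrapper that converts any Exception into a "shard failed" outcome.
    static String runOperation(Runnable check) {
        try {
            check.run();
            return "ok";
        } catch (Exception e) {
            return "shard failed: " + e.getMessage();
        }
    }
}
```

Here `runOperation(() -> TermCheckDemo.checkWithException(1, 2))` returns a "shard failed" string, whereas the assertion variant propagates out of `runOperation` uncaught.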
@@ -597,6 +598,7 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm,
long version, VersionType versionType) {
verifyReplicationTarget();
assert primaryTerm == this.primaryTerm : "op term [ " + primaryTerm + " ] != shard term [" + this.primaryTerm + "]";
same here?
@@ -1879,8 +1881,9 @@ public void acquireReplicaOperationPermit(
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm > primaryTerm;
can you add a message to this assertion while you are at it?
yep
return new Translog.Location(generation, offset, data.length());
}

private boolean assertSeqNoNotSeen(long seqNo, BytesReference data) throws IOException {
I know it's only called in one place but can you make it synchronized just for readability?
sure
I've left minor comments around primary term, but LGTM otherwise.
@@ -1257,12 +1260,22 @@ public boolean equals(Object obj) {

@Override
public int hashCode() {
return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode();
int result = (int) (seqNo ^ (seqNo >>> 32));
result = 31 * result + (int) (primaryTerm ^ (primaryTerm >>> 32));
result = 31 * result + Long.hashCode(primaryTerm);
is simpler (likewise for seqNo above). Another alternative would be to use Objects.hash(seqNo, primaryTerm, reason). Either way, we can avoid the bit-fiddling.
I just let IntelliJ autogen it.
});
} catch (final InterruptedException | TimeoutException e) {
} catch (final InterruptedException | TimeoutException | IOException | AlreadyClosedException e) {
maybe just catch (Exception)
yeah, this just got out of hand.
@@ -522,10 +522,11 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
}
}

public Engine.Index prepareIndexOnReplica(SourceToParse source, long seqNo, long version, VersionType versionType, long autoGeneratedIdTimestamp,
boolean isRetry) {
public Engine.Index prepareIndexOnReplica(SourceToParse source, long seqNo, long primaryTerm, long version, VersionType versionType,
Calling this just primaryTerm is confusing (in light of a future PR that uses this code during resync). Here it means the term of the primary that was sending this operation, not necessarily the primary term of the log entry that is being replicated.
I agree.
"Here it means the term of the primary that was sending this operation, not necessarily the primary term of the log entry that is being replicated"
Actually, it's the term of the op in the log and not the authority of the primary that sends this op (that one goes into the permit method). I guess this proves your point about it being confusing. Any suggestions?
I spoke to Yannick on a different channel. We decided to rename the parameter to opPrimaryTerm and relax the assertion to reflect semantics (as opposed to current usage).
@@ -667,7 +669,7 @@ static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest r
final long version = primaryResponse.getVersion();
assert versionType.validateVersionForWrites(version);
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
primaryResponse.getSeqNo(), request.primaryTerm(), version, versionType);
primaryResponse.getSeqNo(), primaryTerm, version, versionType);
It's trappy that the IndexRequest/DeleteRequest object might not have a primaryTerm properly set if it's wrapped in a BulkShardRequest. Maybe we could override BulkShardRequest.primaryTerm(long) to set the primary term of the inner objects until we have fixed this discrepancy?
I left some comments.
return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode();
int result = (int) (seqNo ^ (seqNo >>> 32));
result = 31 * result + (int) (primaryTerm ^ (primaryTerm >>> 32));
result = 31 * result + reason.hashCode();
Why change this? It was perfectly fine as it was, and far more readable. Note that the previous calculation was simply the result of expanding the usual hash code calculation:
    accumulator = 1
    for (o in objects) {
        accumulator = 31 * accumulator + hash(o)
    }
    return accumulator
So you end up with 31^k + 31^(k - 1) * hash_1 + 31^(k - 2) * hash_2 + ... + 31^0 * hash_k,
which is why it looks like 31^3 + ...
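The expansion described in this comment can be checked with a short sketch. The class `HashExpansionDemo` and its method names are hypothetical; only `Long.hashCode` and `java.util.Objects.hash` are standard library calls. It compares the expanded one-liner from the diff against the accumulator loop unrolled for three fields, and against `Objects.hash`, which uses the same accumulation. Because `int` arithmetic wraps consistently, all three agree even when intermediate values overflow.

```java
import java.util.Objects;

// Hypothetical demo: three equivalent ways to combine seqNo, primaryTerm and reason.
final class HashExpansionDemo {
    // The expanded form from the original hashCode(): 31^3 + 31^2 * h1 + 31 * h2 + h3.
    static int expanded(long seqNo, long primaryTerm, String reason) {
        return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason.hashCode();
    }

    // The accumulator loop from the comment, unrolled for three fields.
    static int accumulated(long seqNo, long primaryTerm, String reason) {
        int accumulator = 1;
        accumulator = 31 * accumulator + Long.hashCode(seqNo);
        accumulator = 31 * accumulator + Long.hashCode(primaryTerm);
        accumulator = 31 * accumulator + reason.hashCode();
        return accumulator;
    }

    // java.util.Objects.hash boxes the longs and applies the same accumulation.
    static int viaObjectsHash(long seqNo, long primaryTerm, String reason) {
        return Objects.hash(seqNo, primaryTerm, reason);
    }

    // The bit-fiddling in the IDE-generated version is what Long.hashCode computes.
    static int bitFiddled(long value) {
        return (int) (value ^ (value >>> 32));
    }
}
```

The IDE-generated variant seeds the accumulator with the first field's hash instead of 1, so it yields different values from the forms above while still being a valid hash.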
I can roll it back. I just moved it to be the standard autogen-ed intellij code, which is what we see all over the place.
I'm just going to roll this back. It's not related to my change and it seems contentious.
if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
// nothing to do
} else if (seenSequenceNumbers.containsKey(seqNo)) {
final Tuple<BytesReference, Exception> previous = seenSequenceNumbers.get(seqNo);
Why not StackTraceElement[]?
I did it this way so it can easily integrate into a caused by clause and have a proper message. Do you have a better suggestion there?
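The reasoning in this reply can be sketched as follows. The class `StackCaptureDemo` and its `record` method are hypothetical; the sketch only assumes the standard `AssertionError(String, Throwable)` constructor. Storing an exception rather than a bare `StackTraceElement[]` means the earlier call site can be attached directly as the cause of the later failure, so it is printed under "Caused by:" together with its own message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of capturing the first writer's stack as an exception.
final class StackCaptureDemo {
    // seqNo -> exception created at the point the seqNo was first recorded.
    private final Map<Long, Exception> firstSeen = new HashMap<>();

    // Returns an error chaining the first call site when seqNo is recorded a second time.
    Optional<AssertionError> record(long seqNo) {
        Exception previous = firstSeen.get(seqNo);
        if (previous != null) {
            // The stored exception becomes the cause, so both stack traces are reported.
            return Optional.of(new AssertionError("seqNo [" + seqNo + "] was already recorded", previous));
        }
        // The exception is never thrown; it only carries the current stack trace.
        firstSeen.put(seqNo, new RuntimeException("stack capture previous op"));
        return Optional.empty();
    }
}
```

A raw `StackTraceElement[]` would hold the same frames but would need extra formatting code to appear in a failure report.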
}
} else {
seenSequenceNumbers.put(seqNo,
new Tuple<>(new BytesArray(data.toBytesRef(), true), new RuntimeException("stack capture previous op")));
Why not Thread.currentThread().getStackTrace()?
I left another comment.
DocWriteResponse primaryResponse,
IndexRequest request,
IndexShard replica) throws IOException {
private static Engine.IndexResult executeIndexRequestOnReplica(DocWriteResponse primaryResponse, IndexRequest request,
One reason I prefer the line-per-parameter form for method definitions that spill over multiple lines is that, had you maintained it in this case, the diff would simply be:
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -531,6 +531,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     private static Engine.IndexResult executeIndexRequestOnReplica(
             DocWriteResponse primaryResponse,
             IndexRequest request,
+            long primaryTerm,
             IndexShard replica) throws IOException {
         final Engine.Index operation;
and git blame would trace that additional parameter back to this PR rather than the entire method declaration.
retest this please.
thanks @s1monw @ywelsch @jasontedor
With #24779 in place, we can now guarantee that a single translog generation file will never have a sequence number conflict that needs to be resolved by looking at primary terms. These conflicts can occur when a replica contains an operation which isn't part of the history of a newly promoted primary. That primary can then assign a different operation to the same slot and replicate it to the replica.
PS. Knowing that each generation file is conflict free will simplify repairing these conflicts when we read from the translog.
PPS. This PR also fixes some bugs in the piping of primary terms in the bulk shard action. These bugs are a legacy of IndexRequest/DeleteRequest being a ReplicationRequest. We need to change that as a follow-up.
Relates to #10708
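The per-generation guarantee described above can be illustrated with a small sketch. It is not the PR's implementation: `GenerationConflictCheck`, the `byte[]` payloads, and the value used for `UNASSIGNED_SEQ_NO` are assumptions made for the example, and so is the rule that rewriting identical bytes under the same sequence number is acceptable. A conflict, in this sketch, is the same sequence number appearing with different content inside one generation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-generation check: one seqNo must not map to two different operations.
final class GenerationConflictCheck {
    // Assumed placeholder for "no sequence number assigned"; such ops are not tracked.
    static final long UNASSIGNED_SEQ_NO = -2L;

    // seqNo -> bytes of the first operation written under that seqNo in this generation.
    private final Map<Long, byte[]> seen = new HashMap<>();

    // Returns true when the write is conflict free within this generation.
    synchronized boolean seqNoNotSeen(long seqNo, byte[] data) {
        if (seqNo == UNASSIGNED_SEQ_NO) {
            return true; // nothing to do
        }
        // Keep a private copy so later mutation of the caller's array cannot hide a conflict.
        byte[] previous = seen.putIfAbsent(seqNo, data.clone());
        // Assumption: identical bytes under the same seqNo are treated as a harmless repeat.
        return previous == null || Arrays.equals(previous, data);
    }
}
```

A caller would typically wrap the check in an assertion, for example `assert check.seqNoNotSeen(seqNo, bytes);`, so that it only runs in tests.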