-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replicate write failures #23314
Replicate write failures #23314
Conversation
Since this is a community submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually? |
@@ -184,6 +192,11 @@ public Failure(StreamInput in) throws IOException { | |||
id = in.readOptionalString(); | |||
cause = in.readException(); | |||
status = ExceptionsHelper.status(cause); | |||
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { | |||
seqNo = in.readLong(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think @jasontedor want's us to strandardize on readZlong. @jasontedor correct me if I'm wrong..
@@ -309,36 +311,47 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind | |||
for (int i = 0; i < request.items().length; i++) { | |||
BulkItemRequest item = request.items()[i]; | |||
assert item.getPrimaryResponse() != null : "expected primary response to be set for item [" + i + "] request ["+ item.request() +"]"; | |||
if (item.getPrimaryResponse().isFailed() == false && | |||
item.getPrimaryResponse().getResponse().getResult() != DocWriteResponse.Result.NOOP) { | |||
if (item.getPrimaryResponse().getResponse().getResult() != DocWriteResponse.Result.NOOP) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder (though not sure) if we can check for the existence of a seqNo and that's it.
if (operationResult.hasFailure()) { | ||
// check if any transient write operation failures should be bubbled up | ||
Exception failure = operationResult.getFailure(); | ||
assert failure instanceof VersionConflictEngineException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI we don't throw version conflicts on replicas any more... (not related to your change)
@@ -445,4 +458,12 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind | |||
primaryResponse.getSeqNo(), request.primaryTerm(), version, versionType); | |||
return replica.delete(delete); | |||
} | |||
|
|||
private Engine.NoOpResult executeNoOpRequestOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we should call this executeFailedSeqNoOnReplica .. might be a clearer name at this level?
private Engine.NoOpResult executeNoOpRequestOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { | ||
final long version = docWriteRequest.version(); | ||
final VersionType versionType = docWriteRequest.versionType().versionTypeForReplicationAndRecovery(); | ||
final Engine.NoOp noOp = replica.prepareNoOpOnReplica(docWriteRequest.type(), docWriteRequest.id(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are versions relevant here?
Thx @areek . I think this is in the right direction. I left some comments. I also think it needs some work in InternalEngine as the primary also needs to add a noop to its translog. |
a2e4e4d
to
535168b
Compare
5e89b72
to
e904db7
Compare
Thanks @bleskes for the review. I updated the PR adding:
|
e904db7
to
9d62610
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx @areek . Basics look good. I left some comments.
} | ||
|
||
/** Result holder of executing shard bulk on primary */ | ||
public static class BulkShardResult { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a new class? can't we use WritePrimaryResult?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially used WritePrimaryResult
for this, but that changed the visibility of WritePrimaryResult
from protected to public. Hence I introduced BulkShardResult
which allows us to use TransportShardBulkAction#performOnPrimary
in ESIndexLevelReplicationTestCase
without exposing WritePrimaryResult
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the new class and exposed WritePrimaryResult instead, as discussed
if (primaryResponse.isFailed()) { | ||
return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO; | ||
} else { | ||
// NOTE: for bwc as pre-6.0 write requests has unassigned seq no |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get this comment, can you clarify?
return new WriteReplicaResult<>(request, location, null, replica, logger); | ||
} | ||
|
||
public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { | ||
Translog.Location location = null; | ||
for (int i = 0; i < request.items().length; i++) { | ||
BulkItemRequest item = request.items()[i]; | ||
if (shouldExecuteReplicaItem(item, i)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder how much this check buys us now. We basically have 3 options. Either we index normally, index a failure/noop or skip all together. This is now hard to understand because it's partially in this method and partially here in the first if clause. Can we bring it all together?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I attempted to bring it all together through an enum (ReplicaItemExecutionMode
) with three modes NORMAL
, NOOP
, FAILURE
return new Engine.NoOp(uid, seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason); | ||
} | ||
|
||
public Engine.NoOpResult noOp(Engine.NoOp noOp) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should call these something like markSeqNoAsNoOp. It feels weird to the the shard to do nothing ;) The previous method can maybe called preparingMarkingSeqNoAsNoOp?
@@ -2615,10 +2615,13 @@ public void testHandleDocumentFailure() throws Exception { | |||
} | |||
Engine.IndexResult indexResult = engine.index(indexForDoc(doc1)); | |||
assertNotNull(indexResult.getFailure()); | |||
|
|||
// document failures should be recorded in translog | |||
assertNotNull(indexResult.getTranslogLocation()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commenting here because this is the only change in this file. Shouldn't we have some test around the noop used on replicas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is already a test for engine noOp
operation here, which the replica uses to record failures. I added a unit test (testNoOpReplicationOnPrimaryDocumentFailure
) in TransportShardBulkActionTests
to verify the noOp parameters when a primary document failure is seen
final BulkItemResponse response = index(indexRequest); | ||
if (response.isFailed()) { | ||
throw response.getFailure().getCause(); | ||
} else if (response.isFailed() == false) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is always true no?
*/ | ||
public void testDocumentFailureReplication() throws Exception { | ||
IndexMetaData metaData = buildIndexMetaData(1); | ||
final ReplicationGroup replicationGroupWithDocumentFailureOnPrimary = new ReplicationGroup(metaData) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we call it something short and put it in a try finaly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
IndexMetaData metaData = buildIndexMetaData(1); | ||
final ReplicationGroup replicationGroupWithDocumentFailureOnPrimary = new ReplicationGroup(metaData) { | ||
@Override | ||
protected EngineFactory getEngineFactory(ShardRouting routing) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we encapsulate this in a utility class (similar to what i did)? there's a lot of craft here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the pointer, I added a utility class
d15d1f6
to
ed82ee6
Compare
thanks for the feedback @bleskes. I addressed all the comments. would appreciate another review. |
ed82ee6
to
1c943df
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This LGTM. I left some minor comments. I also notice I went through this code too often now and I've "grown too used to it". Would be great if @dakrone can give it a review as well with some fresh eyes.
@@ -533,6 +596,12 @@ static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int | |||
return replica.delete(delete); | |||
} | |||
|
|||
private static Engine.NoOpResult executeFailedSeqNoOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { | |||
final Engine.NoOp noOp = replica.preparingMarkingSeqNoAsNoOp(docWriteRequest.type(), docWriteRequest.id(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is typically called prepareX
final long startTime, | ||
final String reason) { | ||
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); | ||
final Term uid, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any chance we can fold them into 1 line now?
final Origin origin, | ||
final long startTime, | ||
final String reason) { | ||
super(uid, seqNo, primaryTerm, 0, null, origin, startTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it work to use an illegal version value? -1?
@@ -567,28 +567,35 @@ private IndexShardState changeState(IndexShardState newState, String reason) { | |||
return result; | |||
} | |||
|
|||
public Engine.NoOp preparingMarkingSeqNoAsNoOp(String type, String id, long seqNo, String reason) { | |||
verifyReplicationTarget(); | |||
final Term uid = extractUid(type, id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does noop need an doc id? it's weird no?
@@ -567,28 +567,35 @@ private IndexShardState changeState(IndexShardState newState, String reason) { | |||
return result; | |||
} | |||
|
|||
public Engine.NoOp preparingMarkingSeqNoAsNoOp(String type, String id, long seqNo, String reason) { | |||
verifyReplicationTarget(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++
1c943df
to
180d654
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I left a few minor comments, thanks @areek!
* When primary execution failed before sequence no was generated | ||
* or primary execution was a noop (only possible when request is originating from pre-6.0 nodes) | ||
*/ | ||
NOOP, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add an assert that will fail on 7.0.0 that we should remove the NOOP and all handling code for it?
operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica); | ||
assert operationResult != null : "operation result must never be null when primary response has no failure"; | ||
location = syncOperationResultOrThrow(operationResult, location); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a default
clause for this switch statement?
@@ -533,6 +596,12 @@ static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int | |||
return replica.delete(delete); | |||
} | |||
|
|||
private static Engine.NoOpResult executeFailedSeqNoOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be named something a little closer to what it does (the name makes it sound like it executes the failing request on the replica), so maybe executeFailureNoOpOnReplica(..)
?
@@ -569,28 +569,34 @@ private IndexShardState changeState(IndexShardState newState, String reason) { | |||
return result; | |||
} | |||
|
|||
public Engine.NoOp prepareMarkingSeqNoAsNoOp(long seqNo, String reason) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To follow the standard that we have use here, I think this should be named prepareNoOpOnReplica(...)
(since we have prepareDeleteOnReplica
and prepareIndexOnReplica
)
return new Engine.NoOp(seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason); | ||
} | ||
|
||
public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the other IndexShard
methods correspond to their Engine
implementations, so I think this should be called just noOp
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was my ask. I find IndexShard.noOp
to be confusing - why are we doing nothing?. I would be good with renaming the engine method to match the one here. @dakrone would you be good with that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's reasonable to me!
180d654
to
a53a58f
Compare
Currently, when a primary write operation fails after generating a sequence number, the failure is not communicated to the replicas. Ideally, every operation which generates a sequence number on primary should be recorded in all replicas. In this change, a sequence number is associated with write operation failure. When a failure with an assinged seqence number arrives at a replica, the failure cause and sequence number is recorded in the translog and the sequence number is marked as completed via executing `Engine.noOp` on the replica engine.
Test that document failure (w/ seq no generated) are recorded as no-op in the translog for primary and replica shards
depanding on it's primary execution
01400ad
to
d153fdc
Compare
* master: Add BucketMetricValue interface (elastic#24188) Enable index-time sorting (elastic#24055) Clarify elasticsearch user uid:gid mapping in Docker docs Update field-names-field.asciidoc (elastic#24178) ElectMasterService.hasEnoughMasterNodes should return false if no masters were found Remove Ubuntu 12.04 (elastic#24161) [Test] Add unit tests for InternalHDRPercentilesTests (elastic#24157) Replicate write failures (elastic#23314) Rename variable in translog simple commit test Strengthen translog commit with open view test Stronger check in translog prepare and commit test Fix translog prepare commit and commit test ingest-node.asciidoc - Clarify json processor (elastic#21876) Painless: more testing for script_stack (elastic#24168)
Currently, when a primary write operation fails after generating
a sequence number, the failure is not communicated to the replicas.
Ideally, every operation which generates a sequence number on primary
should be recorded in all replicas.
In this change, a sequence number is associated with write operation
failure. When a failure with an assinged seqence number arrives at a
replica, the failure cause and sequence number is recorded in the translog
and the sequence number is marked as completed via executing
Engine.noOp
on the replica engine.