Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw back replica local checkpoint on new primary #25452

Merged
merged 18 commits into from
Jul 5, 2017
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,19 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
}
}

/**
* Resets the checkpoint to the specified value.
*
* @param checkpoint the local checkpoint to reset this tracker to
*/
synchronized void resetCheckpoint(final long checkpoint) {
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
firstProcessedSeqNo = checkpoint + 1;
nextSeqNo = checkpoint + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that resetting nextSeqNo is incorrect. Assume that the primary-replica resync fails and that the shard here would be promoted to primary, in that case it would reuse the sequence numbers to override stuff it already had. I'll reach out to discuss.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had a very long discussion about this. The solution here is fine if we add a follow-up that resets the local checkpoint tracker state on a primary during promotion (the newly promoted primary needs to reset its local checkpoint and mark the sequence numbers in its translog as completed to reestablish the state of the local checkpoint tracker, it has to do this before filling the gaps).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, such a follow-up will introduce a test that captures the problem here, namely that if we do not do something as outlined above, in this scenario a newly promoted primary can overwrite history.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this some more, I agree with the assessment we had, except for one thing: We should not reset the nextSeqNo variable which is exposed as getMaxSeqNo. Otherwise when writing out segments, this max sequence number information which we take from the local checkpoint tracker would be incorrect, i.e. there could be a document in the segment where the sequence number would be above max.

Put differently, nextSeqNo is not tied to the bit set (which represents the pending confirmation marker). Instead it tracks the actual translog.

this.checkpoint = checkpoint;
}

/**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ public void markSeqNoAsCompleted(final long seqNo) {
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}

/**
* Resets the local checkpoint to the specified value.
*
* @param localCheckpoint the local checkpoint to reset to
*/
public void resetLocalCheckpoint(final long localCheckpoint) {
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}

/**
* The current sequence number stats.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,15 @@ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final
assert operationPrimaryTerm > primaryTerm :
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
logger.trace(
"detected new primary with primary term [{}], "
+ "resetting local checkpoint from [{}] to [{}], "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log line is incorrect, we don't know the value yet at this point towards which we are going to reset the local checkpoint. It is only determined after setting the global checkpoint in the line below. I think it's easiest to move the logging one line below and use getGlobalCheckpoint(). I would also leave out the part which says "updating global checkpoint to {}" as the given value might be below the current global checkpoint, which might be misleading in this message (we already have trace logging for the global checkpoint updates).

+ "updating global checkpoint to [{}]",
operationPrimaryTerm,
getLocalCheckpoint(),
globalCheckpoint,
globalCheckpoint);
getEngine().seqNoService().resetLocalCheckpoint(globalCheckpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The global checkpoint that is provided by the new primary might be lower than the global checkpoint that we currently have (e.g. the failed primary did communicate the latest global checkpoint to us, but not to the newly appointed primary).
First we have to update the global checkpoint, then use the newly computed global checkpoint to reset the local checkpoint, otherwise the local checkpoint could end up below the global checkpoint.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 5e9d79f.

updateGlobalCheckpointOnReplica(globalCheckpoint);
getEngine().getTranslog().rollGeneration();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.not;

public class LocalCheckpointTrackerTests extends ESTestCase {

Expand All @@ -49,14 +51,14 @@ public class LocalCheckpointTrackerTests extends ESTestCase {

public static LocalCheckpointTracker createEmptyTracker() {
return new LocalCheckpointTracker(
IndexSettingsModule.newIndexSettings(
"test",
Settings
.builder()
.put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE)
.build()),
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED
IndexSettingsModule.newIndexSettings(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why reformat?

"test",
Settings
.builder()
.put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE)
.build()),
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED
);
}

Expand Down Expand Up @@ -236,4 +238,24 @@ public void testWaitForOpsToComplete() throws BrokenBarrierException, Interrupte

thread.join();
}

public void testResetCheckpoint() {
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
for (int i = 0; i < operations; i++) {
if (!rarely()) {
tracker.markSeqNoAsCompleted(i);
}
}

final int localCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) localCheckpoint));
assertThat(tracker.processedSeqNo, empty());
assertThat(tracker.generateSeqNo(), equalTo((long) (localCheckpoint + 1)));
tracker.markSeqNoAsCompleted((long) (localCheckpoint + 1));
assertThat(tracker.processedSeqNo, not(empty()));
assertThat(tracker.processedSeqNo.peek().get(0), equalTo(true));
}
}
112 changes: 92 additions & 20 deletions core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

/**
Expand Down Expand Up @@ -405,26 +404,10 @@ public void testPrimaryFillsSeqNoGapsOnPromotion() throws Exception {

// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
boolean gap = false;
for (int i = 0; i < operations; i++) {
if (!rarely()) {
final String id = Integer.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
getMappingUpdater(indexShard, sourceToParse.type()));
max = i;
} else {
gap = true;
}
}
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));

final int maxSeqNo = max;
if (gap) {
assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo)));
}
final int maxSeqNo = result.maxSeqNo;
final boolean gap = result.gap;

// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
Expand Down Expand Up @@ -637,6 +620,7 @@ public void onFailure(Exception e) {
@Override
public void onResponse(Releasable releasable) {
assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
assertThat(indexShard.getLocalCheckpoint(), equalTo(newGlobalCheckPoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
onResponse.set(true);
releasable.close();
Expand Down Expand Up @@ -697,6 +681,7 @@ private void finish() {
assertTrue(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
assertThat(indexShard.getLocalCheckpoint(), equalTo(newGlobalCheckPoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
}
}
Expand All @@ -707,6 +692,44 @@ private void finish() {
closeShards(indexShard);
}

public void testThrowbackLocalCheckpointOnReplica() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);

// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));

final int globalCheckpoint =
randomIntBetween(
Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED),
Math.toIntExact(indexShard.getLocalCheckpoint()));
final CountDownLatch latch = new CountDownLatch(1);
indexShard.acquireReplicaOperationPermit(
indexShard.primaryTerm + 1,
globalCheckpoint,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
latch.countDown();
}

@Override
public void onFailure(Exception e) {

}
},
ThreadPool.Names.SAME);

latch.await();
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) globalCheckpoint));

// ensure that after the local checkpoint throwback and indexing again, the local checkpoint advances
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));

closeShards(indexShard);
}

public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
final IndexShard indexShard = newStartedShard(false);

Expand Down Expand Up @@ -1966,6 +1989,55 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
closeShards(newShard);
}

class Result {
private final int localCheckpoint;
private final int maxSeqNo;
private final boolean gap;

Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) {
this.localCheckpoint = localCheckpoint;
this.maxSeqNo = maxSeqNo;
this.gap = gap;
}
}

/**
* Index on the specified shard while introducing sequence number gaps.
*
* @param indexShard the shard
* @param operations the number of operations
* @param offset the starting sequence number
* @return a pair of the maximum sequence number and whether or not a gap was introduced
* @throws IOException if an I/O exception occurs while indexing on the shard
*/
private Result indexOnReplicaWithGaps(
final IndexShard indexShard,
final int operations,
final int offset) throws IOException {
int localCheckpoint = offset;
int max = offset;
boolean gap = false;
for (int i = offset + 1; i < operations; i++) {
if (!rarely()) {
final String id = Integer.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
getMappingUpdater(indexShard, sourceToParse.type()));
if (!gap && i == localCheckpoint + 1) {
localCheckpoint++;
}
max = i;
} else {
gap = true;
}
}
assert localCheckpoint == indexShard.getLocalCheckpoint();
assert !gap || (localCheckpoint != max);
return new Result(localCheckpoint, max, gap);
}

/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;
Expand Down