Trim local translog in peer recovery (#44756)
Today, if an operation-based peer recovery occurs, we do not trim the
translog but leave it as is. Unacknowledged operations that remain in the
translog of that replica may suddenly reappear when it is promoted to
primary. With this change, we trim the translog above the starting
sequence number of phase 2. This also paves the way for reading the
translog forward.
dnhatn committed Aug 11, 2019
1 parent 1cd464d commit 25c6102
Showing 17 changed files with 202 additions and 46 deletions.
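
To make the failure mode concrete, here is a minimal, self-contained sketch in plain Java (illustrative only; none of these names are Elasticsearch APIs). Operations at or above the phase 2 starting sequence number are resent by the recovery source, so the target can erase its local copies above startingSeqNo - 1, including stale, unacknowledged ones that would otherwise resurface on promotion:

import java.util.ArrayList;
import java.util.List;

public class StaleOpsSketch {
    public static void main(String[] args) {
        // A replica's translog after a network partition: ops 0..4 were acknowledged,
        // ops 5..6 were accepted locally but never acknowledged to any client.
        List<Long> replicaTranslog = new ArrayList<>(List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L));
        long startingSeqNo = 5; // peer recovery resends every op with seqNo >= 5 in phase 2

        // Without trimming, a later promotion replays ops 5..6 from the local translog,
        // and the unacknowledged writes suddenly reappear. With this change, recovery
        // first trims everything above startingSeqNo - 1:
        replicaTranslog.removeIf(seqNo -> seqNo > startingSeqNo - 1);
        System.out.println(replicaTranslog); // [0, 1, 2, 3, 4]
    }
}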
@@ -1101,7 +1101,7 @@ public void trimTranslog() {
     /**
      * Rolls the translog generation and cleans up unneeded generations.
      */
-    private void rollTranslogGeneration() {
+    public void rollTranslogGeneration() {
         final Engine engine = getEngine();
         engine.rollTranslogGeneration();
     }
@@ -63,6 +63,7 @@ public int skippedOperations() {

     @Override
     public Translog.Operation next() throws IOException {
+        // TODO: Read translog forward in 9.0+
         for (; index >= 0; index--) {
             final TranslogSnapshot current = translogs[index];
             Translog.Operation op;
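
For context on that TODO: the multi-snapshot walks translog generations from newest to oldest, so when a sequence number occurs in several generations the newest copy is returned and older duplicates are skipped; reading forward would invert that bookkeeping. A hedged, self-contained sketch of the newest-first idea (plain Java; illustrative names, not the real MultiSnapshot internals):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NewestFirstSketch {
    public static void main(String[] args) {
        List<List<Long>> generations = Arrays.asList(
            Arrays.asList(1L, 2L, 3L), // oldest generation
            Arrays.asList(3L, 4L),     // newer generation rewrites seqNo 3
            Arrays.asList(5L)          // newest generation
        );
        Set<Long> seen = new HashSet<>();
        // iterate from the last (newest) generation down to the first (oldest)
        for (int i = generations.size() - 1; i >= 0; i--) {
            for (long seqNo : generations.get(i)) {
                if (seen.add(seqNo)) {
                    System.out.println("read seqNo " + seqNo + " from generation " + i);
                } // else: an older duplicate, skipped
            }
        }
    }
}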
@@ -379,7 +379,7 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<Recovery
         public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
             try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                 final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request);
-                recoveryRef.target().finalizeRecovery(request.globalCheckpoint(),
+                recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(),
                     ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
             }
         }
@@ -28,27 +28,30 @@

 import java.io.IOException;

-public class RecoveryFinalizeRecoveryRequest extends TransportRequest {
+final class RecoveryFinalizeRecoveryRequest extends TransportRequest {

-    private long recoveryId;
-    private ShardId shardId;
-    private long globalCheckpoint;
+    private final long recoveryId;
+    private final ShardId shardId;
+    private final long globalCheckpoint;
+    private final long trimAboveSeqNo;

-    public RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
+    RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
         super(in);
         recoveryId = in.readLong();
         shardId = new ShardId(in);
-        if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
-            globalCheckpoint = in.readZLong();
+        globalCheckpoint = in.readZLong();
+        if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
+            trimAboveSeqNo = in.readZLong();
         } else {
-            globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
+            trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
         }
     }

-    RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint) {
+    RecoveryFinalizeRecoveryRequest(final long recoveryId, final ShardId shardId, final long globalCheckpoint, final long trimAboveSeqNo) {
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.globalCheckpoint = globalCheckpoint;
+        this.trimAboveSeqNo = trimAboveSeqNo;
     }

     public long recoveryId() {
@@ -63,13 +66,18 @@ public long globalCheckpoint() {
         return globalCheckpoint;
     }

+    public long trimAboveSeqNo() {
+        return trimAboveSeqNo;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeLong(recoveryId);
         shardId.writeTo(out);
-        if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
-            out.writeZLong(globalCheckpoint);
+        out.writeZLong(globalCheckpoint);
+        if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
+            out.writeZLong(trimAboveSeqNo);
         }
     }
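
The read and write paths above follow the standard wire-compatibility pattern: a field introduced in 7.4.0 is only (de)serialized when the remote node is on a version that knows about it, and falls back to a sentinel otherwise. A minimal sketch of the pattern, using plain data streams as stand-ins for Elasticsearch's StreamInput/StreamOutput (the version constant and sentinel value here are illustrative):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionGatedField {
    static final int V_7_4_0 = 7_04_00;       // stand-in for Version.V_7_4_0
    static final long UNASSIGNED_SEQ_NO = -2; // stand-in for SequenceNumbers.UNASSIGNED_SEQ_NO

    static void write(DataOutputStream out, int remoteVersion, long trimAboveSeqNo) throws IOException {
        if (remoteVersion >= V_7_4_0) {
            out.writeLong(trimAboveSeqNo); // the peer understands the new field
        }                                  // older peer: the field is simply omitted
    }

    static long read(DataInputStream in, int remoteVersion) throws IOException {
        // an old sender never wrote the field, so substitute the sentinel
        return remoteVersion >= V_7_4_0 ? in.readLong() : UNASSIGNED_SEQ_NO;
    }
}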
@@ -327,7 +327,9 @@ && isTargetSameHistory()

         }, onFailure);

-        sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, finalizeStep), onFailure);
+        // The recovery target can trim all operations >= startingSeqNo, as we have sent all of them in phase 2.
+        final long trimAboveSeqNo = startingSeqNo - 1;
+        sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);

         finalizeStep.whenComplete(r -> {
             final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time
@@ -750,7 +752,7 @@ private void sendBatch(
         }
     }

-    void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
+    void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) throws IOException {
         if (shard.state() == IndexShardState.CLOSED) {
             throw new IndexShardClosedException(request.shardId());
         }
@@ -767,7 +769,7 @@ void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Voi
             shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
         final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery
         final StepListener<Void> finalizeListener = new StepListener<>();
-        cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, finalizeListener));
+        cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener));
         finalizeListener.whenComplete(r -> {
             runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
                 shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
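
The trimAboveSeqNo = startingSeqNo - 1 boundary deserves a worked example: phase 2 resends every operation with seqNo >= startingSeqNo, so startingSeqNo - 1 is the highest sequence number the target must preserve locally, and the two ranges meet with no gap and no overlap. A runnable illustration (plain Java, not Elasticsearch code):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class TrimBoundary {
    public static void main(String[] args) {
        long startingSeqNo = 100;
        long trimAboveSeqNo = startingSeqNo - 1; // = 99

        List<Long> keptOnTarget = LongStream.rangeClosed(0, trimAboveSeqNo).boxed().collect(Collectors.toList());
        List<Long> resentInPhase2 = LongStream.rangeClosed(startingSeqNo, 120).boxed().collect(Collectors.toList());

        // The two ranges meet exactly at the boundary: no gap, no duplicate.
        List<Long> combined = new ArrayList<>(keptOnTarget);
        combined.addAll(resentInPhase2);
        for (int i = 0; i < combined.size(); i++) {
            if (combined.get(i) != i) {
                throw new AssertionError("gap or duplicate at position " + i);
            }
        }
        System.out.println("history is contiguous: 0.." + combined.get(combined.size() - 1));
    }
}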
@@ -290,16 +290,24 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
     }

     @Override
-    public void finalizeRecovery(final long globalCheckpoint, ActionListener<Void> listener) {
+    public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener<Void> listener) {
         ActionListener.completeWith(listener, () -> {
             final IndexShard indexShard = indexShard();
             indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
             // Persist the global checkpoint.
             indexShard.sync();
             indexShard.persistRetentionLeases();
             if (hasUncommittedOperations()) {
                 indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
             }
+            if (trimAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                // We should erase all translog operations above trimAboveSeqNo, as we have received either the same or a
+                // newer copy from the recovery source in phase 2. Rolling a new translog generation is not strictly
+                // required here, for we won't trim the current generation. It merely satisfies the assumption that the
+                // current generation does not have any operation that would be trimmed (see TranslogWriter#assertNoSeqAbove).
+                // That assumption does not hold for peer recovery because we could have received operations above
+                // startingSeqNo from the previous primary terms.
+                indexShard.rollTranslogGeneration();
+                indexShard.trimOperationOfPreviousPrimaryTerms(trimAboveSeqNo);
+            }
             indexShard.finalizeRecovery();
             return null;
         });
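
Why roll a generation before trimming? Trimming rewrites only previous translog generations, so rolling first leaves an empty current generation and the invariant checked by TranslogWriter#assertNoSeqAbove holds trivially. A hedged, self-contained sketch of that ordering (plain Java collections standing in for translog generations; not the real Translog API):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class RollThenTrim {
    public static void main(String[] args) {
        Deque<List<Long>> generations = new ArrayDeque<>();
        // the current generation holds ops above the trim point, received from an old primary
        generations.push(new ArrayList<>(Arrays.asList(100L, 101L, 102L)));
        long trimAboveSeqNo = 99;

        generations.push(new ArrayList<>()); // roll: a fresh, empty current generation
        List<Long> current = generations.peek();
        generations.stream().skip(1)         // trim touches previous generations only
            .forEach(gen -> gen.removeIf(seqNo -> seqNo > trimAboveSeqNo));

        // assertNoSeqAbove-style check: the current generation has nothing above the trim point
        if (current.stream().anyMatch(seqNo -> seqNo > trimAboveSeqNo)) {
            throw new AssertionError("current generation holds a trimmed seqNo");
        }
        System.out.println(generations); // [[], []]
    }
}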
@@ -42,9 +42,11 @@ public interface RecoveryTargetHandler {
      * the global checkpoint.
      *
      * @param globalCheckpoint the global checkpoint on the recovery source
+     * @param trimAboveSeqNo   the recovery target should erase its existing translog above this sequence number
+     *                         from the previous primary terms
      * @param listener         the listener which will be notified when this method is completed
      */
-    void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener);
+    void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener);

     /**
      * Handoff the primary context between the relocation source and the relocation target.
@@ -86,9 +86,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
     }

     @Override
-    public void finalizeRecovery(final long globalCheckpoint, final ActionListener<Void> listener) {
+    public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, final ActionListener<Void> listener) {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
-            new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
+            new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint, trimAboveSeqNo),
             TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
             new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
                 in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
@@ -134,6 +134,7 @@
 import org.elasticsearch.index.store.FsDirectoryFactory;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.SnapshotMatchers;
+import org.elasticsearch.index.translog.TestTranslog;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@@ -184,6 +185,7 @@
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
+import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
@@ -5938,4 +5940,34 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
             assertNotNull(engine.failedEngine.get());
         }
     }
+
+    /**
+     * We can trim the translog on primary promotion and peer recovery because we add operations with either the
+     * REPLICA or PEER_RECOVERY origin to the translog even though they already exist in the engine
+     * (i.e. hasProcessed() == true). If we decide not to add those already-processed operations to the translog,
+     * we need to study carefully the consequences of translog trimming in these two places.
+     */
+    public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws Exception {
+        List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
+        applyOperations(engine, operations);
+        Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
+        try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
+            assertThat(snapshot.totalOperations(), equalTo(operations.size()));
+            assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
+                equalTo(seqNos));
+        }
+        primaryTerm.set(randomLongBetween(primaryTerm.get(), Long.MAX_VALUE));
+        engine.rollTranslogGeneration();
+        engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in the translog
+        try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
+            assertThat(snapshot.totalOperations(), equalTo(operations.size()));
+            assertNull(snapshot.next());
+        }
+        applyOperations(engine, operations);
+        try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
+            assertThat(snapshot.totalOperations(), equalTo(operations.size() * 2));
+            assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
+                equalTo(seqNos));
+        }
+    }
 }
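
The invariant that test relies on can be shown in miniature: replica and peer-recovery operations are appended to the translog even when the engine has already processed that sequence number, so trimming away operations from older terms and then re-applying the same history restores every operation. A hedged sketch (plain Java; [seqNo, primaryTerm] pairs standing in for translog operations, not engine code):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AlwaysRecordSketch {
    static final List<long[]> translog = new ArrayList<>(); // entries are [seqNo, primaryTerm]
    static final Set<Long> processed = new HashSet<>();

    static void applyOnReplica(long seqNo, long primaryTerm) {
        processed.add(seqNo); // may already be present: the op is still recorded below
        translog.add(new long[] {seqNo, primaryTerm});
    }

    public static void main(String[] args) {
        applyOnReplica(0, 1);
        applyOnReplica(1, 1);
        translog.removeIf(op -> op[1] < 2); // trim everything written under older terms
        applyOnReplica(0, 2);               // re-apply: recorded again although processed.contains(0)
        applyOnReplica(1, 2);
        System.out.println("ops in translog after re-apply: " + translog.size()); // 2
    }
}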
@@ -504,9 +504,9 @@ protected EngineFactory getEngineFactory(ShardRouting routing) {
                 recoveryStart.countDown();
                 return new RecoveryTarget(indexShard, node, recoveryListener) {
                     @Override
-                    public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
+                    public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
                         recoveryDone.set(true);
-                        super.finalizeRecovery(globalCheckpoint, listener);
+                        super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener);
                     }
                 };
             });
@@ -868,13 +868,13 @@ public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.Metada
         }

         @Override
-        public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
+        public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
             if (hasBlocked() == false) {
                 // it may be that no ops have been transferred; block now
                 blockIfNeeded(RecoveryState.Stage.TRANSLOG);
             }
             blockIfNeeded(RecoveryState.Stage.FINALIZE);
-            super.finalizeRecovery(globalCheckpoint, listener);
+            super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener);
         }

     }
@@ -1065,7 +1065,7 @@ public void onFailure(Exception e) {
                 onFailureException.get(), hasToString(containsString("operation primary term [" + oldPrimaryTerm + "] is too old")));
         }

-        closeShards(indexShard);
+        closeShard(indexShard, false); // skip asserting translog and Lucene as we rolled back Lucene but did not execute resync
     }

     public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception {
@@ -2767,8 +2767,8 @@ public void indexTranslogOperations(
         }

         @Override
-        public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
-            super.finalizeRecovery(globalCheckpoint,
+        public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener<Void> listener) {
+            super.finalizeRecovery(globalCheckpoint, trimAboveSeqNo,
                 ActionListener.wrap(
                     r -> {
                         assertListenerCalled.accept(replica);
@@ -20,6 +20,7 @@
 package org.elasticsearch.indices.recovery;

 import org.apache.lucene.analysis.TokenStream;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -33,6 +34,7 @@
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
@@ -75,6 +77,7 @@
 import org.elasticsearch.indices.analysis.AnalysisModule;
 import org.elasticsearch.indices.flush.SyncedFlushUtil;
 import org.elasticsearch.indices.recovery.RecoveryState.Stage;
+import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
 import org.elasticsearch.plugins.AnalysisPlugin;
 import org.elasticsearch.plugins.Plugin;
@@ -101,6 +104,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -117,6 +121,8 @@

 import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toList;
+import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
+import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
 import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -125,6 +131,7 @@
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;

@@ -1429,4 +1436,56 @@ public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {
             assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
         }
     }
+
+    public void testPeerRecoveryTrimsLocalTranslog() throws Exception {
+        internalCluster().startNode();
+        List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
+        String indexName = "test-index";
+        createIndex(indexName, Settings.builder()
+            .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
+            .put("index.routing.allocation.include._name", String.join(",", dataNodes)).build());
+        ensureGreen(indexName);
+        ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
+        DiscoveryNode nodeWithOldPrimary = clusterState.nodes().get(clusterState.routingTable()
+            .index(indexName).shard(0).primaryShard().currentNodeId());
+        MockTransportService transportService = (MockTransportService) internalCluster()
+            .getInstance(TransportService.class, nodeWithOldPrimary.getName());
+        CountDownLatch readyToRestartNode = new CountDownLatch(1);
+        AtomicBoolean stopped = new AtomicBoolean();
+        transportService.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (action.equals("indices:data/write/bulk[s][r]") && randomInt(100) < 5) {
+                throw new NodeClosedException(nodeWithOldPrimary);
+            }
+            // prevent the primary from marking the replica as stale so the replica can get promoted
+            if (action.equals("internal:cluster/shard/failure")) {
+                stopped.set(true);
+                readyToRestartNode.countDown();
+                throw new NodeClosedException(nodeWithOldPrimary);
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+        Thread[] indexers = new Thread[randomIntBetween(1, 8)];
+        for (int i = 0; i < indexers.length; i++) {
+            indexers[i] = new Thread(() -> {
+                while (stopped.get() == false) {
+                    try {
+                        IndexResponse response = client().prepareIndex(indexName, "_doc")
+                            .setSource(Collections.singletonMap("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
+                            .get();
+                        assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
+                    } catch (ElasticsearchException ignored) {
+                    }
+                }
+            });
+        }
+        for (Thread indexer : indexers) {
+            indexer.start();
+        }
+        readyToRestartNode.await();
+        transportService.clearAllRules();
+        internalCluster().restartNode(nodeWithOldPrimary.getName(), new InternalTestCluster.RestartCallback());
+        for (Thread indexer : indexers) {
+            indexer.join();
+        }
+        ensureGreen(indexName);
+    }
 }