Skip to content

Commit

Permalink
Harden global checkpoint tracker
Browse files Browse the repository at this point in the history
This commit refactors the global checkpont tracker to make it more
resilient. The main idea is to make it more explicit what state is
actually captured and how that state is updated through
replication/cluster state updates etc. It also fixes the issue where the
local checkpoint information is not being updated when a shard becomes
primary. The primary relocation handoff becomes very simple too, we can
just verbatim copy over the internal state.

Relates #25468
  • Loading branch information
ywelsch authored and jasontedor committed Jul 7, 2017
1 parent 2ba9fd2 commit baa87db
Show file tree
Hide file tree
Showing 23 changed files with 1,150 additions and 988 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ public void readFrom(StreamInput in) throws IOException {
localCheckpoint = in.readZLong();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
localCheckpoint = SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT;
}
}

Expand Down Expand Up @@ -1202,6 +1202,8 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.PrimaryContext;
import org.elasticsearch.index.shard.ShardId;

import java.util.Set;
Expand All @@ -41,6 +40,11 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
*/
public static final long NO_OPS_PERFORMED = -1L;

/**
* Represents a local checkpoint coming from a pre-6.0 node
*/
public static final long PRE_60_NODE_LOCAL_CHECKPOINT = -3L;

private final LocalCheckpointTracker localCheckpointTracker;
private final GlobalCheckpointTracker globalCheckpointTracker;

Expand Down Expand Up @@ -135,6 +139,16 @@ public void updateLocalCheckpointForShard(final String allocationId, final long
globalCheckpointTracker.updateLocalCheckpoint(allocationId, checkpoint);
}

/**
* Called when the recovery process for a shard is ready to open the engine on the target shard.
* See {@link GlobalCheckpointTracker#initiateTracking(String)} for details.
*
* @param allocationId the allocation ID of the shard for which recovery was initiated
*/
public void initiateTracking(final String allocationId) {
globalCheckpointTracker.initiateTracking(allocationId);
}

/**
* Marks the shard with the provided allocation ID as in-sync with the primary shard. See
* {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} for additional details.
Expand Down Expand Up @@ -173,26 +187,45 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint) {
globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint);
}

/**
* Returns the local checkpoint information tracked for a specific shard. Used by tests.
*/
public synchronized long getTrackedLocalCheckpointForShard(final String allocationId) {
return globalCheckpointTracker.getTrackedLocalCheckpointForShard(allocationId).getLocalCheckpoint();
}

/**
* Activates the global checkpoint tracker in primary mode (see {@link GlobalCheckpointTracker#primaryMode}.
* Called on primary activation or promotion.
*/
public void activatePrimaryMode(final String allocationId, final long localCheckpoint) {
globalCheckpointTracker.activatePrimaryMode(allocationId, localCheckpoint);
}

/**
* Notifies the service of the current allocation IDs in the cluster state. See
* {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(long, Set, Set)} for details.
* {@link GlobalCheckpointTracker#updateFromMaster(long, Set, Set, Set)} for details.
*
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param activeAllocationIds the allocation IDs of the currently active shard copies
* @param inSyncAllocationIds the allocation IDs of the currently in-sync shard copies
* @param initializingAllocationIds the allocation IDs of the currently initializing shard copies
* @param pre60AllocationIds the allocation IDs of shards that are allocated to pre-6.0 nodes
*/
public void updateAllocationIdsFromMaster(
final long applyingClusterStateVersion, final Set<String> activeAllocationIds, final Set<String> initializingAllocationIds) {
globalCheckpointTracker.updateAllocationIdsFromMaster(applyingClusterStateVersion, activeAllocationIds, initializingAllocationIds);
final long applyingClusterStateVersion, final Set<String> inSyncAllocationIds, final Set<String> initializingAllocationIds,
final Set<String> pre60AllocationIds) {
globalCheckpointTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, initializingAllocationIds,
pre60AllocationIds);
}

/**
* Updates the known allocation IDs and the local checkpoints for the corresponding allocations from a primary relocation source.
* Activates the global checkpoint tracker in primary mode (see {@link GlobalCheckpointTracker#primaryMode}.
* Called on primary relocation target during primary relocation handoff.
*
* @param primaryContext the sequence number context
* @param primaryContext the primary context used to initialize the state
*/
public void updateAllocationIdsFromPrimaryContext(final PrimaryContext primaryContext) {
globalCheckpointTracker.updateAllocationIdsFromPrimaryContext(primaryContext);
public void activateWithPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) {
globalCheckpointTracker.activateWithPrimaryContext(primaryContext);
}

/**
Expand All @@ -209,15 +242,22 @@ public boolean pendingInSync() {
*
* @return the primary context
*/
public PrimaryContext primaryContext() {
return globalCheckpointTracker.primaryContext();
public GlobalCheckpointTracker.PrimaryContext startRelocationHandoff() {
return globalCheckpointTracker.startRelocationHandoff();
}

/**
* Marks a relocation handoff attempt as successful. Moves the tracker into replica mode.
*/
public void completeRelocationHandoff() {
globalCheckpointTracker.completeRelocationHandoff();
}

/**
* Releases a previously acquired primary context.
* Fails a relocation handoff attempt.
*/
public void releasePrimaryContext() {
globalCheckpointTracker.releasePrimaryContext();
public void abortRelocationHandoff() {
globalCheckpointTracker.abortRelocationHandoff();
}

}
Loading

0 comments on commit baa87db

Please sign in to comment.