Skip to content

Commit

Permalink
Verify AllocationIDs in replication actions (#20320)
Browse files Browse the repository at this point in the history
Replicated operations consist of a routing action (the original), which is in charge of sending the operation to the primary shard; a primary action, which executes the operation on the resolved primary; and replica actions, which perform the operation on a specific replica. This commit adds the targeted shard's allocation id to the primary and replica actions and makes sure that it matches the allocation id of the shard the actions end up executing on.

This helps prevent an extremely rare failure mode where a shard moves off a node and back to it, all between the time an action is sent and the time it's processed.

For example:
1) Primary action is sent to a relocating primary on node A.
2) The primary finishes relocation to node B and starts relocating back.
3) The relocation back reaches the phase where it opens up the target engine on the original node, node A.
4) The primary action is executed on the target engine before the relocation finishes, at which point the shard copy on node B is still the official primary - i.e., it is executed on the wrong primary.
  • Loading branch information
bleskes authored Sep 6, 2016
1 parent b6bf20c commit c56cd46
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -53,14 +55,17 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportChannelResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
Expand All @@ -69,6 +74,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -115,9 +121,12 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, request, executor, new PrimaryOperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
new PrimaryOperationTransportHandler());
// we must never reject because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true, true,
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteShardRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());

this.transportOptions = transportOptions();
Expand Down Expand Up @@ -163,7 +172,7 @@ protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, Re

/**
* Synchronous replica operation on nodes with replica copies. This is done under the lock from
* {@link #acquireReplicaOperationLock(ShardId, long, ActionListener)}.
* {@link #acquireReplicaOperationLock(ShardId, long, String, ActionListener)}.
*/
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest);

Expand Down Expand Up @@ -230,33 +239,36 @@ public void messageReceived(Request request, TransportChannel channel) throws Ex
}
}

class PrimaryOperationTransportHandler implements TransportRequestHandler<Request> {
class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}

@Override
public void messageReceived(Request request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request, channel, (ReplicationTask) task).run();
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
}
}

class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<PrimaryShardReference> {

private final Request request;
/** targetAllocationID of the shard this request is meant for */
private final String targetAllocationID;
private final TransportChannel channel;
private final ReplicationTask replicationTask;

AsyncPrimaryAction(Request request, TransportChannel channel, ReplicationTask replicationTask) {
AsyncPrimaryAction(Request request, String targetAllocationID, TransportChannel channel, ReplicationTask replicationTask) {
this.request = request;
this.targetAllocationID = targetAllocationID;
this.channel = channel;
this.replicationTask = replicationTask;
}

@Override
protected void doRun() throws Exception {
acquirePrimaryShardReference(request.shardId(), this);
acquirePrimaryShardReference(request.shardId(), targetAllocationID, this);
}

@Override
Expand All @@ -271,7 +283,9 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
final ShardRouting primary = primaryShardReference.routingEntry();
assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction, request, transportOptions,
transportService.sendRequest(relocatingNode, transportPrimaryAction,
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId()),
transportOptions,
new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
TransportReplicationAction.this::newResponseInstance) {

Expand Down Expand Up @@ -391,15 +405,17 @@ public void respond(ActionListener<TransportResponse.Empty> listener) {
}
}

class ReplicaOperationTransportHandler implements TransportRequestHandler<ReplicaRequest> {
class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<ReplicaRequest>> {
@Override
public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final ConcreteShardRequest<ReplicaRequest> request, final TransportChannel channel)
throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}

@Override
public void messageReceived(ReplicaRequest request, TransportChannel channel, Task task) throws Exception {
new AsyncReplicaAction(request, channel, (ReplicationTask) task).run();
public void messageReceived(ConcreteShardRequest<ReplicaRequest> requestWithAID, TransportChannel channel, Task task)
throws Exception {
new AsyncReplicaAction(requestWithAID.request, requestWithAID.targetAllocationID, channel, (ReplicationTask) task).run();
}
}

Expand All @@ -417,6 +433,8 @@ public RetryOnReplicaException(StreamInput in) throws IOException {

private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener<Releasable> {
private final ReplicaRequest request;
// allocation id of the replica this request is meant for
private final String targetAllocationID;
private final TransportChannel channel;
/**
* The task on the node with the replica shard.
Expand All @@ -426,10 +444,11 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
// something we want to avoid at all costs
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());

AsyncReplicaAction(ReplicaRequest request, TransportChannel channel, ReplicationTask task) {
AsyncReplicaAction(ReplicaRequest request, String targetAllocationID, TransportChannel channel, ReplicationTask task) {
this.request = request;
this.channel = channel;
this.task = task;
this.targetAllocationID = targetAllocationID;
}

@Override
Expand Down Expand Up @@ -464,7 +483,9 @@ public void onNewClusterState(ClusterState state) {
String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
TransportChannelResponseHandler<TransportResponse.Empty> handler =
new TransportChannelResponseHandler<>(logger, channel, extraMessage, () -> TransportResponse.Empty.INSTANCE);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
new ConcreteShardRequest<>(request, targetAllocationID),
handler);
}

@Override
Expand Down Expand Up @@ -501,7 +522,7 @@ protected void responseWithFailure(Exception e) {
protected void doRun() throws Exception {
setPhase(task, "replica");
assert request.shardId() != null : "request shardId must be set";
acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), this);
acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), targetAllocationID, this);
}

/**
Expand Down Expand Up @@ -598,7 +619,7 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ",
transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
}
performAction(node, transportPrimaryAction, true);
performAction(node, transportPrimaryAction, true, new ConcreteShardRequest<>(request, primary.allocationId().getId()));
}

private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
Expand All @@ -620,7 +641,7 @@ private void performRemoteAction(ClusterState state, ShardRouting primary, Disco
request.shardId(), request, state.version(), primary.currentNodeId());
}
setPhase(task, "rerouted");
performAction(node, actionName, false);
performAction(node, actionName, false, request);
}

private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
Expand Down Expand Up @@ -671,8 +692,9 @@ private void handleBlockException(ClusterBlockException blockException) {
}
}

private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction) {
transportService.sendRequest(node, action, request, transportOptions, new TransportResponseHandler<Response>() {
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
final TransportRequest requestToPerform) {
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

@Override
public Response newInstance() {
Expand Down Expand Up @@ -700,7 +722,7 @@ public void handleException(TransportException exp) {
(org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"received an error from node [{}] for request [{}], scheduling a retry",
node.getId(),
request),
requestToPerform),
exp);
retry(exp);
} else {
Expand Down Expand Up @@ -794,7 +816,8 @@ void retryBecauseUnavailable(ShardId shardId, String message) {
* tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
*/
protected void acquirePrimaryShardReference(ShardId shardId, ActionListener<PrimaryShardReference> onReferenceAcquired) {
protected void acquirePrimaryShardReference(ShardId shardId, String allocationId,
ActionListener<PrimaryShardReference> onReferenceAcquired) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
// we may end up here if the cluster state used to route the primary is so stale that the underlying
Expand All @@ -804,6 +827,10 @@ protected void acquirePrimaryShardReference(ShardId shardId, ActionListener<Prim
throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
"actual shard is not a primary " + indexShard.routingEntry());
}
final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
if (actualAllocationId.equals(allocationId) == false) {
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
}

ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
@Override
Expand All @@ -823,9 +850,14 @@ public void onFailure(Exception e) {
/**
* tries to acquire an operation on replicas. The lock is closed as soon as replication is completed on the node.
*/
protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, ActionListener<Releasable> onLockAcquired) {
protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, final String allocationId,
                                           ActionListener<Releasable> onLockAcquired) {
    // resolve the local shard copy for the requested shard id
    final IndexShard replicaShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
    // refuse to run on a shard copy whose allocation id differs from the one the request was addressed to
    final String actualAllocationId = replicaShard.routingEntry().allocationId().getId();
    final boolean sameCopy = actualAllocationId.equals(allocationId);
    if (sameCopy == false) {
        throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
    }
    // allocation ids match: hand the listener over to the shard together with the primary term and executor
    replicaShard.acquireReplicaOperationLock(primaryTerm, onLockAcquired, executor);
}

Expand Down Expand Up @@ -888,7 +920,8 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
return;
}
transportService.sendRequest(node, transportReplicaAction, request, transportOptions,
transportService.sendRequest(node, transportReplicaAction,
new ConcreteShardRequest<>(request, replica.allocationId().getId()), transportOptions,
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
}

Expand Down Expand Up @@ -930,6 +963,72 @@ public void onFailure(Exception shardFailedError) {
}
}

/**
 * A wrapper class to encapsulate a request when being sent to a specific allocation id.
 * <p>
 * All task related calls ({@link #setParentTask}, {@link #getParentTask()}, {@link #createTask},
 * {@link #getDescription()}) are forwarded to the wrapped request, so the task created for this
 * wrapper is whatever the wrapped request creates.
 * <p>
 * On the wire the target allocation id is written first, followed by the wrapped request.
 *
 * @param <R> the type of the wrapped request
 */
final class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest {

    /** {@link AllocationId#getId()} of the shard this request is sent to; {@code null} until read from a stream **/
    private String targetAllocationID;

    /** the wrapped request; assigned exactly once by either constructor */
    private final R request;

    /**
     * Creates an empty wrapper meant to be filled in by {@link #readFrom(StreamInput)}.
     *
     * @param requestSupplier supplies the (empty) request instance to deserialize into
     */
    ConcreteShardRequest(Supplier<R> requestSupplier) {
        request = requestSupplier.get();
        // null now, but will be populated by reading from the streams
        targetAllocationID = null;
    }

    /**
     * Wraps a request that is targeted at the shard copy with the given allocation id.
     *
     * @param request            the request to wrap
     * @param targetAllocationID allocation id of the shard copy the request is meant for
     * @throws NullPointerException if either argument is {@code null}
     */
    ConcreteShardRequest(R request, String targetAllocationID) {
        this.request = Objects.requireNonNull(request, "request must not be null");
        this.targetAllocationID = Objects.requireNonNull(targetAllocationID, "targetAllocationID must not be null");
    }

    @Override
    public void setParentTask(String parentTaskNode, long parentTaskId) {
        request.setParentTask(parentTaskNode, parentTaskId);
    }

    @Override
    public void setParentTask(TaskId taskId) {
        request.setParentTask(taskId);
    }

    @Override
    public TaskId getParentTask() {
        return request.getParentTask();
    }

    @Override
    public Task createTask(long id, String type, String action, TaskId parentTaskId) {
        return request.createTask(id, type, action, parentTaskId);
    }

    @Override
    public String getDescription() {
        return "[" + request.getDescription() + "] for aID [" + targetAllocationID + "]";
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        // order must mirror writeTo: allocation id first, then the wrapped request
        targetAllocationID = in.readString();
        request.readFrom(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(targetAllocationID);
        request.writeTo(out);
    }

    /** @return the wrapped request */
    public R getRequest() {
        return request;
    }

    /** @return the allocation id of the shard copy this request is meant for */
    public String getTargetAllocationID() {
        return targetAllocationID;
    }

    /**
     * Renders the wrapped request together with the target allocation id, so that log messages
     * which print this wrapper identify the actual request.
     */
    @Override
    public String toString() {
        return "request: " + request + ", target allocation id: " + targetAllocationID;
    }
}

/**
* Sets the current phase on the task if it isn't null. Pulled into its own
* method because it's more convenient that way.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -88,6 +88,11 @@ public boolean hasIndex(String index) {
return indicesRouting.containsKey(index);
}

/**
 * Checks whether this routing table contains the given index, matching on the full
 * {@link Index} and not only on its name.
 *
 * @param index the index to look for
 * @return {@code true} if a routing table is registered under the index name and its
 *         {@link Index} equals the given one
 */
public boolean hasIndex(Index index) {
    final IndexRoutingTable byName = index(index.getName());
    if (byName == null) {
        // nothing registered under that name
        return false;
    }
    return byName.getIndex().equals(index);
}

/**
 * Looks up the routing table registered under the given index name in {@code indicesRouting}.
 *
 * @param index the index name used as the lookup key
 * @return the result of {@code indicesRouting.get(index)}; presumably {@code null} when no
 *         index with that name is present (callers in this file null-check it) - TODO confirm
 */
public IndexRoutingTable index(String index) {
return indicesRouting.get(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ public ShardNotFoundException(ShardId shardId) {
}

public ShardNotFoundException(ShardId shardId, Throwable ex) {
super("no such shard", ex);
setShard(shardId);
this(shardId, "no such shard", ex);
}

/**
 * Creates the exception with a custom message and no cause.
 * Delegates to the constructor taking a cause, passing {@code null} for it.
 *
 * @param shardId the shard id to record on the exception
 * @param msg     the message; may contain placeholders that are presumably filled from
 *                {@code args} by the superclass - TODO confirm
 * @param args    arguments forwarded together with the message
 */
public ShardNotFoundException(ShardId shardId, String msg, Object... args) {
this(shardId, msg, null, args);
}

/**
 * Creates the exception with a custom message and cause, then records the shard id on it.
 *
 * @param shardId the shard id, stored via {@code setShard}
 * @param msg     the message passed to the superclass; may contain placeholders that are
 *                presumably filled from {@code args} - TODO confirm
 * @param ex      the cause passed to the superclass; the sibling constructor passes {@code null}
 * @param args    arguments passed to the superclass together with the message
 */
public ShardNotFoundException(ShardId shardId, String msg, Throwable ex, Object... args) {
super(msg, ex, args);
// the shard id is not a superclass constructor argument, so it is attached afterwards
setShard(shardId);
}

public ShardNotFoundException(StreamInput in) throws IOException{
super(in);
}
Expand Down
Loading

0 comments on commit c56cd46

Please sign in to comment.