Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify AllocationIDs in replication actions #20320

Merged
merged 18 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -53,14 +55,17 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportChannelResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
Expand Down Expand Up @@ -115,9 +120,12 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, request, executor, new PrimaryOperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, () -> new RequestWithAllocationID<>(request), executor,
new PrimaryOperationTransportHandler());
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true, true,
transportService.registerRequestHandler(transportReplicaAction,
() -> new RequestWithAllocationID<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());

this.transportOptions = transportOptions();
Expand Down Expand Up @@ -163,7 +171,7 @@ protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, Re

/**
* Synchronous replica operation on nodes with replica copies. This is done under the lock form
* {@link #acquireReplicaOperationLock(ShardId, long, ActionListener)}.
* {@link #acquireReplicaOperationLock(ShardId, long, String, ActionListener)}.
*/
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest);

Expand Down Expand Up @@ -230,33 +238,36 @@ public void messageReceived(Request request, TransportChannel channel) throws Ex
}
}

class PrimaryOperationTransportHandler implements TransportRequestHandler<Request> {
class PrimaryOperationTransportHandler implements TransportRequestHandler<RequestWithAllocationID<Request>> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
public void messageReceived(final RequestWithAllocationID<Request> request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}

@Override
public void messageReceived(Request request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request, channel, (ReplicationTask) task).run();
public void messageReceived(RequestWithAllocationID<Request> request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request.request, request.allocationId, channel, (ReplicationTask) task).run();
}
}

class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<PrimaryShardReference> {

private final Request request;
/** allocationId of the shard this request is meant for */
private final String allocationId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this targetAllocationID?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

private final TransportChannel channel;
private final ReplicationTask replicationTask;

AsyncPrimaryAction(Request request, TransportChannel channel, ReplicationTask replicationTask) {
AsyncPrimaryAction(Request request, String allocationId, TransportChannel channel, ReplicationTask replicationTask) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this constructor parameter targetAllocationID?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you missed this one, and the corresponding field (it looks like you edit the comment on the field though).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed. Fixed.

On 05 Sep 2016, at 20:55, Jason Tedor notifications@github.com wrote:

In core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java:

     private final TransportChannel channel;
     private final ReplicationTask replicationTask;
  •    AsyncPrimaryAction(Request request, TransportChannel channel, ReplicationTask replicationTask) {
    
  •    AsyncPrimaryAction(Request request, String allocationId, TransportChannel channel, ReplicationTask replicationTask) {
    

I think you missed this one, and the corresponding field (it looks like you edit the comment on the field though).


You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub, or mute the thread.

this.request = request;
this.allocationId = allocationId;
this.channel = channel;
this.replicationTask = replicationTask;
}

@Override
protected void doRun() throws Exception {
acquirePrimaryShardReference(request.shardId(), this);
acquirePrimaryShardReference(request.shardId(), allocationId, this);
}

@Override
Expand All @@ -271,7 +282,9 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
final ShardRouting primary = primaryShardReference.routingEntry();
assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction, request, transportOptions,
transportService.sendRequest(relocatingNode, transportPrimaryAction,
new RequestWithAllocationID<>(request, primary.allocationId().getRelocationId()),
transportOptions,
new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
TransportReplicationAction.this::newResponseInstance) {

Expand Down Expand Up @@ -391,15 +404,17 @@ public void respond(ActionListener<TransportResponse.Empty> listener) {
}
}

class ReplicaOperationTransportHandler implements TransportRequestHandler<ReplicaRequest> {
class ReplicaOperationTransportHandler implements TransportRequestHandler<RequestWithAllocationID<ReplicaRequest>> {
@Override
public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final RequestWithAllocationID<ReplicaRequest> request, final TransportChannel channel)
throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}

@Override
public void messageReceived(ReplicaRequest request, TransportChannel channel, Task task) throws Exception {
new AsyncReplicaAction(request, channel, (ReplicationTask) task).run();
public void messageReceived(RequestWithAllocationID<ReplicaRequest> request, TransportChannel channel, Task task)
throws Exception {
new AsyncReplicaAction(request.request, request.allocationId, channel, (ReplicationTask) task).run();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the parameter request needs a better name or the field request on the RequestWithAllocationID needs a better name? It's request.request that is triggering this comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the request to requestWithAID

}
}

Expand All @@ -417,6 +432,8 @@ public RetryOnReplicaException(StreamInput in) throws IOException {

private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener<Releasable> {
private final ReplicaRequest request;
// allocation id of the replica this request is meant for
private final String allocationId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this targetAllocationID?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

private final TransportChannel channel;
/**
* The task on the node with the replica shard.
Expand All @@ -426,10 +443,11 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
// something we want to avoid at all costs
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());

AsyncReplicaAction(ReplicaRequest request, TransportChannel channel, ReplicationTask task) {
AsyncReplicaAction(ReplicaRequest request, String allocationId, TransportChannel channel, ReplicationTask task) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this constructor parameter to targetAllocationID?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

this.request = request;
this.channel = channel;
this.task = task;
this.allocationId = allocationId;
}

@Override
Expand Down Expand Up @@ -464,7 +482,9 @@ public void onNewClusterState(ClusterState state) {
String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
TransportChannelResponseHandler<TransportResponse.Empty> handler =
new TransportChannelResponseHandler<>(logger, channel, extraMessage, () -> TransportResponse.Empty.INSTANCE);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
new RequestWithAllocationID<>(request, allocationId),
handler);
}

@Override
Expand Down Expand Up @@ -501,7 +521,7 @@ protected void responseWithFailure(Exception e) {
protected void doRun() throws Exception {
setPhase(task, "replica");
assert request.shardId() != null : "request shardId must be set";
acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), this);
acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), allocationId, this);
}

/**
Expand Down Expand Up @@ -598,7 +618,7 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov
logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ",
transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
}
performAction(node, transportPrimaryAction, true);
performAction(node, transportPrimaryAction, true, new RequestWithAllocationID<>(request, primary.allocationId().getId()));
}

private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
Expand All @@ -620,7 +640,7 @@ private void performRemoteAction(ClusterState state, ShardRouting primary, Disco
request.shardId(), request, state.version(), primary.currentNodeId());
}
setPhase(task, "rerouted");
performAction(node, actionName, false);
performAction(node, actionName, false, request);
}

private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
Expand Down Expand Up @@ -671,8 +691,9 @@ private void handleBlockException(ClusterBlockException blockException) {
}
}

private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction) {
transportService.sendRequest(node, action, request, transportOptions, new TransportResponseHandler<Response>() {
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
final TransportRequest requestToPerform) {
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

@Override
public Response newInstance() {
Expand Down Expand Up @@ -700,7 +721,7 @@ public void handleException(TransportException exp) {
(org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"received an error from node [{}] for request [{}], scheduling a retry",
node.getId(),
request),
requestToPerform),
exp);
retry(exp);
} else {
Expand Down Expand Up @@ -794,7 +815,8 @@ void retryBecauseUnavailable(ShardId shardId, String message) {
* tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
*/
protected void acquirePrimaryShardReference(ShardId shardId, ActionListener<PrimaryShardReference> onReferenceAcquired) {
protected void acquirePrimaryShardReference(ShardId shardId, String allocationId,
ActionListener<PrimaryShardReference> onReferenceAcquired) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
// we may end up here if the cluster state used to route the primary is so stale that the underlying
Expand All @@ -804,6 +826,10 @@ protected void acquirePrimaryShardReference(ShardId shardId, ActionListener<Prim
throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
"actual shard is not a primary " + indexShard.routingEntry());
}
final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
if (actualAllocationId.equals(allocationId) == false) {
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
}

ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
@Override
Expand All @@ -823,9 +849,14 @@ public void onFailure(Exception e) {
/**
* tries to acquire an operation on replicas. The lock is closed as soon as replication is completed on the node.
*/
protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, ActionListener<Releasable> onLockAcquired) {
protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, final String allocationId,
ActionListener<Releasable> onLockAcquired) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
if (actualAllocationId.equals(allocationId) == false) {
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
}
indexShard.acquireReplicaOperationLock(primaryTerm, onLockAcquired, executor);
}

Expand Down Expand Up @@ -888,7 +919,8 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
return;
}
transportService.sendRequest(node, transportReplicaAction, request, transportOptions,
transportService.sendRequest(node, transportReplicaAction,
new RequestWithAllocationID<>(request, replica.allocationId().getId()), transportOptions,
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
}

Expand Down Expand Up @@ -930,6 +962,69 @@ public void onFailure(Exception shardFailedError) {
}
}

/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
final class RequestWithAllocationID<R extends TransportRequest> extends TransportRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call this maybe ConcreteShardRequest or something like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah - I couldn't find a good name so I prefer a really concrete name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcreteShardRequest is much better than RequestWithAllocationID ? lets name classes about what they are not what properties they hold

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to me ConcreteShardRequest is meaningless unless you know about this PR and its context, which all of us will soon forget. If you feel ConcreteShardRequest is a much better name, I will happily make you happy :)


/** {@link AllocationId#getId()} of the shard this request is sent to **/
private String allocationId;
Copy link
Member

@jasontedor jasontedor Sep 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this targetAllocationID?


private R request;

RequestWithAllocationID(Supplier<R> requestSupplier) {
request = requestSupplier.get();
allocationId = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the invariant that this can be null?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh that is for deserialization :( bummer can you document it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah :(. maybe we should add a variant of registerRequestHandler which takes a function of a stream and returns a request.

}

RequestWithAllocationID(R request, String allocationId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this constructor parameter should be targetAllocationID too to be consistent with my other suggestions.

this.request = request;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use Objects.requireNonNull() here for both request and allocation ID

this.allocationId = allocationId;
}

@Override
public void setParentTask(String parentTaskNode, long parentTaskId) {
request.setParentTask(parentTaskNode, parentTaskId);
}

@Override
public void setParentTask(TaskId taskId) {
request.setParentTask(taskId);
}

@Override
public TaskId getParentTask() {
return request.getParentTask();
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return request.createTask(id, type, action, parentTaskId);
}

@Override
public String getDescription() {
return "[" + request.getDescription() + "] for aID [" + allocationId + "]";
}

@Override
public void readFrom(StreamInput in) throws IOException {
allocationId = in.readString();
request.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(allocationId);
request.writeTo(out);
}

public R getRequest() {
return request;
}

public String getAllocationId() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this getTargetAllocationID?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

return allocationId;
}
}

/**
* Sets the current phase on the task if it isn't null. Pulled into its own
* method because its more convenient that way.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -88,6 +88,11 @@ public boolean hasIndex(String index) {
return indicesRouting.containsKey(index);
}

public boolean hasIndex(Index index) {
IndexRoutingTable indexRouting = index(index.getName());
return indexRouting != null && indexRouting.getIndex().equals(index);
}

public IndexRoutingTable index(String index) {
return indicesRouting.get(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ public ShardNotFoundException(ShardId shardId) {
}

public ShardNotFoundException(ShardId shardId, Throwable ex) {
super("no such shard", ex);
setShard(shardId);
this(shardId, "no such shard", ex);
}

public ShardNotFoundException(ShardId shardId, String msg, Object... args) {
this(shardId, msg, null, args);
}

public ShardNotFoundException(ShardId shardId, String msg, Throwable ex, Object... args) {
super(msg, ex, args);
setShard(shardId);
}

public ShardNotFoundException(StreamInput in) throws IOException{
super(in);
}
Expand Down
Loading