From 132745ddacaeb9888709df437427f69e1a55976c Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 2 Sep 2016 15:29:52 +0200 Subject: [PATCH 01/14] add aid to replication requests --- .../TransportReplicationAction.java | 133 ++++++++++++++---- .../index/shard/ShardNotFoundException.java | 12 +- .../TransportReplicationActionTests.java | 32 +++-- 3 files changed, 139 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 883d21154bde5..0098493a425c7 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -38,10 +38,12 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; @@ -53,14 +55,17 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -115,9 +120,12 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); - transportService.registerRequestHandler(transportPrimaryAction, request, executor, new PrimaryOperationTransportHandler()); + transportService.registerRequestHandler(transportPrimaryAction, () -> new RequestWithAllocationID<>(request), executor, + new PrimaryOperationTransportHandler()); // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true, true, + transportService.registerRequestHandler(transportReplicaAction, + () -> new RequestWithAllocationID<>(replicaRequest), + executor, true, true, new ReplicaOperationTransportHandler()); this.transportOptions = transportOptions(); @@ -163,7 +171,7 @@ protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, Re /** * Synchronous replica operation on nodes with replica copies. This is done under the lock form - * {@link #acquireReplicaOperationLock(ShardId, long, ActionListener)}. + * {@link #acquireReplicaOperationLock(ShardId, long, String, ActionListener)}. */ protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest); @@ -230,33 +238,36 @@ public void messageReceived(Request request, TransportChannel channel) throws Ex } } - class PrimaryOperationTransportHandler implements TransportRequestHandler { + class PrimaryOperationTransportHandler implements TransportRequestHandler> { @Override - public void messageReceived(final Request request, final TransportChannel channel) throws Exception { + public void messageReceived(final RequestWithAllocationID request, final TransportChannel channel) throws Exception { throw new UnsupportedOperationException("the task parameter is required for this operation"); } @Override - public void messageReceived(Request request, TransportChannel channel, Task task) { - new AsyncPrimaryAction(request, channel, (ReplicationTask) task).run(); + public void messageReceived(RequestWithAllocationID request, TransportChannel channel, Task task) { + new AsyncPrimaryAction(request.request, request.allocationId, channel, (ReplicationTask) task).run(); } } class AsyncPrimaryAction extends AbstractRunnable implements ActionListener { private final Request request; + /** allocationId of the shard this request is meant for */ + private final String allocationId; private final TransportChannel channel; private final ReplicationTask replicationTask; - AsyncPrimaryAction(Request request, TransportChannel channel, ReplicationTask replicationTask) { + AsyncPrimaryAction(Request request, String allocationId, TransportChannel channel, ReplicationTask replicationTask) { this.request = request; + this.allocationId = allocationId; this.channel = channel; this.replicationTask = replicationTask; } @Override protected void doRun() throws Exception { - acquirePrimaryShardReference(request.shardId(), this); + acquirePrimaryShardReference(request.shardId(), allocationId, this); } @Override @@ -271,7 +282,9 @@ public void onResponse(PrimaryShardReference primaryShardReference) { final ShardRouting primary = primaryShardReference.routingEntry(); assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary; DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId()); - transportService.sendRequest(relocatingNode, transportPrimaryAction, request, transportOptions, + transportService.sendRequest(relocatingNode, transportPrimaryAction, + new RequestWithAllocationID<>(request, primary.allocationId().getRelocationId()), + transportOptions, new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary, TransportReplicationAction.this::newResponseInstance) { @@ -391,15 +404,17 @@ public void respond(ActionListener listener) { } } - class ReplicaOperationTransportHandler implements TransportRequestHandler { + class ReplicaOperationTransportHandler implements TransportRequestHandler> { @Override - public void messageReceived(final ReplicaRequest request, final TransportChannel channel) throws Exception { + public void messageReceived(final RequestWithAllocationID request, final TransportChannel channel) + throws Exception { throw new UnsupportedOperationException("the task parameter is required for this operation"); } @Override - public void messageReceived(ReplicaRequest request, TransportChannel channel, Task task) throws Exception { - new AsyncReplicaAction(request, channel, (ReplicationTask) task).run(); + public void messageReceived(RequestWithAllocationID request, TransportChannel channel, Task task) + throws Exception { + new AsyncReplicaAction(request.request, request.allocationId, channel, (ReplicationTask) task).run(); } } @@ -417,6 +432,8 @@ public RetryOnReplicaException(StreamInput in) throws IOException { private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener { private final ReplicaRequest request; + // allocation id of the replica this request is meant for + private final String allocationId; private final TransportChannel channel; /** * The task on the node with the replica shard. @@ -426,10 +443,11 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio // something we want to avoid at all costs private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); - AsyncReplicaAction(ReplicaRequest request, TransportChannel channel, ReplicationTask task) { + AsyncReplicaAction(ReplicaRequest request, String allocationId, TransportChannel channel, ReplicationTask task) { this.request = request; this.channel = channel; this.task = task; + this.allocationId = allocationId; } @Override @@ -501,7 +519,7 @@ protected void responseWithFailure(Exception e) { protected void doRun() throws Exception { setPhase(task, "replica"); assert request.shardId() != null : "request shardId must be set"; - acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), this); + acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), allocationId, this); } /** @@ -598,7 +616,7 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ", transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId()); } - performAction(node, transportPrimaryAction, true); + performAction(node, transportPrimaryAction, true, new RequestWithAllocationID<>(request, primary.allocationId().getId())); } private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) { @@ -620,7 +638,7 @@ private void performRemoteAction(ClusterState state, ShardRouting primary, Disco request.shardId(), request, state.version(), primary.currentNodeId()); } setPhase(task, "rerouted"); - performAction(node, actionName, false); + performAction(node, actionName, false, request); } private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) { @@ -671,8 +689,9 @@ private void handleBlockException(ClusterBlockException blockException) { } } - private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction) { - transportService.sendRequest(node, action, request, transportOptions, new TransportResponseHandler() { + private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction, + final TransportRequest requestToPerform) { + transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler() { @Override public Response newInstance() { @@ -700,7 +719,7 @@ public void handleException(TransportException exp) { (org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage( "received an error from node [{}] for request [{}], scheduling a retry", node.getId(), - request), + requestToPerform), exp); retry(exp); } else { @@ -794,7 +813,8 @@ void retryBecauseUnavailable(ShardId shardId, String message) { * tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}). */ - protected void acquirePrimaryShardReference(ShardId shardId, ActionListener onReferenceAcquired) { + protected void acquirePrimaryShardReference(ShardId shardId, String allocationId, + ActionListener onReferenceAcquired) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); // we may end up here if the cluster state used to route the primary is so stale that the underlying @@ -804,6 +824,10 @@ protected void acquirePrimaryShardReference(ShardId shardId, ActionListener onAcquired = new ActionListener() { @Override @@ -823,9 +847,14 @@ public void onFailure(Exception e) { /** * tries to acquire an operation on replicas. The lock is closed as soon as replication is completed on the node. */ - protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, ActionListener onLockAcquired) { + protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, final String allocationId, + ActionListener onLockAcquired) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); + final String actualAllocationId = indexShard.routingEntry().allocationId().getId(); + if (actualAllocationId.equals(allocationId) == false) { + throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId); + } indexShard.acquireReplicaOperationLock(primaryTerm, onLockAcquired, executor); } @@ -888,7 +917,8 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]")); return; } - transportService.sendRequest(node, transportReplicaAction, request, transportOptions, + transportService.sendRequest(node, transportReplicaAction, + new RequestWithAllocationID<>(request, replica.allocationId().getId()), transportOptions, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); } @@ -930,6 +960,61 @@ public void onFailure(Exception shardFailedError) { } } + /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ + final class RequestWithAllocationID extends TransportRequest { + + /** {@link AllocationId#getId()} of the shard this request is sent to **/ + String allocationId; + + R request; + + RequestWithAllocationID(Supplier requestSupplier) { + request = requestSupplier.get(); + allocationId = null; + } + + RequestWithAllocationID(R request, String allocationId) { + this.request = request; + this.allocationId = allocationId; + } + + @Override + public void setParentTask(String parentTaskNode, long parentTaskId) { + request.setParentTask(parentTaskNode, parentTaskId); + } + + @Override + public void setParentTask(TaskId taskId) { + request.setParentTask(taskId); + } + + @Override + public TaskId getParentTask() { + return request.getParentTask(); + } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return request.createTask(id, type, action, parentTaskId); + } + + @Override + public String getDescription() { + return "[" + request.getDescription() + "] for aID [" + allocationId + "]"; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + allocationId = in.readString(); + request.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(allocationId); + request.writeTo(out); + } + } + /** * Sets the current phase on the task if it isn't null. Pulled into its own * method because its more convenient that way. diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardNotFoundException.java b/core/src/main/java/org/elasticsearch/index/shard/ShardNotFoundException.java index fa2c8ce710337..aa46240fd490f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardNotFoundException.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardNotFoundException.java @@ -33,10 +33,18 @@ public ShardNotFoundException(ShardId shardId) { } public ShardNotFoundException(ShardId shardId, Throwable ex) { - super("no such shard", ex); - setShard(shardId); + this(shardId, "no such shard", ex); + } + + public ShardNotFoundException(ShardId shardId, String msg, Object... args) { + this(shardId, msg, null, args); + } + public ShardNotFoundException(ShardId shardId, String msg, Throwable ex, Object... args) { + super(msg, ex, args); + setShard(shardId); } + public ShardNotFoundException(StreamInput in) throws IOException{ super(in); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 6c30f015124ad..35bad1a051f2d 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -55,7 +56,6 @@ import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; @@ -411,7 +411,7 @@ public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throw isRelocated.set(true); executeOnPrimary = false; } - action.new AsyncPrimaryAction(request, createTransportChannel(listener), task) { + action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override protected ReplicationOperation createReplicatedOperation(Request request, ActionListener actionListener, Action.PrimaryShardReference primaryShardReference, @@ -452,7 +452,8 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); ClusterState state = state(index, true, ShardRoutingState.RELOCATING); - String primaryTargetNodeId = state.getRoutingTable().shardRoutingTable(shardId).primaryShard().relocatingNodeId(); + final ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + String primaryTargetNodeId = primaryShard.relocatingNodeId(); // simulate execution of the primary phase on the relocation target node state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryTargetNodeId)).build(); setState(clusterService, state); @@ -460,7 +461,7 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); - action.new AsyncPrimaryAction(request, createTransportChannel(listener), task) { + action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override protected ReplicationOperation createReplicatedOperation(Request request, ActionListener actionListener, Action.PrimaryShardReference primaryShardReference, @@ -596,7 +597,9 @@ public void testShadowIndexDisablesReplication() throws Exception { state = ClusterState.builder(state).metaData(metaData).build(); setState(clusterService, state); AtomicBoolean executed = new AtomicBoolean(); - action.new AsyncPrimaryAction(new Request(shardId), createTransportChannel(new PlainActionFuture<>()), null) { + ShardRouting primaryShard = state.routingTable().shardRoutingTable(shardId).primaryShard(); + action.new AsyncPrimaryAction(new Request(shardId), primaryShard.allocationId().getId(), + createTransportChannel(new PlainActionFuture<>()), null) { @Override protected ReplicationOperation createReplicatedOperation(Request request, ActionListener actionListener, Action.PrimaryShardReference primaryShardReference, @@ -613,8 +616,10 @@ public void testCounterOnPrimary() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); // no replica, we only want to test on primary - setState(clusterService, state(index, true, ShardRoutingState.STARTED)); + final ClusterState state = state(index, true, ShardRoutingState.STARTED); + setState(clusterService, state); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + final ShardRouting primaryShard = state.routingTable().shardRoutingTable(shardId).primaryShard(); Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); @@ -622,7 +627,7 @@ public void testCounterOnPrimary() throws Exception { final boolean throwExceptionOnCreation = i == 1; final boolean throwExceptionOnRun = i == 2; final boolean respondWithError = i == 3; - action.new AsyncPrimaryAction(request, createTransportChannel(listener), task) { + action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { @Override protected ReplicationOperation createReplicatedOperation(Request request, ActionListener actionListener, Action.PrimaryShardReference primaryShardReference, @@ -666,8 +671,9 @@ public void execute() throws Exception { public void testReplicasCounter() throws Exception { final ShardId shardId = new ShardId("test", "_na_", 0); - setState(clusterService, state(shardId.getIndexName(), true, - ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + final ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + setState(clusterService, state); + final ShardRouting replicaRouting = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); boolean throwException = randomBoolean(); final ReplicationTask task = maybeTask(); Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) { @@ -683,7 +689,8 @@ protected ReplicaResult shardOperationOnReplica(Request request) { }; final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); try { - replicaOperationTransportHandler.messageReceived(new Request().setShardId(shardId), + replicaOperationTransportHandler.messageReceived( + action.new RequestWithAllocationID(new Request().setShardId(shardId), replicaRouting.allocationId().getId()), createTransportChannel(new PlainActionFuture<>()), task); } catch (ElasticsearchException e) { assertThat(e.getMessage(), containsString("simulated")); @@ -827,7 +834,7 @@ protected boolean resolveIndex() { } @Override - protected void acquirePrimaryShardReference(ShardId shardId, ActionListener onReferenceAcquired) { + protected void acquirePrimaryShardReference(ShardId shardId, String allocationId, ActionListener onReferenceAcquired) { count.incrementAndGet(); PrimaryShardReference primaryShardReference = new PrimaryShardReference(null, null) { @Override @@ -858,7 +865,8 @@ public void close() { } @Override - protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, ActionListener onLockAcquired) { + protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, String allocationId, + ActionListener onLockAcquired) { count.incrementAndGet(); onLockAcquired.onResponse(count::decrementAndGet); } From ca272904143b6875bff1221ec6df7f07587b7fb0 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 2 Sep 2016 22:26:57 +0200 Subject: [PATCH 02/14] finx IndicesRequestIT --- .../TransportReplicationAction.java | 8 ++++++-- .../action/IndicesRequestIT.java | 19 ++++++++++++++----- .../TransportReplicationActionTests.java | 14 ++++++++++++++ 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 0098493a425c7..5151aa20d219d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -964,9 +964,9 @@ public void onFailure(Exception shardFailedError) { final class RequestWithAllocationID extends TransportRequest { /** {@link AllocationId#getId()} of the shard this request is sent to **/ - String allocationId; + private String allocationId; - R request; + private R request; RequestWithAllocationID(Supplier requestSupplier) { request = requestSupplier.get(); @@ -1013,6 +1013,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(allocationId); request.writeTo(out); } + + public R getRequest() { + return request; + } } /** diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 934fdae254bcd..5034943fa9b41 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -69,6 +69,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.replication.TransportReplicationActionTests; import org.elasticsearch.action.termvectors.MultiTermVectorsAction; import org.elasticsearch.action.termvectors.MultiTermVectorsRequest; import org.elasticsearch.action.termvectors.TermVectorsAction; @@ -117,7 +118,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.instanceOf; @ClusterScope(scope = Scope.SUITE, numClientNodes = 1, minNumDataNodes = 2) public class IndicesRequestIT extends ESIntegTestCase { @@ -638,8 +638,7 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0)); } for (TransportRequest internalRequest : requests) { - assertThat(internalRequest, instanceOf(IndicesRequest.class)); - IndicesRequest indicesRequest = (IndicesRequest) internalRequest; + IndicesRequest indicesRequest = convertRequest(internalRequest); assertThat(internalRequest.getClass().getName(), indicesRequest.indices(), equalTo(originalRequest.indices())); assertThat(indicesRequest.indicesOptions(), equalTo(originalRequest.indicesOptions())); } @@ -651,14 +650,24 @@ private static void assertIndicesSubset(List indices, String... actions) List requests = consumeTransportRequests(action); assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0)); for (TransportRequest internalRequest : requests) { - assertThat(internalRequest, instanceOf(IndicesRequest.class)); - for (String index : ((IndicesRequest) internalRequest).indices()) { + IndicesRequest indicesRequest = convertRequest(internalRequest); + for (String index : indicesRequest.indices()) { assertThat(indices, hasItem(index)); } } } } + final static IndicesRequest convertRequest(TransportRequest request) { + final IndicesRequest indicesRequest; + if (request instanceof IndicesRequest) { + indicesRequest = (IndicesRequest) request; + } else { + indicesRequest = TransportReplicationActionTests.resolveRequest(request); + } + return indicesRequest; + } + private String randomIndexOrAlias() { String index = randomFrom(indices); if (randomBoolean()) { diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 35bad1a051f2d..e69dd32b86223 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -62,6 +62,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; @@ -99,6 +100,19 @@ public class TransportReplicationActionTests extends ESTestCase { + /** + * takes a request that was sent by a {@link TransportReplicationAction} and captured + * and returns the underlying request if it's wrapped or the original (cast to the expected type). + * + * This will throw a {@link ClassCastException} if the request is of the wrong type. + */ + public static R resolveRequest(TransportRequest requestOrWrappedRequest) { + if (requestOrWrappedRequest instanceof TransportReplicationAction.RequestWithAllocationID) { + requestOrWrappedRequest = ((TransportReplicationAction.RequestWithAllocationID)requestOrWrappedRequest).getRequest(); + } + return (R) requestOrWrappedRequest; + } + private static ThreadPool threadPool; private ClusterService clusterService; From ebfebab8f9447f2230e7cf35bf08d8b00f79384c Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 3 Sep 2016 23:06:18 +0200 Subject: [PATCH 03/14] TransportReplicationActionTests with mocks --- .../cluster/routing/RoutingTable.java | 7 +- .../TransportReplicationActionTests.java | 101 +++++++++++------- 2 files changed, 71 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 2ca165308b194..2d960ce0450bb 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -27,8 +27,8 @@ import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.common.Nullable; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -88,6 +88,11 @@ public boolean hasIndex(String index) { return indicesRouting.containsKey(index); } + public boolean hasIndex(Index index) { + IndexRoutingTable indexRouting = index(index.getName()); + return indexRouting != null && indexRouting.getIndex().equals(index); + } + public IndexRoutingTable index(String index) { return indicesRouting.get(index); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index e69dd32b86223..5da2979099e4e 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -48,12 +48,16 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -94,6 +98,11 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -475,7 +484,7 @@ public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); AtomicBoolean executed = new AtomicBoolean(); - action.new AsyncPrimaryAction(request, primaryShard.allocationId().getId(), createTransportChannel(listener), task) { + action.new AsyncPrimaryAction(request, primaryShard.allocationId().getRelocationId(), createTransportChannel(listener), task) { @Override protected ReplicationOperation createReplicatedOperation(Request request, ActionListener actionListener, Action.PrimaryShardReference primaryShardReference, @@ -488,6 +497,11 @@ public void execute() throws Exception { } }; } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } }.run(); assertThat(executed.get(), equalTo(true)); assertPhase(task, "finished"); @@ -818,7 +832,7 @@ class Action extends TransportReplicationAction { Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { - super(settings, actionName, transportService, clusterService, null, threadPool, + super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, new ShardStateAction(settings, clusterService, transportService, null, null, threadPool), new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME); @@ -846,44 +860,59 @@ protected ReplicaResult shardOperationOnReplica(Request request) { protected boolean resolveIndex() { return false; } + } - @Override - protected void acquirePrimaryShardReference(ShardId shardId, String allocationId, ActionListener onReferenceAcquired) { - count.incrementAndGet(); - PrimaryShardReference primaryShardReference = new PrimaryShardReference(null, null) { - @Override - public boolean isRelocated() { - return isRelocated.get(); - } - - @Override - public void failShard(String reason, @Nullable Exception e) { - throw new UnsupportedOperationException(); - } - - @Override - public ShardRouting routingEntry() { - ShardRouting shardRouting = clusterService.state().getRoutingTable() - .shardRoutingTable(shardId).primaryShard(); - assert shardRouting != null; - return shardRouting; - } - - @Override - public void close() { - count.decrementAndGet(); - } - }; + final IndicesService mockIndicesService(ClusterService clusterService) { + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.indexServiceSafe(any(Index.class))).then(invocation -> { + Index index = (Index)invocation.getArguments()[0]; + final ClusterState state = clusterService.state(); + final IndexMetaData indexSafe = state.metaData().getIndexSafe(index); + return mockIndexService(indexSafe, clusterService); + }); + when(indicesService.indexService(any(Index.class))).then(invocation -> { + Index index = (Index) invocation.getArguments()[0]; + final ClusterState state = clusterService.state(); + if (state.metaData().hasIndex(index.getName())) { + final IndexMetaData indexSafe = state.metaData().getIndexSafe(index); + return mockIndexService(clusterService.state().metaData().getIndexSafe(index), clusterService); + } else { + return null; + } + }); + return indicesService; + } - onReferenceAcquired.onResponse(primaryShardReference); - } + final IndexService mockIndexService(final IndexMetaData indexMetaData, ClusterService clusterService) { + final IndexService indexService = mock(IndexService.class); + when(indexService.getShard(anyInt())).then(invocation -> { + int shard = (Integer) invocation.getArguments()[0]; + final ShardId shardId = new ShardId(indexMetaData.getIndex(), shard); + if (shard > indexMetaData.getNumberOfShards()) { + throw new ShardNotFoundException(shardId); + } + return mockIndexShard(shardId, clusterService); + }); + return indexService; + } - @Override - protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, String allocationId, - ActionListener onLockAcquired) { + private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { + final IndexShard indexShard = mock(IndexShard.class); + doAnswer(invocation -> { + ActionListener callback = (ActionListener) invocation.getArguments()[0]; count.incrementAndGet(); - onLockAcquired.onResponse(count::decrementAndGet); - } + callback.onResponse(count::decrementAndGet); + return null; + }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { + final ClusterState state = clusterService.state(); + return state.getRoutingNodes().node(state.nodes().getLocalNodeId()).getByShardId(shardId); + }); + when(indexShard.state()).thenAnswer(invocationOnMock -> isRelocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED); + doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class)); + when(indexShard.getPrimaryTerm()).thenAnswer(i -> + clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); + return indexShard; } class NoopReplicationOperation extends ReplicationOperation { From f66da90b7200065dea7c27d94c9978914e77aa74 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 4 Sep 2016 12:58:50 +0200 Subject: [PATCH 04/14] add unit tests --- .../TransportReplicationActionTests.java | 65 +++++++++++++++++-- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 5da2979099e4e..2b2733bdbe7f9 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.action.support.replication; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; @@ -47,6 +49,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; @@ -85,7 +88,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; @@ -100,6 +102,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -760,6 +763,48 @@ public void testDefaultWaitForActiveShardsUsesIndexSetting() throws Exception { assertEquals(ActiveShardCount.from(requestWaitForActiveShards), request.waitForActiveShards()); } + /** test that a primary request is reject if it arrives at a shard with a wrong allocation id */ + public void testPrimaryActionRejectsWrongAid() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + setState(clusterService, state(index, true, ShardRoutingState.STARTED)); + PlainActionFuture listener = new PlainActionFuture<>(); + Request request = new Request(shardId).timeout("1ms"); + action.new PrimaryOperationTransportHandler().messageReceived( + action.new RequestWithAllocationID(request, "_not_a_valid_aid_"), + createTransportChannel(listener), maybeTask() + ); + try { + listener.get(); + fail("using a wrong aid didn't fail the operation"); + } catch (ExecutionException execException) { + Throwable throwable = execException.getCause(); + logger.debug((Supplier) () -> new ParameterizedMessage("got exception e"), throwable); + assertTrue(throwable.getClass() + " is not a retry exception", action.retryPrimaryException(throwable)); + } + } + + /** test that a replica request is reject if it arrives at a shard with a wrong allocation id */ + public void testReplicaActionRejectsWrongAid() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + setState(clusterService, state(index, false, ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + PlainActionFuture listener = new PlainActionFuture<>(); + Request request = new Request(shardId).timeout("1ms"); + action.new ReplicaOperationTransportHandler().messageReceived( + action.new RequestWithAllocationID(request, "_not_a_valid_aid_"), + createTransportChannel(listener), maybeTask() + ); + try { + listener.get(); + fail("using a wrong aid didn't fail the operation"); + } catch (ExecutionException execException) { + Throwable throwable = execException.getCause(); + logger.debug((Supplier) () -> new ParameterizedMessage("got exception e"), throwable); + assertTrue(throwable.getClass() + " is not a retry exception", action.retryPrimaryException(throwable)); + } + } + private void assertIndexShardCounter(int expected) { assertThat(count.get(), equalTo(expected)); } @@ -904,6 +949,18 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService callback.onResponse(count::decrementAndGet); return null; }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + doAnswer(invocation -> { + long term = (Long)invocation.getArguments()[0]; + ActionListener callback = (ActionListener) invocation.getArguments()[1]; + final long primaryTerm = indexShard.getPrimaryTerm(); + if (term < primaryTerm) { + throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", + shardId, term, primaryTerm)); + } + count.incrementAndGet(); + callback.onResponse(count::decrementAndGet); + return null; + }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); return state.getRoutingNodes().node(state.nodes().getLocalNodeId()).getByShardId(shardId); @@ -930,11 +987,6 @@ public void execute() throws Exception { * Transport channel that is needed for replica operation testing. */ public TransportChannel createTransportChannel(final PlainActionFuture listener) { - return createTransportChannel(listener, error -> { - }); - } - - public TransportChannel createTransportChannel(final PlainActionFuture listener, Consumer consumer) { return new TransportChannel() { @Override @@ -959,7 +1011,6 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op @Override public void sendResponse(Exception exception) throws IOException { - consumer.accept(exception); listener.onFailure(exception); } From 767c01cdf7f434d882a017508f0e4673759cfeb8 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 4 Sep 2016 13:27:09 +0200 Subject: [PATCH 05/14] better error message --- .../support/replication/TransportReplicationActionTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 2b2733bdbe7f9..97cbf17a4d71c 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -800,8 +800,9 @@ action.new ReplicaOperationTransportHandler().messageReceived( fail("using a wrong aid didn't fail the operation"); } catch (ExecutionException execException) { Throwable throwable = execException.getCause(); - logger.debug((Supplier) () -> new ParameterizedMessage("got exception e"), throwable); - assertTrue(throwable.getClass() + " is not a retry exception", action.retryPrimaryException(throwable)); + if (action.retryPrimaryException(throwable) == false) { + throw new AssertionError("thrown exception is not retriable", throwable); + } } } From a8fc4be1d6779411f4fa850912a6aef9f43e1f15 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 4 Sep 2016 17:56:23 +0200 Subject: [PATCH 06/14] better error message when shard is not assigned locally --- .../replication/TransportReplicationActionTests.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 97cbf17a4d71c..31efd5e1f1c24 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; @@ -964,7 +965,11 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); - return state.getRoutingNodes().node(state.nodes().getLocalNodeId()).getByShardId(shardId); + final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + if (node == null) { + throw new ShardNotFoundException(shardId, "shard is no longer assigned to current node"); + } + return node.getByShardId(shardId); }); when(indexShard.state()).thenAnswer(invocationOnMock -> isRelocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED); doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class)); From e722c95dc6842aaffc82f966b75fcbc34368f8b5 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 4 Sep 2016 18:28:22 +0200 Subject: [PATCH 07/14] fix testReplicaActionRejectsWrongAid --- .../TransportReplicationActionTests.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 31efd5e1f1c24..dbb791ea9da04 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -789,7 +789,12 @@ action.new PrimaryOperationTransportHandler().messageReceived( public void testReplicaActionRejectsWrongAid() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - setState(clusterService, state(index, false, ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + ClusterState state = state(index, false, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + final ShardRouting replica = state.routingTable().shardRoutingTable(shardId).replicaShards().get(0); + // simulate execution of the node holding the replica + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(replica.currentNodeId())).build(); + setState(clusterService, state); + PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); action.new ReplicaOperationTransportHandler().messageReceived( @@ -804,6 +809,7 @@ action.new ReplicaOperationTransportHandler().messageReceived( if (action.retryPrimaryException(throwable) == false) { throw new AssertionError("thrown exception is not retriable", throwable); } + assertThat(throwable.getMessage(), containsString("_not_a_valid_aid_")); } } @@ -966,10 +972,11 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); - if (node == null) { + final ShardRouting routing = node.getByShardId(shardId); + if (routing == null) { throw new ShardNotFoundException(shardId, "shard is no longer assigned to current node"); } - return node.getByShardId(shardId); + return routing; }); when(indexShard.state()).thenAnswer(invocationOnMock -> isRelocated.get() ? IndexShardState.RELOCATED : IndexShardState.STARTED); doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class)); From 712a87c9a509871538259bd2bb8e6ce318f93eca Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 4 Sep 2016 19:39:33 +0200 Subject: [PATCH 08/14] meh --- .../test/java/org/elasticsearch/action/IndicesRequestIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 5034943fa9b41..1a11e3f48034c 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -658,7 +658,7 @@ private static void assertIndicesSubset(List indices, String... actions) } } - final static IndicesRequest convertRequest(TransportRequest request) { + static IndicesRequest convertRequest(TransportRequest request) { final IndicesRequest indicesRequest; if (request instanceof IndicesRequest) { indicesRequest = (IndicesRequest) request; From 22e4f844647363d7fe8f7ca6f4a81f66c0eb097a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 5 Sep 2016 09:11:45 +0200 Subject: [PATCH 09/14] fix retry on replica --- .../TransportReplicationAction.java | 8 ++- .../TransportReplicationActionTests.java | 56 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 5151aa20d219d..2e758a98b7601 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -482,7 +482,9 @@ public void onNewClusterState(ClusterState state) { String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]"; TransportChannelResponseHandler handler = new TransportChannelResponseHandler<>(logger, channel, extraMessage, () -> TransportResponse.Empty.INSTANCE); - transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler); + transportService.sendRequest(clusterService.localNode(), transportReplicaAction, + new RequestWithAllocationID<>(request, allocationId), + handler); } @Override @@ -1017,6 +1019,10 @@ public void writeTo(StreamOutput out) throws IOException { public R getRequest() { return request; } + + public String getAllocationId() { + return allocationId; + } } /** diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index dbb791ea9da04..e8ee54c1d8a66 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -101,6 +101,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; @@ -813,6 +814,61 @@ action.new ReplicaOperationTransportHandler().messageReceived( } } + /** + * test throwing a {@link org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnReplicaException} + * causes a retry + */ + public void testRetryOnReplica() throws Exception { + final ShardId shardId = new ShardId("test", "_na_", 0); + ClusterState state = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + final ShardRouting replica = state.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); + // simulate execution of the node holding the replica + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(replica.currentNodeId())).build(); + setState(clusterService, state); + AtomicBoolean throwException = new AtomicBoolean(true); + final ReplicationTask task = maybeTask(); + Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) { + @Override + protected ReplicaResult shardOperationOnReplica(Request request) { + assertPhase(task, "replica"); + if (throwException.get()) { + throw new RetryOnReplicaException(shardId, "simulation"); + } + return new ReplicaResult(); + } + }; + final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); + final PlainActionFuture listener = new PlainActionFuture<>(); + final Request request = new Request().setShardId(shardId); + request.primaryTerm(state.metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); + replicaOperationTransportHandler.messageReceived( + action.new RequestWithAllocationID(request, replica.allocationId().getId()), + createTransportChannel(listener), task); + if (listener.isDone()) { + listener.get(); // fail with the exception if there + fail("listener shouldn't be done"); + } + + // no retry yet + List capturedRequests = + transport.getCapturedRequestsByTargetNodeAndClear().get(replica.currentNodeId()); + assertThat(capturedRequests, nullValue()); + + // release the waiting + throwException.set(false); + setState(clusterService, state); + + capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear().get(replica.currentNodeId()); + assertThat(capturedRequests, notNullValue()); + assertThat(capturedRequests.size(), equalTo(1)); + final CapturingTransport.CapturedRequest capturedRequest = capturedRequests.get(0); + assertThat(capturedRequest.action, equalTo("testActionWithExceptions[r]")); + assertThat(capturedRequest.request, instanceOf(Action.RequestWithAllocationID.class)); + assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getRequest(), equalTo(request)); + assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getAllocationId(), equalTo(replica.allocationId().getId())); + } + + private void assertIndexShardCounter(int expected) { assertThat(count.get(), equalTo(expected)); } From 4a63c09cc5a741fc5ccde30e75fcb4d5d30cd3d2 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 5 Sep 2016 20:51:22 +0200 Subject: [PATCH 10/14] feedback --- .../TransportReplicationAction.java | 40 ++++++++++--------- .../TransportReplicationActionTests.java | 13 +++--- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2e758a98b7601..6bcd5713fba9e 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -74,6 +74,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; @@ -246,14 +247,14 @@ public void messageReceived(final RequestWithAllocationID request, fina @Override public void messageReceived(RequestWithAllocationID request, TransportChannel channel, Task task) { - new AsyncPrimaryAction(request.request, request.allocationId, channel, (ReplicationTask) task).run(); + new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run(); } } class AsyncPrimaryAction extends AbstractRunnable implements ActionListener { private final Request request; - /** allocationId of the shard this request is meant for */ + /** targetAllocationID of the shard this request is meant for */ private final String allocationId; private final TransportChannel channel; private final ReplicationTask replicationTask; @@ -412,9 +413,9 @@ public void messageReceived(final RequestWithAllocationID reques } @Override - public void messageReceived(RequestWithAllocationID request, TransportChannel channel, Task task) + public void messageReceived(RequestWithAllocationID requestWithAID, TransportChannel channel, Task task) throws Exception { - new AsyncReplicaAction(request.request, request.allocationId, channel, (ReplicationTask) task).run(); + new AsyncReplicaAction(requestWithAID.request, requestWithAID.targetAllocationID, channel, (ReplicationTask) task).run(); } } @@ -433,7 +434,7 @@ public RetryOnReplicaException(StreamInput in) throws IOException { private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener { private final ReplicaRequest request; // allocation id of the replica this request is meant for - private final String allocationId; + private final String targetAllocationID; private final TransportChannel channel; /** * The task on the node with the replica shard. @@ -443,11 +444,11 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio // something we want to avoid at all costs private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); - AsyncReplicaAction(ReplicaRequest request, String allocationId, TransportChannel channel, ReplicationTask task) { + AsyncReplicaAction(ReplicaRequest request, String targetAllocationID, TransportChannel channel, ReplicationTask task) { this.request = request; this.channel = channel; this.task = task; - this.allocationId = allocationId; + this.targetAllocationID = targetAllocationID; } @Override @@ -483,7 +484,7 @@ public void onNewClusterState(ClusterState state) { TransportChannelResponseHandler handler = new TransportChannelResponseHandler<>(logger, channel, extraMessage, () -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, - new RequestWithAllocationID<>(request, allocationId), + new RequestWithAllocationID<>(request, targetAllocationID), handler); } @@ -521,7 +522,7 @@ protected void responseWithFailure(Exception e) { protected void doRun() throws Exception { setPhase(task, "replica"); assert request.shardId() != null : "request shardId must be set"; - acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), allocationId, this); + acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), targetAllocationID, this); } /** @@ -966,18 +967,21 @@ public void onFailure(Exception shardFailedError) { final class RequestWithAllocationID extends TransportRequest { /** {@link AllocationId#getId()} of the shard this request is sent to **/ - private String allocationId; + private String targetAllocationID; private R request; RequestWithAllocationID(Supplier requestSupplier) { request = requestSupplier.get(); - allocationId = null; + // null now, but will be populated by reading from the streams + targetAllocationID = null; } - RequestWithAllocationID(R request, String allocationId) { + RequestWithAllocationID(R request, String targetAllocationID) { + Objects.requireNonNull(request); + Objects.requireNonNull(targetAllocationID); this.request = request; - this.allocationId = allocationId; + this.targetAllocationID = targetAllocationID; } @Override @@ -1001,18 +1005,18 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId) @Override public String getDescription() { - return "[" + request.getDescription() + "] for aID [" + allocationId + "]"; + return "[" + request.getDescription() + "] for aID [" + targetAllocationID + "]"; } @Override public void readFrom(StreamInput in) throws IOException { - allocationId = in.readString(); + targetAllocationID = in.readString(); request.readFrom(in); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(allocationId); + out.writeString(targetAllocationID); request.writeTo(out); } @@ -1020,8 +1024,8 @@ public R getRequest() { return request; } - public String getAllocationId() { - return allocationId; + public String getTargetAllocationID() { + return targetAllocationID; } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index e8ee54c1d8a66..8090d8d5c705c 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.action.support.replication; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; @@ -50,7 +48,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; @@ -765,7 +762,7 @@ public void testDefaultWaitForActiveShardsUsesIndexSetting() throws Exception { assertEquals(ActiveShardCount.from(requestWaitForActiveShards), request.waitForActiveShards()); } - /** test that a primary request is reject if it arrives at a shard with a wrong allocation id */ + /** test that a primary request is rejected if it arrives at a shard with a wrong allocation id */ public void testPrimaryActionRejectsWrongAid() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); @@ -781,12 +778,12 @@ action.new PrimaryOperationTransportHandler().messageReceived( fail("using a wrong aid didn't fail the operation"); } catch (ExecutionException execException) { Throwable throwable = execException.getCause(); - logger.debug((Supplier) () -> new ParameterizedMessage("got exception e"), throwable); + logger.debug("got exception:" , throwable); assertTrue(throwable.getClass() + " is not a retry exception", action.retryPrimaryException(throwable)); } } - /** test that a replica request is reject if it arrives at a shard with a wrong allocation id */ + /** test that a replica request is rejected if it arrives at a shard with a wrong allocation id */ public void testReplicaActionRejectsWrongAid() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); @@ -865,7 +862,7 @@ protected ReplicaResult shardOperationOnReplica(Request request) { assertThat(capturedRequest.action, equalTo("testActionWithExceptions[r]")); assertThat(capturedRequest.request, instanceOf(Action.RequestWithAllocationID.class)); assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getRequest(), equalTo(request)); - assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getAllocationId(), equalTo(replica.allocationId().getId())); + assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getTargetAllocationID(), equalTo(replica.allocationId().getId())); } @@ -1018,7 +1015,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService ActionListener callback = (ActionListener) invocation.getArguments()[1]; final long primaryTerm = indexShard.getPrimaryTerm(); if (term < primaryTerm) { - throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", + throw new IllegalArgumentException(String.format("%s operation term [%d] is too old (current [%d])", shardId, term, primaryTerm)); } count.incrementAndGet(); From 83ac99be02841a1b826d47d7fb32d9f7efd58127 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 5 Sep 2016 21:00:14 +0200 Subject: [PATCH 11/14] one more rename --- .../support/replication/TransportReplicationAction.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 6bcd5713fba9e..806b27e406347 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -255,20 +255,20 @@ class AsyncPrimaryAction extends AbstractRunnable implements ActionListener Date: Tue, 6 Sep 2016 14:09:16 +0200 Subject: [PATCH 12/14] line length --- .../support/replication/TransportReplicationActionTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 8090d8d5c705c..07d1987dfb077 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -862,7 +862,8 @@ protected ReplicaResult shardOperationOnReplica(Request request) { assertThat(capturedRequest.action, equalTo("testActionWithExceptions[r]")); assertThat(capturedRequest.request, instanceOf(Action.RequestWithAllocationID.class)); assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getRequest(), equalTo(request)); - assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getTargetAllocationID(), equalTo(replica.allocationId().getId())); + assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getTargetAllocationID(), + equalTo(replica.allocationId().getId())); } From c1996570dac29bd9e99e90be13b8e9d086c9f3d6 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 6 Sep 2016 14:10:20 +0200 Subject: [PATCH 13/14] be concrete --- .../TransportReplicationAction.java | 30 +++++++++---------- .../TransportReplicationActionTests.java | 18 +++++------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 806b27e406347..0fa027744ac62 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -121,11 +121,11 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler()); - transportService.registerRequestHandler(transportPrimaryAction, () -> new RequestWithAllocationID<>(request), executor, + transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, new PrimaryOperationTransportHandler()); // we must never reject on because of thread pool capacity on replicas transportService.registerRequestHandler(transportReplicaAction, - () -> new RequestWithAllocationID<>(replicaRequest), + () -> new ConcreteShardRequest<>(replicaRequest), executor, true, true, new ReplicaOperationTransportHandler()); @@ -239,14 +239,14 @@ public void messageReceived(Request request, TransportChannel channel) throws Ex } } - class PrimaryOperationTransportHandler implements TransportRequestHandler> { + class PrimaryOperationTransportHandler implements TransportRequestHandler> { @Override - public void messageReceived(final RequestWithAllocationID request, final TransportChannel channel) throws Exception { + public void messageReceived(final ConcreteShardRequest request, final TransportChannel channel) throws Exception { throw new UnsupportedOperationException("the task parameter is required for this operation"); } @Override - public void messageReceived(RequestWithAllocationID request, TransportChannel channel, Task task) { + public void messageReceived(ConcreteShardRequest request, TransportChannel channel, Task task) { new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run(); } } @@ -284,7 +284,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) { assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary; DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, - new RequestWithAllocationID<>(request, primary.allocationId().getRelocationId()), + new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId()), transportOptions, new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary, TransportReplicationAction.this::newResponseInstance) { @@ -405,15 +405,15 @@ public void respond(ActionListener listener) { } } - class ReplicaOperationTransportHandler implements TransportRequestHandler> { + class ReplicaOperationTransportHandler implements TransportRequestHandler> { @Override - public void messageReceived(final RequestWithAllocationID request, final TransportChannel channel) + public void messageReceived(final ConcreteShardRequest request, final TransportChannel channel) throws Exception { throw new UnsupportedOperationException("the task parameter is required for this operation"); } @Override - public void messageReceived(RequestWithAllocationID requestWithAID, TransportChannel channel, Task task) + public void messageReceived(ConcreteShardRequest requestWithAID, TransportChannel channel, Task task) throws Exception { new AsyncReplicaAction(requestWithAID.request, requestWithAID.targetAllocationID, channel, (ReplicationTask) task).run(); } @@ -484,7 +484,7 @@ public void onNewClusterState(ClusterState state) { TransportChannelResponseHandler handler = new TransportChannelResponseHandler<>(logger, channel, extraMessage, () -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, - new RequestWithAllocationID<>(request, targetAllocationID), + new ConcreteShardRequest<>(request, targetAllocationID), handler); } @@ -619,7 +619,7 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov logger.trace("send action [{}] on primary [{}] for request [{}] with cluster state version [{}] to [{}] ", transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId()); } - performAction(node, transportPrimaryAction, true, new RequestWithAllocationID<>(request, primary.allocationId().getId())); + performAction(node, transportPrimaryAction, true, new ConcreteShardRequest<>(request, primary.allocationId().getId())); } private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) { @@ -921,7 +921,7 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen return; } transportService.sendRequest(node, transportReplicaAction, - new RequestWithAllocationID<>(request, replica.allocationId().getId()), transportOptions, + new ConcreteShardRequest<>(request, replica.allocationId().getId()), transportOptions, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); } @@ -964,20 +964,20 @@ public void onFailure(Exception shardFailedError) { } /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ - final class RequestWithAllocationID extends TransportRequest { + final class ConcreteShardRequest extends TransportRequest { /** {@link AllocationId#getId()} of the shard this request is sent to **/ private String targetAllocationID; private R request; - RequestWithAllocationID(Supplier requestSupplier) { + ConcreteShardRequest(Supplier requestSupplier) { request = requestSupplier.get(); // null now, but will be populated by reading from the streams targetAllocationID = null; } - RequestWithAllocationID(R request, String targetAllocationID) { + ConcreteShardRequest(R request, String targetAllocationID) { Objects.requireNonNull(request); Objects.requireNonNull(targetAllocationID); this.request = request; diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 07d1987dfb077..286a18059a677 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -118,8 +118,8 @@ public class TransportReplicationActionTests extends ESTestCase { * This will throw a {@link ClassCastException} if the request is of the wrong type. */ public static R resolveRequest(TransportRequest requestOrWrappedRequest) { - if (requestOrWrappedRequest instanceof TransportReplicationAction.RequestWithAllocationID) { - requestOrWrappedRequest = ((TransportReplicationAction.RequestWithAllocationID)requestOrWrappedRequest).getRequest(); + if (requestOrWrappedRequest instanceof TransportReplicationAction.ConcreteShardRequest) { + requestOrWrappedRequest = ((TransportReplicationAction.ConcreteShardRequest)requestOrWrappedRequest).getRequest(); } return (R) requestOrWrappedRequest; } @@ -720,7 +720,7 @@ protected ReplicaResult shardOperationOnReplica(Request request) { final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); try { replicaOperationTransportHandler.messageReceived( - action.new RequestWithAllocationID(new Request().setShardId(shardId), replicaRouting.allocationId().getId()), + action.new ConcreteShardRequest(new Request().setShardId(shardId), replicaRouting.allocationId().getId()), createTransportChannel(new PlainActionFuture<>()), task); } catch (ElasticsearchException e) { assertThat(e.getMessage(), containsString("simulated")); @@ -770,7 +770,7 @@ public void testPrimaryActionRejectsWrongAid() throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); action.new PrimaryOperationTransportHandler().messageReceived( - action.new RequestWithAllocationID(request, "_not_a_valid_aid_"), + action.new ConcreteShardRequest(request, "_not_a_valid_aid_"), createTransportChannel(listener), maybeTask() ); try { @@ -796,7 +796,7 @@ public void testReplicaActionRejectsWrongAid() throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); Request request = new Request(shardId).timeout("1ms"); action.new ReplicaOperationTransportHandler().messageReceived( - action.new RequestWithAllocationID(request, "_not_a_valid_aid_"), + action.new ConcreteShardRequest(request, "_not_a_valid_aid_"), createTransportChannel(listener), maybeTask() ); try { @@ -839,7 +839,7 @@ protected ReplicaResult shardOperationOnReplica(Request request) { final Request request = new Request().setShardId(shardId); request.primaryTerm(state.metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); replicaOperationTransportHandler.messageReceived( - action.new RequestWithAllocationID(request, replica.allocationId().getId()), + action.new ConcreteShardRequest(request, replica.allocationId().getId()), createTransportChannel(listener), task); if (listener.isDone()) { listener.get(); // fail with the exception if there @@ -860,9 +860,9 @@ protected ReplicaResult shardOperationOnReplica(Request request) { assertThat(capturedRequests.size(), equalTo(1)); final CapturingTransport.CapturedRequest capturedRequest = capturedRequests.get(0); assertThat(capturedRequest.action, equalTo("testActionWithExceptions[r]")); - assertThat(capturedRequest.request, instanceOf(Action.RequestWithAllocationID.class)); - assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getRequest(), equalTo(request)); - assertThat(((Action.RequestWithAllocationID) capturedRequest.request).getTargetAllocationID(), + assertThat(capturedRequest.request, instanceOf(TransportReplicationAction.ConcreteShardRequest.class)); + assertThat(((TransportReplicationAction.ConcreteShardRequest) capturedRequest.request).getRequest(), equalTo(request)); + assertThat(((TransportReplicationAction.ConcreteShardRequest) capturedRequest.request).getTargetAllocationID(), equalTo(replica.allocationId().getId())); } From edfdff25e1f30484cd6df177318d991aee4de4a5 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 6 Sep 2016 14:19:52 +0200 Subject: [PATCH 14/14] locale --- .../support/replication/TransportReplicationActionTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 286a18059a677..c8aec62339491 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -81,6 +81,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -1016,7 +1017,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService ActionListener callback = (ActionListener) invocation.getArguments()[1]; final long primaryTerm = indexShard.getPrimaryTerm(); if (term < primaryTerm) { - throw new IllegalArgumentException(String.format("%s operation term [%d] is too old (current [%d])", + throw new IllegalArgumentException(String.format(Locale.ROOT, "%s operation term [%d] is too old (current [%d])", shardId, term, primaryTerm)); } count.incrementAndGet();