diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/CloseIndexResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/CloseIndexResponseTests.java index 78e7e42e6dd8f..25f82a843c42b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/CloseIndexResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indices/CloseIndexResponseTests.java @@ -191,7 +191,7 @@ public final void testBwcFromXContent() throws IOException { { final boolean acknowledged = randomBoolean(); final boolean shardsAcknowledged = acknowledged ? randomBoolean() : false; - final ShardsAcknowledgedResponse expected = new ShardsAcknowledgedResponse(acknowledged, shardsAcknowledged){}; + final ShardsAcknowledgedResponse expected = ShardsAcknowledgedResponse.of(acknowledged, shardsAcknowledged); final XContentType xContentType = randomFrom(XContentType.values()); final BytesReference bytes = toShuffledXContent(expected, xContentType, getParams(), randomBoolean()); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index 1d1f58d97ac1c..430ad0d0a9e66 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -59,12 +59,7 @@ public void testAckedUpdateTask() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch processedLatch = new CountDownLatch(1); clusterService.submitStateUpdateTask("test", - new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) { - @Override - protected Void newResponse(boolean acknowledged) { - return null; - } - + new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) { @Override public boolean mustAck(DiscoveryNode discoveryNode) { return true; @@ -123,12 +118,7 @@ public void testAckedUpdateTaskSameClusterState() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch processedLatch = new CountDownLatch(1); clusterService.submitStateUpdateTask("test", - new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) { - @Override - protected Void newResponse(boolean acknowledged) { - return null; - } - + new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) { @Override public void onAllNodesAcked(@Nullable Exception e) { allNodesAcked.set(true); @@ -182,12 +172,7 @@ public void testAckedUpdateTaskNoAckExpected() throws Exception { final CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask( - "test", new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) { - @Override - protected Void newResponse(boolean acknowledged) { - return null; - } - + "test", new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) { @Override public boolean mustAck(DiscoveryNode discoveryNode) { return false; @@ -243,12 +228,7 @@ public void testAckedUpdateTaskTimeoutZero() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch processedLatch = new CountDownLatch(1); clusterService.submitStateUpdateTask("test", - new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TimeValue.ZERO, TEN_SECONDS), null) { - @Override - protected Void newResponse(boolean acknowledged) { - return null; - } - + new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TimeValue.ZERO, TEN_SECONDS), null) { @Override public boolean mustAck(DiscoveryNode discoveryNode) { return false; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index f83ddf49707ff..432eb0479f35e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -149,7 +149,7 @@ private void submitStateUpdate(final ClusterRerouteRequest request, final Action }))); } - static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask { + static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask { private final ClusterRerouteRequest request; private final ActionListener listener; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 6259f29663d4d..012d8851c94f1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -82,7 +82,7 @@ protected void masterOperation(final ClusterUpdateSettingsRequest request, final final ActionListener listener) { final SettingsUpdater updater = new SettingsUpdater(clusterSettings); clusterService.submitStateUpdateTask("cluster_update_settings", - new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) { + new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) { private volatile boolean changed = false; @@ -125,7 +125,7 @@ private void reroute(final boolean updateSettingsAcked) { // to the components until the ClusterStateListener instances have been invoked, but are visible after // the first update task has been completed. clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", - new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override public boolean mustAck(DiscoveryNode discoveryNode) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java index 7cffbddcc7a03..b31a9d80181be 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/TransportIndicesAliasesAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasAction; @@ -146,10 +145,10 @@ protected void masterOperation(final IndicesAliasesRequest request, final Cluste IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest(unmodifiableList(finalActions)) .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()); - indexAliasesService.indicesAliases(updateRequest, new ActionListener() { + indexAliasesService.indicesAliases(updateRequest, new ActionListener() { @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(AcknowledgedResponse.of(response.isAcknowledged())); + public void onResponse(AcknowledgedResponse response) { + listener.onResponse(response); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index c449594380ea3..bfd9b330298fd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -23,11 +23,11 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; @@ -84,7 +84,7 @@ protected void masterOperation(CreateIndexRequest request, ClusterState state, ActionListener finalListener) { AtomicReference indexNameRef = new AtomicReference<>(); - ActionListener listener = ActionListener.wrap( + ActionListener listener = ActionListener.wrap( response -> { String indexName = indexNameRef.get(); assert indexName != null; @@ -105,12 +105,7 @@ protected void masterOperation(CreateIndexRequest request, finalListener::onFailure ); clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", - new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { - - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override public ClusterState execute(ClusterState currentState) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java index 831c1ed6ed00a..df768b61c71e7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java @@ -121,14 +121,7 @@ public void onFailure(Exception e) { final String taskSource = "delete-dangling-index [" + indexName + "] [" + indexUUID + "]"; clusterService.submitStateUpdateTask( - taskSource, - new AckedClusterStateUpdateTask(deleteRequest, clusterStateUpdatedListener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - + taskSource, new AckedClusterStateUpdateTask(deleteRequest, clusterStateUpdatedListener) { @Override public ClusterState execute(final ClusterState currentState) { return deleteDanglingIndex(currentState, indexToDelete); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java index f3d20dca7d060..510667619f298 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/delete/TransportDeleteIndexAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; @@ -88,11 +87,11 @@ protected void masterOperation(final DeleteIndexRequest request, final ClusterSt .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .indices(concreteIndices.toArray(new Index[concreteIndices.size()])); - deleteIndexService.deleteIndices(deleteRequest, new ActionListener() { + deleteIndexService.deleteIndices(deleteRequest, new ActionListener() { @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(AcknowledgedResponse.of(response.isAcknowledged())); + public void onResponse(AcknowledgedResponse response) { + listener.onResponse(response); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index 4c217d9b8598a..782bc2142dce8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -127,11 +126,11 @@ static void performMappingUpdate(Index[] concreteIndices, .indices(concreteIndices).type(request.type()) .source(request.source()); - metadataMappingService.putMapping(updateRequest, new ActionListener() { + metadataMappingService.putMapping(updateRequest, new ActionListener() { @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(AcknowledgedResponse.of(response.isAcknowledged())); + public void onResponse(AcknowledgedResponse response) { + listener.onResponse(response); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java index d17c55236e9fe..83489463f879c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/open/TransportOpenIndexAction.java @@ -25,9 +25,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -84,10 +84,10 @@ protected void masterOperation(final OpenIndexRequest request, final ClusterStat .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .indices(concreteIndices).waitForActiveShards(request.waitForActiveShards()); - indexStateService.openIndex(updateRequest, new ActionListener() { + indexStateService.openIndex(updateRequest, new ActionListener() { @Override - public void onResponse(OpenIndexClusterStateUpdateResponse response) { + public void onResponse(ShardsAcknowledgedResponse response) { listener.onResponse(new OpenIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged())); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java index cbd22d909d586..96bcaffc74bef 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -82,10 +81,10 @@ protected void masterOperation(final UpdateSettingsRequest request, final Cluste .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()); - updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener() { + updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener() { @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(AcknowledgedResponse.of(response.isAcknowledged())); + public void onResponse(AcknowledgedResponse response) { + listener.onResponse(response); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/template/delete/TransportDeleteIndexTemplateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/template/delete/TransportDeleteIndexTemplateAction.java index 97ec8faf8ad50..9257ab65be9eb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/template/delete/TransportDeleteIndexTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/template/delete/TransportDeleteIndexTemplateAction.java @@ -67,8 +67,8 @@ protected void masterOperation(final DeleteIndexTemplateRequest request, final C .masterTimeout(request.masterNodeTimeout()), new MetadataIndexTemplateService.RemoveListener() { @Override - public void onResponse(MetadataIndexTemplateService.RemoveResponse response) { - listener.onResponse(AcknowledgedResponse.of(response.acknowledged())); + public void onResponse(AcknowledgedResponse response) { + listener.onResponse(response); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeSettingsAction.java index 1836e54bd0990..fc396dba91f77 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeSettingsAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -65,10 +64,10 @@ protected void masterOperation(final UpgradeSettingsRequest request, final Clust .versions(request.versions()) .masterNodeTimeout(request.masterNodeTimeout()); - updateSettingsService.upgradeIndexSettings(clusterStateUpdateRequest, new ActionListener() { + updateSettingsService.upgradeIndexSettings(clusterStateUpdateRequest, new ActionListener() { @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(AcknowledgedResponse.of(response.isAcknowledged())); + public void onResponse(AcknowledgedResponse response) { + listener.onResponse(response); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/master/ShardsAcknowledgedResponse.java b/server/src/main/java/org/elasticsearch/action/support/master/ShardsAcknowledgedResponse.java index 308d2dd57d0b5..cbe2fb47278d4 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/ShardsAcknowledgedResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/ShardsAcknowledgedResponse.java @@ -31,7 +31,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; -public abstract class ShardsAcknowledgedResponse extends AcknowledgedResponse { +public class ShardsAcknowledgedResponse extends AcknowledgedResponse { protected static final ParseField SHARDS_ACKNOWLEDGED = new ParseField("shards_acknowledged"); @@ -42,6 +42,10 @@ protected static void declareAcknowledged ObjectParser.ValueType.BOOLEAN); } + public static final ShardsAcknowledgedResponse NOT_ACKNOWLEDGED = new ShardsAcknowledgedResponse(false, false); + private static final ShardsAcknowledgedResponse SHARDS_NOT_ACKNOWLEDGED = new ShardsAcknowledgedResponse(true, false); + private static final ShardsAcknowledgedResponse ACKNOWLEDGED = new ShardsAcknowledgedResponse(true, true); + private final boolean shardsAcknowledged; protected ShardsAcknowledgedResponse(StreamInput in, boolean readShardsAcknowledged, boolean readAcknowledged) throws IOException { @@ -53,6 +57,15 @@ protected ShardsAcknowledgedResponse(StreamInput in, boolean readShardsAcknowled } } + public static ShardsAcknowledgedResponse of(boolean acknowledged, boolean shardsAcknowledged) { + if (acknowledged) { + return shardsAcknowledged ? ACKNOWLEDGED : SHARDS_NOT_ACKNOWLEDGED; + } else { + assert shardsAcknowledged == false; + return NOT_ACKNOWLEDGED; + } + } + protected ShardsAcknowledgedResponse(boolean acknowledged, boolean shardsAcknowledged) { super(acknowledged); assert acknowledged || shardsAcknowledged == false; // if it's not acknowledged, then shards acked should be false too @@ -90,5 +103,4 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(super.hashCode(), isShardsAcknowledged()); } - } diff --git a/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java b/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java index cceaf5709a04f..85dbe6146bba8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ack.AckedRequest; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; @@ -29,18 +30,20 @@ * An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when * all the nodes have acknowledged a cluster state update request */ -public abstract class AckedClusterStateUpdateTask extends ClusterStateUpdateTask implements AckedClusterStateTaskListener { +public abstract class AckedClusterStateUpdateTask extends ClusterStateUpdateTask implements AckedClusterStateTaskListener { - private final ActionListener listener; + private final ActionListener listener; private final AckedRequest request; - protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener listener) { + protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener listener) { this(Priority.NORMAL, request, listener); } - protected AckedClusterStateUpdateTask(Priority priority, AckedRequest request, ActionListener listener) { + @SuppressWarnings("unchecked") + protected AckedClusterStateUpdateTask(Priority priority, AckedRequest request, + ActionListener listener) { super(priority, request.masterNodeTimeout()); - this.listener = listener; + this.listener = (ActionListener) listener; this.request = request; } @@ -64,7 +67,9 @@ public void onAllNodesAcked(@Nullable Exception e) { listener.onResponse(newResponse(e == null)); } - protected abstract Response newResponse(boolean acknowledged); + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return AcknowledgedResponse.of(acknowledged); + } /** * Called once the acknowledgement timeout defined by diff --git a/server/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java b/server/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java deleted file mode 100644 index 42db2e29c10dc..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateResponse.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.ack; - -/** - * Base response returned after a cluster state update - */ -public class ClusterStateUpdateResponse { - - private final boolean acknowledged; - - public ClusterStateUpdateResponse(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - /** - * Whether the cluster state update was acknowledged or not - */ - public boolean isAcknowledged() { - return acknowledged; - } -} diff --git a/server/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java b/server/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java deleted file mode 100644 index c7baded410cf8..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.ack; - -/** - * A cluster state update response with specific fields for index creation. - */ -public class CreateIndexClusterStateUpdateResponse extends ClusterStateUpdateResponse { - - private final boolean shardsAcknowledged; - - public CreateIndexClusterStateUpdateResponse(boolean acknowledged, boolean shardsAcknowledged) { - super(acknowledged); - this.shardsAcknowledged = shardsAcknowledged; - } - - /** - * Returns whether the requisite number of shard copies started before the completion of the operation. - */ - public boolean isShardsAcknowledged() { - return shardsAcknowledged; - } -} diff --git a/server/src/main/java/org/elasticsearch/cluster/ack/OpenIndexClusterStateUpdateResponse.java b/server/src/main/java/org/elasticsearch/cluster/ack/OpenIndexClusterStateUpdateResponse.java deleted file mode 100644 index 33089fa009cdd..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/ack/OpenIndexClusterStateUpdateResponse.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.cluster.ack; - -/** - * A cluster state update response with specific fields for index opening. - */ -public class OpenIndexClusterStateUpdateResponse extends ClusterStateUpdateResponse { - - private final boolean shardsAcknowledged; - - public OpenIndexClusterStateUpdateResponse(boolean acknowledged, boolean shardsAcknowledged) { - super(acknowledged); - this.shardsAcknowledged = shardsAcknowledged; - } - - /** - * Returns whether the requisite number of shard copies started before the completion of the operation. - */ - public boolean isShardsAcknowledged() { - return shardsAcknowledged; - } -} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 6764bdf88d883..514f903f25d48 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -31,7 +31,6 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; @@ -69,7 +68,7 @@ public MetadataCreateDataStreamService(ThreadPool threadPool, public void createDataStream(CreateDataStreamClusterStateUpdateRequest request, ActionListener finalListener) { AtomicReference firstBackingIndexRef = new AtomicReference<>(); - ActionListener listener = ActionListener.wrap( + ActionListener listener = ActionListener.wrap( response -> { if (response.isAcknowledged()) { String firstBackingIndexName = firstBackingIndexRef.get(); @@ -87,19 +86,13 @@ public void createDataStream(CreateDataStreamClusterStateUpdateRequest request, finalListener::onFailure ); clusterService.submitStateUpdateTask("create-data-stream [" + request.name + "]", - new AckedClusterStateUpdateTask(Priority.HIGH, request, listener) { - + new AckedClusterStateUpdateTask(Priority.HIGH, request, listener) { @Override public ClusterState execute(ClusterState currentState) throws Exception { ClusterState clusterState = createDataStream(metadataCreateIndexService, currentState, request); firstBackingIndexRef.set(clusterState.metadata().dataStreams().get(request.name).getIndices().get(0).getName()); return clusterState; } - - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } }); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 38d33d4c6ebae..fe1529c899952 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -33,10 +33,10 @@ import org.elasticsearch.action.admin.indices.shrink.ResizeType; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; -import org.elasticsearch.cluster.ack.CreateIndexClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -256,18 +256,19 @@ public static void validateIndexOrAliasName(String index, BiFunction listener) { + final ActionListener listener) { + logger.trace("createIndex[{}]", request); onlyCreateIndex(request, ActionListener.wrap(response -> { if (response.isAcknowledged()) { activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(), @@ -276,24 +277,21 @@ public void createIndex(final CreateIndexClusterStateUpdateRequest request, logger.debug("[{}] index created, but the operation timed out while waiting for " + "enough shards to be started.", request.index()); } - listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged)); + listener.onResponse(ShardsAcknowledgedResponse.of(true, shardsAcknowledged)); }, listener::onFailure); } else { - listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false)); + logger.trace("index creation not acknowledged for [{}]", request); + listener.onResponse(ShardsAcknowledgedResponse.NOT_ACKNOWLEDGED); } }, listener::onFailure)); } private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request, - final ActionListener listener) { + final ActionListener listener) { normalizeRequestSetting(request); clusterService.submitStateUpdateTask( "create-index [" + request.index() + "], cause [" + request.cause() + "]", - new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override public ClusterState execute(ClusterState currentState) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java index e2547c5d08650..9fa7b959a28a1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java @@ -23,10 +23,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.RestoreInProgress; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -66,25 +66,18 @@ public MetadataDeleteIndexService(Settings settings, ClusterService clusterServi this.allocationService = allocationService; } - public void deleteIndices(final DeleteIndexClusterStateUpdateRequest request, - final ActionListener listener) { + public void deleteIndices(final DeleteIndexClusterStateUpdateRequest request, final ActionListener listener) { if (request.indices() == null || request.indices().length == 0) { throw new IllegalArgumentException("Index name is required"); } clusterService.submitStateUpdateTask("delete-index " + Arrays.toString(request.indices()), - new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { - - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } - - @Override - public ClusterState execute(final ClusterState currentState) { - return deleteIndices(currentState, Sets.newHashSet(request.indices())); - } - }); + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { + @Override + public ClusterState execute(final ClusterState currentState) { + return deleteIndices(currentState, Sets.newHashSet(request.indices())); + } + }); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java index 2ea44d0440119..55c9635470a13 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java @@ -22,9 +22,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.AliasAction.NewAliasValidator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; @@ -75,14 +75,9 @@ public MetadataIndexAliasesService(ClusterService clusterService, IndicesService } public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, - final ActionListener listener) { + final ActionListener listener) { clusterService.submitStateUpdateTask("index-aliases", - new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } - + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override public ClusterState execute(ClusterState currentState) { return applyAliasActions(currentState, request.actions()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index 5ab4472b7749c..36a996defe996 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -40,12 +40,12 @@ import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse.AddBlockShardResult; import org.elasticsearch.action.admin.indices.readonly.TransportVerifyShardIndexBlockAction; import org.elasticsearch.action.support.ActiveShardsObserver; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; -import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -788,7 +788,7 @@ static Tuple> closeRoutingTable(final Clus } public void openIndex(final OpenIndexClusterStateUpdateRequest request, - final ActionListener listener) { + final ActionListener listener) { onlyOpenIndex(request, ActionListener.wrap(response -> { if (response.isAcknowledged()) { String[] indexNames = Arrays.stream(request.indices()).map(Index::getName).toArray(String[]::new); @@ -798,28 +798,23 @@ public void openIndex(final OpenIndexClusterStateUpdateRequest request, logger.debug("[{}] indices opened, but the operation timed out while waiting for " + "enough shards to be started.", Arrays.toString(indexNames)); } - listener.onResponse(new OpenIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged)); + listener.onResponse(ShardsAcknowledgedResponse.of(true, shardsAcknowledged)); }, listener::onFailure); } else { - listener.onResponse(new OpenIndexClusterStateUpdateResponse(false, false)); + listener.onResponse(ShardsAcknowledgedResponse.NOT_ACKNOWLEDGED); } }, listener::onFailure)); } private void onlyOpenIndex(final OpenIndexClusterStateUpdateRequest request, - final ActionListener listener) { + final ActionListener listener) { if (request.indices() == null || request.indices().length == 0) { throw new IllegalArgumentException("Index name is required"); } final String indicesAsString = Arrays.toString(request.indices()); clusterService.submitStateUpdateTask("open-indices " + indicesAsString, - new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } - + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override public ClusterState execute(final ClusterState currentState) { final ClusterState updatedState = openIndices(request.indices(), currentState); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index 18b5346753f1a..b6856d507bf77 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -155,7 +155,7 @@ public ClusterState execute(ClusterState currentState) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new RemoveResponse(true)); + listener.onResponse(AcknowledgedResponse.TRUE); } }); } @@ -1399,21 +1399,9 @@ public RemoveRequest masterTimeout(TimeValue masterTimeout) { } } - public static class RemoveResponse { - private final boolean acknowledged; - - public RemoveResponse(boolean acknowledged) { - this.acknowledged = acknowledged; - } - - public boolean acknowledged() { - return acknowledged; - } - } - public interface RemoveListener { - void onResponse(RemoveResponse response); + void onResponse(AcknowledgedResponse response); void onFailure(Exception e); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java index 1518aeee83f72..fb2535a688af2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java @@ -24,11 +24,11 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateTaskListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -371,7 +371,7 @@ public String describeTasks(List tasks) { } } - public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener listener) { + public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener listener) { clusterService.submitStateUpdateTask("put-mapping " + Strings.arrayToCommaDelimitedString(request.indices()), request, ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()), @@ -390,12 +390,12 @@ public boolean mustAck(DiscoveryNode discoveryNode) { @Override public void onAllNodesAcked(@Nullable Exception e) { - listener.onResponse(new ClusterStateUpdateResponse(e == null)); + listener.onResponse(AcknowledgedResponse.of(e == null)); } @Override public void onAckTimeout() { - listener.onResponse(new ClusterStateUpdateResponse(false)); + listener.onResponse(AcknowledgedResponse.FALSE); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java index 32bb85f3f67cf..f5e4d410168ca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -26,9 +26,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeSettingsClusterStateUpdateRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; @@ -87,7 +87,7 @@ public MetadataUpdateSettingsService(ClusterService clusterService, AllocationSe } public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, - final ActionListener listener) { + final ActionListener listener) { final Settings normalizedSettings = Settings.builder().put(request.settings()).normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX).build(); Settings.Builder settingsForClosedIndices = Settings.builder(); @@ -116,14 +116,9 @@ public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request final boolean preserveExisting = request.isPreserveExisting(); clusterService.submitStateUpdateTask("update-settings " + Arrays.toString(request.indices()), - new AckedClusterStateUpdateTask(Priority.URGENT, request, + new AckedClusterStateUpdateTask(Priority.URGENT, request, wrapPreservingContext(listener, threadPool.getThreadContext())) { - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } - @Override public ClusterState execute(ClusterState currentState) { @@ -324,15 +319,9 @@ private static boolean maybeUpdateClusterBlock(String[] actualIndices, ClusterBl public void upgradeIndexSettings(final UpgradeSettingsClusterStateUpdateRequest request, - final ActionListener listener) { + final ActionListener listener) { clusterService.submitStateUpdateTask("update-index-compatibility-versions", - new AckedClusterStateUpdateTask(Priority.URGENT, request, - wrapPreservingContext(listener, threadPool.getThreadContext())) { - - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } + new AckedClusterStateUpdateTask(Priority.URGENT, request, wrapPreservingContext(listener, threadPool.getThreadContext())) { @Override public ClusterState execute(ClusterState currentState) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index abe248beb1794..0f63ba2c29b40 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -249,18 +249,12 @@ public ScriptService getScriptService() { */ public void delete(DeletePipelineRequest request, ActionListener listener) { clusterService.submitStateUpdateTask("delete-pipeline-" + request.getId(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) { - return innerDelete(request, currentState); - } - }); + new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return innerDelete(request, currentState); + } + }); } static ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) { @@ -334,22 +328,16 @@ static List innerGetPipelines(IngestMetadata ingestMetada * Stores the specified pipeline definition in the request. */ public void putPipeline(Map ingestInfos, PutPipelineRequest request, - ActionListener listener) throws Exception { - // validates the pipeline and processor configuration before submitting a cluster update task: - validatePipeline(ingestInfos, request); - clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) { - return innerPut(request, currentState); - } - }); + ActionListener listener) throws Exception { + // validates the pipeline and processor configuration before submitting a cluster update task: + validatePipeline(ingestInfos, request); + clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), + new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return innerPut(request, currentState); + } + }); } /** diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 4731bbdd1527a..f9bc2163f64a7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -34,7 +35,6 @@ import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -120,13 +120,13 @@ public RepositoriesService(Settings settings, ClusterService clusterService, Tra * @param request register repository request * @param listener register repository listener */ - public void registerRepository(final PutRepositoryRequest request, final ActionListener listener) { + public void registerRepository(final PutRepositoryRequest request, final ActionListener listener) { assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]"; final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings()); validate(request.name()); - final ActionListener registrationListener; + final ActionListener registrationListener; if (request.verify()) { registrationListener = ActionListener.delegateFailure(listener, (delegatedListener, clusterStateUpdateResponse) -> { if (clusterStateUpdateResponse.isAcknowledged()) { @@ -150,11 +150,7 @@ public void registerRepository(final PutRepositoryRequest request, final ActionL } clusterService.submitStateUpdateTask("put_repository [" + request.name() + "]", - new AckedClusterStateUpdateTask(request, registrationListener) { - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } + new AckedClusterStateUpdateTask(request, registrationListener) { @Override public ClusterState execute(ClusterState currentState) { @@ -207,6 +203,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) { } }); } + /** * Unregisters repository in the cluster *

@@ -215,13 +212,9 @@ public boolean mustAck(DiscoveryNode discoveryNode) { * @param request unregister repository request * @param listener unregister repository listener */ - public void unregisterRepository(final DeleteRepositoryRequest request, final ActionListener listener) { + public void unregisterRepository(final DeleteRepositoryRequest request, final ActionListener listener) { clusterService.submitStateUpdateTask("delete_repository [" + request.name() + "]", - new AckedClusterStateUpdateTask(request, listener) { - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } + new AckedClusterStateUpdateTask(request, listener) { @Override public ClusterState execute(ClusterState currentState) { diff --git a/server/src/main/java/org/elasticsearch/script/ScriptService.java b/server/src/main/java/org/elasticsearch/script/ScriptService.java index 0643538b5bc72..e04ae98925ea5 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptService.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptService.java @@ -466,16 +466,9 @@ public void putStoredScript(ClusterService clusterService, PutStoredScriptReques throw new IllegalArgumentException("failed to parse/compile stored script [" + request.id() + "]", exception); } - clusterService.submitStateUpdateTask("put-script-" + request.id(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - + clusterService.submitStateUpdateTask("put-script-" + request.id(), new AckedClusterStateUpdateTask(request, listener) { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { ScriptMetadata smd = currentState.metadata().custom(ScriptMetadata.TYPE); smd = ScriptMetadata.putStoredScript(smd, request.id(), source); Metadata.Builder mdb = Metadata.builder(currentState.getMetadata()).putCustom(ScriptMetadata.TYPE, smd); @@ -488,22 +481,16 @@ public ClusterState execute(ClusterState currentState) throws Exception { public void deleteStoredScript(ClusterService clusterService, DeleteStoredScriptRequest request, ActionListener listener) { clusterService.submitStateUpdateTask("delete-script-" + request.id(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - ScriptMetadata smd = currentState.metadata().custom(ScriptMetadata.TYPE); - smd = ScriptMetadata.deleteStoredScript(smd, request.id()); - Metadata.Builder mdb = Metadata.builder(currentState.getMetadata()).putCustom(ScriptMetadata.TYPE, smd); - - return ClusterState.builder(currentState).metadata(mdb).build(); - } - }); + new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + ScriptMetadata smd = currentState.metadata().custom(ScriptMetadata.TYPE); + smd = ScriptMetadata.deleteStoredScript(smd, request.id()); + Metadata.Builder mdb = Metadata.builder(currentState.getMetadata()).putCustom(ScriptMetadata.TYPE, smd); + + return ClusterState.builder(currentState).metadata(mdb).build(); + } + }); } public StoredScriptSource getStoredScript(ClusterState state, GetStoredScriptRequest request) { diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index f8e73cbcc2adc..277305d880fa8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -190,7 +191,7 @@ public void testThreadContext() throws InterruptedException { final TimeValue ackTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); final TimeValue masterTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); - master.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, masterTimeout), null) { + master.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, masterTimeout), null) { @Override public ClusterState execute(ClusterState currentState) { assertTrue(threadPool.getThreadContext().isSystemContext()); @@ -223,11 +224,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS latch.countDown(); } - @Override - protected Void newResponse(boolean acknowledged) { - return null; - } - @Override public void onAllNodesAcked(@Nullable Exception e) { assertFalse(threadPool.getThreadContext().isSystemContext()); @@ -919,8 +915,7 @@ public void testAcking() throws InterruptedException { publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> publishListener.onFailure(new FailedToCommitClusterStateException("mock exception"))); - masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask( - ackedRequest(TimeValue.ZERO, null), null) { + masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(ackedRequest(TimeValue.ZERO, null), null) { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).build(); @@ -932,7 +927,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } @Override - protected Void newResponse(boolean acknowledged) { + protected AcknowledgedResponse newResponse(boolean acknowledged) { fail(); return null; } @@ -965,8 +960,7 @@ public void onAckTimeout() { ackListener.onNodeAck(node3, null); }); - masterService.submitStateUpdateTask( - "test2", new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, null), null) { + masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, null), null) { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).build(); @@ -978,7 +972,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } @Override - protected Void newResponse(boolean acknowledged) { + protected AcknowledgedResponse newResponse(boolean acknowledged) { fail(); return null; } diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportDeleteAutoscalingPolicyAction.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportDeleteAutoscalingPolicyAction.java index a09286b53cd38..1505c55ddf701 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportDeleteAutoscalingPolicyAction.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportDeleteAutoscalingPolicyAction.java @@ -60,22 +60,12 @@ protected void masterOperation( final ClusterState state, final ActionListener listener ) { - clusterService.submitStateUpdateTask( - "delete-autoscaling-policy", - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(final boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - - @Override - public ClusterState execute(final ClusterState currentState) { - return deleteAutoscalingPolicy(currentState, request.name(), logger); - } - + clusterService.submitStateUpdateTask("delete-autoscaling-policy", new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(final ClusterState currentState) { + return deleteAutoscalingPolicy(currentState, request.name(), logger); } - ); + }); } @Override diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportPutAutoscalingPolicyAction.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportPutAutoscalingPolicyAction.java index ebdd3679a068a..4287ce39f9e16 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportPutAutoscalingPolicyAction.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportPutAutoscalingPolicyAction.java @@ -60,22 +60,12 @@ protected void masterOperation( final ClusterState state, ActionListener listener ) { - clusterService.submitStateUpdateTask( - "put-autoscaling-policy", - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(final boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - - @Override - public ClusterState execute(final ClusterState currentState) { - return putAutoscalingPolicy(currentState, request, logger); - } - + clusterService.submitStateUpdateTask("put-autoscaling-policy", new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(final ClusterState currentState) { + return putAutoscalingPolicy(currentState, request, logger); } - ); + }); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java index dd1cf1c92ba09..b73815c6d075d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java @@ -47,15 +47,9 @@ protected void masterOperation(ActivateAutoFollowPatternAction.Request request, ClusterState state, ActionListener listener) throws Exception { clusterService.submitStateUpdateTask("activate-auto-follow-pattern-" + request.getName(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(final boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - + new AckedClusterStateUpdateTask(request, listener) { @Override - public ClusterState execute(final ClusterState currentState) throws Exception { + public ClusterState execute(final ClusterState currentState) { return innerActivate(request, currentState); } }); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java index 43a497e073dd2..7e77acc60ae1c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternAction.java @@ -41,20 +41,14 @@ public TransportDeleteAutoFollowPatternAction(TransportService transportService, @Override protected void masterOperation(DeleteAutoFollowPatternAction.Request request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getName(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return innerDelete(request, currentState); - } - }); + new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return innerDelete(request, currentState); + } + }); } static ClusterState innerDelete(DeleteAutoFollowPatternAction.Request request, ClusterState currentState) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index df8734575b47b..2819b6ee43583 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -63,7 +63,7 @@ public TransportPutAutoFollowPatternAction( @Override protected void masterOperation(PutAutoFollowPatternAction.Request request, ClusterState state, - ActionListener listener) throws Exception { + ActionListener listener) { if (ccrLicenseChecker.isCcrAllowed() == false) { listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; @@ -89,15 +89,9 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request, ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> { if (e == null) { clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getRemoteCluster(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - + new AckedClusterStateUpdateTask(request, listener) { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { return innerPut(request, filteredHeaders, currentState, remoteClusterState.getState()); } }); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java index 080418978dd75..ea7294c179b14 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java @@ -262,7 +262,7 @@ && isProductionMode(settings, clusterService.localNode())) { } clusterService.submitStateUpdateTask("register license [" + newLicense.uid() + "]", new - AckedClusterStateUpdateTask(request, listener) { + AckedClusterStateUpdateTask(request, listener) { @Override protected PutLicenseResponse newResponse(boolean acknowledged) { return new PutLicenseResponse(acknowledged, LicensesStatus.VALID); diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java index 25b75eb9adb3f..de4351aeaeacc 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java @@ -16,10 +16,10 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -130,15 +130,15 @@ public void onFailure(final Exception t) { private void toggleFrozenSettings(final Index[] concreteIndices, final FreezeRequest request, final ActionListener listener) { clusterService.submitStateUpdateTask("toggle-frozen-settings", - new AckedClusterStateUpdateTask(Priority.URGENT, request, new ActionListener() { + new AckedClusterStateUpdateTask(Priority.URGENT, request, new ActionListener() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest() .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .indices(concreteIndices).waitForActiveShards(request.waitForActiveShards()); - indexStateService.openIndex(updateRequest, new ActionListener() { + indexStateService.openIndex(updateRequest, new ActionListener() { @Override - public void onResponse(OpenIndexClusterStateUpdateResponse openIndexClusterStateUpdateResponse) { + public void onResponse(ShardsAcknowledgedResponse openIndexClusterStateUpdateResponse) { listener.onResponse(new FreezeResponse(openIndexClusterStateUpdateResponse.isAcknowledged(), openIndexClusterStateUpdateResponse.isShardsAcknowledged())); } @@ -183,11 +183,6 @@ public ClusterState execute(ClusterState currentState) { } return ClusterState.builder(currentState).blocks(blocks).metadata(builder).build(); } - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } }); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java index ea6ef670b04a8..74b10a8b0253d 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java @@ -49,7 +49,7 @@ public TransportDeleteLifecycleAction(TransportService transportService, Cluster @Override protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { clusterService.submitStateUpdateTask("delete-lifecycle-" + request.getPolicyName(), - new AckedClusterStateUpdateTask(request, listener) { + new AckedClusterStateUpdateTask(request, listener) { @Override protected Response newResponse(boolean acknowledged) { return new Response(acknowledged); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportMoveToStepAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportMoveToStepAction.java index 2755b51f8049e..148c1813f9b61 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportMoveToStepAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportMoveToStepAction.java @@ -47,7 +47,7 @@ protected void masterOperation(Request request, ClusterState state, ActionListen return; } clusterService.submitStateUpdateTask("index[" + request.getIndex() + "]-move-to-step", - new AckedClusterStateUpdateTask(request, listener) { + new AckedClusterStateUpdateTask(request, listener) { @Override public ClusterState execute(ClusterState currentState) { return indexLifecycleService.moveClusterStateToStep(currentState, indexMetadata.getIndex(), request.getCurrentStepKey(), diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java index 7fbd11ff717ef..f64f468b8c9da 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java @@ -86,7 +86,7 @@ protected void masterOperation(Request request, ClusterState state, ActionListen .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); LifecyclePolicy.validatePolicyName(request.getPolicy().getName()); clusterService.submitStateUpdateTask("put-lifecycle-" + request.getPolicy().getName(), - new AckedClusterStateUpdateTask(request, listener) { + new AckedClusterStateUpdateTask(request, listener) { @Override protected Response newResponse(boolean acknowledged) { return new Response(acknowledged); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRemoveIndexLifecyclePolicyAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRemoveIndexLifecyclePolicyAction.java index 280242326f144..d54db0846a73a 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRemoveIndexLifecyclePolicyAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRemoveIndexLifecyclePolicyAction.java @@ -9,8 +9,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -46,7 +46,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception { final Index[] indices = indexNameExpressionResolver.concreteIndices(state, request.indicesOptions(), true, request.indices()); clusterService.submitStateUpdateTask("remove-lifecycle-for-index", - new AckedClusterStateUpdateTask(request, listener) { + new ClusterStateUpdateTask(request.masterNodeTimeout()) { private final List failedIndexes = new ArrayList<>(); @@ -61,8 +61,8 @@ public void onFailure(String source, Exception e) { } @Override - protected Response newResponse(boolean acknowledged) { - return new Response(failedIndexes); + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new Response(failedIndexes)); } }); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java index 5cfb872cf73e9..2bb24dc3e2c52 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportRetryAction.java @@ -46,7 +46,7 @@ public TransportRetryAction(TransportService transportService, ClusterService cl @Override protected void masterOperation(Request request, ClusterState state, ActionListener listener) { clusterService.submitStateUpdateTask("ilm-re-run", - new AckedClusterStateUpdateTask(request, listener) { + new AckedClusterStateUpdateTask(request, listener) { @Override public ClusterState execute(ClusterState currentState) { return indexLifecycleService.moveClusterStateToPreviouslyFailedStep(currentState, request.indices()); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java index a5ab74d95ba3a..bc993897b58da 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStartILMAction.java @@ -35,18 +35,12 @@ public TransportStartILMAction(TransportService transportService, ClusterService @Override protected void masterOperation(StartILMRequest request, ClusterState state, ActionListener listener) { - clusterService.submitStateUpdateTask("ilm_operation_mode_update", - new AckedClusterStateUpdateTask(request, listener) { - @Override - public ClusterState execute(ClusterState currentState) { - return (OperationModeUpdateTask.ilmMode(OperationMode.RUNNING)).execute(currentState); - } - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - }); + clusterService.submitStateUpdateTask("ilm_operation_mode_update", new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return (OperationModeUpdateTask.ilmMode(OperationMode.RUNNING)).execute(currentState); + } + }); } @Override diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java index 5de02a0776b6e..02a2cefeb7d99 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java @@ -37,16 +37,10 @@ public TransportStopILMAction(TransportService transportService, ClusterService @Override protected void masterOperation(StopILMRequest request, ClusterState state, ActionListener listener) { clusterService.submitStateUpdateTask("ilm_operation_mode_update", - new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) { - + new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) { @Override public ClusterState execute(ClusterState currentState) { - return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState); - } - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); + return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState); } }); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportDeleteSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportDeleteSnapshotLifecycleAction.java index a9318d28db14a..2c4e59f621400 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportDeleteSnapshotLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportDeleteSnapshotLifecycleAction.java @@ -43,7 +43,7 @@ protected void masterOperation(DeleteSnapshotLifecycleAction.Request request, ClusterState state, ActionListener listener) throws Exception { clusterService.submitStateUpdateTask("delete-snapshot-lifecycle-" + request.getLifecycleId(), - new AckedClusterStateUpdateTask(request, listener) { + new AckedClusterStateUpdateTask(request, listener) { @Override protected DeleteSnapshotLifecycleAction.Response newResponse(boolean acknowledged) { return new DeleteSnapshotLifecycleAction.Response(acknowledged); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportPutSnapshotLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportPutSnapshotLifecycleAction.java index 6d4e6d9be45cb..8fd7053f93656 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportPutSnapshotLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportPutSnapshotLifecycleAction.java @@ -66,7 +66,7 @@ protected void masterOperation(final PutSnapshotLifecycleAction.Request request, .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); LifecyclePolicy.validatePolicyName(request.getLifecycleId()); clusterService.submitStateUpdateTask("put-snapshot-lifecycle-" + request.getLifecycleId(), - new AckedClusterStateUpdateTask(request, listener) { + new AckedClusterStateUpdateTask(request, listener) { @Override public ClusterState execute(ClusterState currentState) { SnapshotLifecycleMetadata snapMeta = currentState.metadata().custom(SnapshotLifecycleMetadata.TYPE); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java index 2982ce1214de0..15ff780a2ca70 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStartSLMAction.java @@ -35,18 +35,12 @@ public TransportStartSLMAction(TransportService transportService, ClusterService @Override protected void masterOperation(StartSLMAction.Request request, ClusterState state, ActionListener listener) { - clusterService.submitStateUpdateTask("slm_operation_mode_update", - new AckedClusterStateUpdateTask(request, listener) { - @Override - public ClusterState execute(ClusterState currentState) { - return (OperationModeUpdateTask.slmMode(OperationMode.RUNNING)).execute(currentState); - } - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - }); + clusterService.submitStateUpdateTask("slm_operation_mode_update", new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return (OperationModeUpdateTask.slmMode(OperationMode.RUNNING)).execute(currentState); + } + }); } @Override diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java index d70bda347ea4e..26ec5a4a2aec8 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportStopSLMAction.java @@ -35,18 +35,12 @@ public TransportStopSLMAction(TransportService transportService, ClusterService @Override protected void masterOperation(StopSLMAction.Request request, ClusterState state, ActionListener listener) { - clusterService.submitStateUpdateTask("slm_operation_mode_update", - new AckedClusterStateUpdateTask(request, listener) { - @Override - public ClusterState execute(ClusterState currentState) { - return (OperationModeUpdateTask.slmMode(OperationMode.STOPPING)).execute(currentState); - } - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } - }); + clusterService.submitStateUpdateTask("slm_operation_mode_update", new AckedClusterStateUpdateTask(request, listener) { + @Override + public ClusterState execute(ClusterState currentState) { + return (OperationModeUpdateTask.slmMode(OperationMode.STOPPING)).execute(currentState); + } + }); } @Override diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportStopILMActionTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportStopILMActionTests.java index 34d73b7cdc251..02cf37ef0ff88 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportStopILMActionTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportStopILMActionTests.java @@ -40,7 +40,6 @@ public void onFailure(Exception e) { } }; - @SuppressWarnings("unchecked") public void testStopILMClusterStatePriorityIsImmediate() { ClusterService clusterService = mock(ClusterService.class); @@ -51,7 +50,7 @@ public void testStopILMClusterStatePriorityIsImmediate() { verify(clusterService).submitStateUpdateTask( eq("ilm_operation_mode_update"), - argThat(new ArgumentMatcher>() { + argThat(new ArgumentMatcher() { Priority actualPriority = null; @@ -60,7 +59,7 @@ public boolean matches(Object argument) { if (argument instanceof AckedClusterStateUpdateTask == false) { return false; } - actualPriority = ((AckedClusterStateUpdateTask) argument).priority(); + actualPriority = ((AckedClusterStateUpdateTask) argument).priority(); return actualPriority == Priority.IMMEDIATE; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index 149cf79aa22b4..dd41da6be5139 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -207,7 +207,7 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat //<1> Change MlMetadata to indicate that upgrade_mode is now enabled clusterService.submitStateUpdateTask("ml-set-upgrade-mode", - new AckedClusterStateUpdateTask(request, clusterStateUpdateListener) { + new AckedClusterStateUpdateTask(request, clusterStateUpdateListener) { @Override protected AcknowledgedResponse newResponse(boolean acknowledged) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 4c9e0cb9c1c31..2f1705595e688 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -254,8 +254,7 @@ public void testPutJob_AddsCreateTime() throws IOException { PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob()); doAnswer(invocation -> { - AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocation.getArguments()[1]; - task.onAllNodesAcked(null); + ((AckedClusterStateUpdateTask) invocation.getArguments()[1]).onAllNodesAcked(null); return null; }).when(clusterService).submitStateUpdateTask(Matchers.eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/TokenAuthIntegTests.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/TokenAuthIntegTests.java index d9fe11f8a7204..b2a036d494f51 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/TokenAuthIntegTests.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/authc/TokenAuthIntegTests.java @@ -15,11 +15,11 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -136,7 +136,7 @@ public void testTokenServiceCanRotateKeys() throws Exception { assertEquals(activeKeyHash, tokenService.getActiveKeyHash()); } client().admin().cluster().prepareHealth().execute().get(); - PlainActionFuture rotateActionFuture = new PlainActionFuture<>(); + PlainActionFuture rotateActionFuture = new PlainActionFuture<>(); logger.info("rotate on master: {}", masterName); masterTokenService.rotateKeysOnMaster(rotateActionFuture); assertTrue(rotateActionFuture.actionGet().isAcknowledged()); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java index 67b8b36d82c7e..599a987417332 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; @@ -42,7 +43,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ack.AckedRequest; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; @@ -2127,7 +2127,7 @@ synchronized String getActiveKeyHash() { return new BytesRef(Base64.getUrlEncoder().withoutPadding().encode(this.keyCache.currentTokenKeyHash.bytes)).utf8ToString(); } - void rotateKeysOnMaster(ActionListener listener) { + void rotateKeysOnMaster(ActionListener listener) { logger.info("rotate keys on master"); TokenMetadata tokenMetadata = generateSpareKey(); clusterService.submitStateUpdateTask("publish next key to prepare key rotation", @@ -2143,11 +2143,11 @@ void rotateKeysOnMaster(ActionListener listener) { }, listener::onFailure))); } - private final class TokenMetadataPublishAction extends AckedClusterStateUpdateTask { + private static final class TokenMetadataPublishAction extends AckedClusterStateUpdateTask { private final TokenMetadata tokenMetadata; - protected TokenMetadataPublishAction(TokenMetadata tokenMetadata, ActionListener listener) { + protected TokenMetadataPublishAction(TokenMetadata tokenMetadata, ActionListener listener) { super(new AckedRequest() { @Override public TimeValue ackTimeout() { @@ -2171,12 +2171,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { } return ClusterState.builder(currentState).putCustom(TokenMetadata.TYPE, tokenMetadata).build(); } - - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); - } - } private void initialize(ClusterService clusterService) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java index 52f0d4bc35ee6..09d6721fea15f 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java @@ -70,39 +70,32 @@ protected void masterOperation(WatcherServiceRequest request, ClusterState state private void setWatcherMetadataAndWait(boolean manuallyStopped, final ActionListener listener) { String source = manuallyStopped ? "update_watcher_manually_stopped" : "update_watcher_manually_started"; - clusterService.submitStateUpdateTask(source, - new AckedClusterStateUpdateTask(ackedRequest, listener) { + clusterService.submitStateUpdateTask(source, new AckedClusterStateUpdateTask(ackedRequest, listener) { + @Override + public ClusterState execute(ClusterState clusterState) { + XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return AcknowledgedResponse.of(acknowledged); - } + WatcherMetadata newWatcherMetadata = new WatcherMetadata(manuallyStopped); + WatcherMetadata currentMetadata = clusterState.metadata().custom(WatcherMetadata.TYPE); - @Override - public ClusterState execute(ClusterState clusterState) { - XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); + // adhere to the contract of returning the original state if nothing has changed + if (newWatcherMetadata.equals(currentMetadata)) { + return clusterState; + } else { + ClusterState.Builder builder = new ClusterState.Builder(clusterState); + builder.metadata(Metadata.builder(clusterState.getMetadata()) + .putCustom(WatcherMetadata.TYPE, newWatcherMetadata)); + return builder.build(); + } + } - WatcherMetadata newWatcherMetadata = new WatcherMetadata(manuallyStopped); - WatcherMetadata currentMetadata = clusterState.metadata().custom(WatcherMetadata.TYPE); - - // adhere to the contract of returning the original state if nothing has changed - if (newWatcherMetadata.equals(currentMetadata)) { - return clusterState; - } else { - ClusterState.Builder builder = new ClusterState.Builder(clusterState); - builder.metadata(Metadata.builder(clusterState.getMetadata()) - .putCustom(WatcherMetadata.TYPE, newWatcherMetadata)); - return builder.build(); - } - } - - @Override - public void onFailure(String source, Exception e) { - logger.error(new ParameterizedMessage("could not update watcher stopped status to [{}], source [{}]", - manuallyStopped, source), e); - listener.onFailure(e); - } - }); + @Override + public void onFailure(String source, Exception e) { + logger.error(new ParameterizedMessage("could not update watcher stopped status to [{}], source [{}]", + manuallyStopped, source), e); + listener.onFailure(e); + } + }); } @Override