From c6ee6d474efefe7bbf42d1a4d72ff0eed9c90589 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 15 Feb 2021 13:22:09 +0100 Subject: [PATCH] Remove node mapping refresh action (#68909) The mapping refresh action dates back to when every local node processed mappings itself: a node would ask the master to refresh its mappings and re-send them to all nodes. This is no longer needed, and it could actually hide unexpected situations that we would like to know about and handle specifically. This commit removes the node mapping refresh action, which no node is expected to call anymore, and replaces it with an assertion that verifies we never end up in a situation where a refresh would be needed. Also, the mappings received by a local node are now applied as they are, without any further merging, as they were already merged by the master node. --- .../elasticsearch/cluster/ClusterModule.java | 2 - .../index/NodeMappingRefreshAction.java | 115 --------------- .../metadata/MetadataMappingService.java | 138 ------------------ .../org/elasticsearch/index/IndexService.java | 8 +- .../index/mapper/MapperService.java | 94 ++++++------ .../cluster/IndicesClusterStateService.java | 28 +--- ...actIndicesClusterStateServiceTestCase.java | 5 +- ...ClusterStateServiceRandomUpdatesTests.java | 1 - .../snapshots/SnapshotResiliencyTests.java | 4 +- .../action/TransportResumeFollowAction.java | 1 - .../TransportResumeFollowActionTests.java | 12 +- 11 files changed, 64 insertions(+), 344 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 54fb1d20a80a0..5a36413be9d2f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -9,7 +9,6 @@ package org.elasticsearch.cluster; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; -import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.ComponentTemplateMetadata; import org.elasticsearch.cluster.metadata.ComposableIndexTemplateMetadata; @@ -269,7 +268,6 @@ protected void configure() { bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver); bind(DelayedAllocationService.class).asEagerSingleton(); bind(ShardStateAction.class).asEagerSingleton(); - bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); bind(TaskResultsService.class).asEagerSingleton(); bind(AllocationDeciders.class).toInstance(allocationDeciders); diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java deleted file mode 100644 index 22d8278fce208..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/NodeMappingRefreshAction.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements.
Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.cluster.action.index; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.MetadataMappingService; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportService; - -import java.io.IOException; - -public class NodeMappingRefreshAction { - - private static final Logger logger = LogManager.getLogger(NodeMappingRefreshAction.class); - - public static final String ACTION_NAME = "internal:cluster/node/mapping/refresh"; - - private final TransportService transportService; - private final MetadataMappingService metadataMappingService; - - @Inject - public NodeMappingRefreshAction(TransportService transportService, MetadataMappingService metadataMappingService) { - this.transportService = transportService; - this.metadataMappingService = metadataMappingService; - transportService.registerRequestHandler(ACTION_NAME, - ThreadPool.Names.SAME, NodeMappingRefreshRequest::new, new NodeMappingRefreshTransportHandler()); - } - - public void nodeMappingRefresh(final DiscoveryNode masterNode, final NodeMappingRefreshRequest request) { - if (masterNode == null) { - logger.warn("can't send mapping refresh for [{}], no master known.", request.index()); - return; - } - transportService.sendRequest(masterNode, ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); - } - - private class NodeMappingRefreshTransportHandler implements TransportRequestHandler { - - @Override - public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel, Task task) throws Exception { - metadataMappingService.refreshMapping(request.index(), request.indexUUID()); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } - - public static class NodeMappingRefreshRequest extends TransportRequest implements IndicesRequest { - - private String index; - private String indexUUID = IndexMetadata.INDEX_UUID_NA_VALUE; - private String nodeId; - - public NodeMappingRefreshRequest(StreamInput in) throws IOException { - super(in); - index = in.readString(); - nodeId = in.readString(); - indexUUID = in.readString(); - } - - public NodeMappingRefreshRequest(String index, String indexUUID, String nodeId) { - this.index = index; - this.indexUUID = indexUUID; - this.nodeId = nodeId; - } - - @Override - public String[] indices() { - return new String[]{index}; - } - - @Override - public IndicesOptions indicesOptions() { - return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); - } - - public String index() { - return index; - } - - 
public String indexUUID() { - return indexUUID; - } - - public String nodeId() { - return nodeId; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(index); - out.writeString(nodeId); - out.writeString(indexUUID); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java index 0540290f3083c..027d69d55818f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMappingService.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -28,7 +27,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService.MergeReason; @@ -36,13 +34,10 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED; - /** * Service responsible for submitting mapping changes */ @@ -53,7 +48,6 @@ public class MetadataMappingService { private final ClusterService clusterService; private final IndicesService indicesService; - final RefreshTaskExecutor refreshExecutor = new RefreshTaskExecutor(); final PutMappingExecutor putMappingExecutor = new PutMappingExecutor(); @@ -63,138 +57,6 @@ public MetadataMappingService(ClusterService clusterService, IndicesService indi this.indicesService = indicesService; } - static class RefreshTask { - final String index; - final String indexUUID; - - RefreshTask(String index, final String indexUUID) { - this.index = index; - this.indexUUID = indexUUID; - } - - @Override - public String toString() { - return "[" + index + "][" + indexUUID + "]"; - } - } - - class RefreshTaskExecutor implements ClusterStateTaskExecutor { - @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - ClusterState newClusterState = executeRefresh(currentState, tasks); - return ClusterTasksResult.builder().successes(tasks).build(newClusterState); - } - } - - /** - * Batch method to apply all the queued refresh operations. The idea is to try and batch as much - * as possible so we won't create the same index all the time for example for the updates on the same mapping - * and generate a single cluster change event out of all of those. 
- */ - ClusterState executeRefresh(final ClusterState currentState, final List allTasks) throws Exception { - // break down to tasks per index, so we can optimize the on demand index service creation - // to only happen for the duration of a single index processing of its respective events - Map> tasksPerIndex = new HashMap<>(); - for (RefreshTask task : allTasks) { - if (task.index == null) { - logger.debug("ignoring a mapping task of type [{}] with a null index.", task); - } - tasksPerIndex.computeIfAbsent(task.index, k -> new ArrayList<>()).add(task); - } - - boolean dirty = false; - Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); - - for (Map.Entry> entry : tasksPerIndex.entrySet()) { - IndexMetadata indexMetadata = mdBuilder.get(entry.getKey()); - if (indexMetadata == null) { - // index got deleted on us, ignore... - logger.debug("[{}] ignoring tasks - index meta data doesn't exist", entry.getKey()); - continue; - } - final Index index = indexMetadata.getIndex(); - // the tasks lists to iterate over, filled with the list of mapping tasks, trying to keep - // the latest (based on order) update mapping one per node - List allIndexTasks = entry.getValue(); - boolean hasTaskWithRightUUID = false; - for (RefreshTask task : allIndexTasks) { - if (indexMetadata.isSameUUID(task.indexUUID)) { - hasTaskWithRightUUID = true; - } else { - logger.debug("{} ignoring task [{}] - index meta data doesn't match task uuid", index, task); - } - } - if (hasTaskWithRightUUID == false) { - continue; - } - - // construct the actual index if needed, and make sure the relevant mappings are there - boolean removeIndex = false; - IndexService indexService = indicesService.indexService(indexMetadata.getIndex()); - if (indexService == null) { - // we need to create the index here, and add the current mapping to it, so we can merge - indexService = indicesService.createIndex(indexMetadata, Collections.emptyList(), false); - removeIndex = true; - indexService.mapperService().merge(indexMetadata, MergeReason.MAPPING_RECOVERY); - } - - IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata); - try { - boolean indexDirty = refreshIndexMapping(indexService, builder); - if (indexDirty) { - mdBuilder.put(builder); - dirty = true; - } - } finally { - if (removeIndex) { - indicesService.removeIndex(index, NO_LONGER_ASSIGNED, "created for mapping processing"); - } - } - } - - if (dirty == false) { - return currentState; - } - return ClusterState.builder(currentState).metadata(mdBuilder).build(); - } - - private boolean refreshIndexMapping(IndexService indexService, IndexMetadata.Builder builder) { - boolean dirty = false; - String index = indexService.index().getName(); - try { - MapperService mapperService = indexService.mapperService(); - DocumentMapper mapper = mapperService.documentMapper(); - if (mapper != null) { - if (mapper.mappingSource().equals(builder.mapping().source()) == false) { - dirty = true; - } - } - - // if the mapping is not up-to-date, re-send everything - if (dirty) { - logger.warn("[{}] re-syncing mappings with cluster state]", index); - builder.putMapping(new MappingMetadata(mapper)); - - } - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to refresh-mapping in cluster state", index), e); - } - return dirty; - } - - /** - * Refreshes mappings if they are not the same between original and parsed version - */ - public void refreshMapping(final String index, final String indexUUID) { - final RefreshTask refreshTask = new 
RefreshTask(index, indexUUID); - clusterService.submitStateUpdateTask("refresh-mapping [" + index + "]", - refreshTask, - ClusterStateTaskConfig.build(Priority.HIGH), - refreshExecutor, - (source, e) -> logger.warn(() -> new ParameterizedMessage("failure during [{}]", source), e) - ); - } - class PutMappingExecutor implements ClusterStateTaskExecutor { @Override public ClusterTasksResult diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 185811d5b5965..15e0ba355c350 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -646,12 +646,10 @@ List getSearchOperationListener() { // pkg private for return searchOperationListeners; } - @Override - public boolean updateMapping(final IndexMetadata currentIndexMetadata, final IndexMetadata newIndexMetadata) throws IOException { - if (mapperService == null) { - return false; + public void updateMapping(final IndexMetadata currentIndexMetadata, final IndexMetadata newIndexMetadata) throws IOException { + if (mapperService != null) { + mapperService.updateMapping(currentIndexMetadata, newIndexMetadata); } - return mapperService.updateMapping(currentIndexMetadata, newIndexMetadata); } private class StoreCloseListener implements Store.OnClose { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java index 7d607174f49d0..98bfecdd7c8f1 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -8,7 +8,6 @@ package org.elasticsearch.index.mapper; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Assertions; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -21,6 +20,7 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; @@ -169,54 +169,54 @@ public static Map parseMapping(NamedXContentRegistry xContentReg /** * Update mapping by only merging the metadata that is different between received and stored entries */ - public boolean updateMapping(final IndexMetadata currentIndexMetadata, final IndexMetadata newIndexMetadata) throws IOException { + public void updateMapping(final IndexMetadata currentIndexMetadata, final IndexMetadata newIndexMetadata) throws IOException { assert newIndexMetadata.getIndex().equals(index()) : "index mismatch: expected " + index() + " but was " + newIndexMetadata.getIndex(); if (currentIndexMetadata != null && currentIndexMetadata.getMappingVersion() == newIndexMetadata.getMappingVersion()) { assertMappingVersion(currentIndexMetadata, newIndexMetadata, this.mapper); - return false; + return; } - final DocumentMapper updatedMapper; - try { - updatedMapper = merge(newIndexMetadata, MergeReason.MAPPING_RECOVERY); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to apply mappings", index()), e); - throw e; - } - - if (updatedMapper == null) { - return false; - } - - boolean 
requireRefresh = false; - - assertMappingVersion(currentIndexMetadata, newIndexMetadata, updatedMapper); - - MappingMetadata mappingMetadata = newIndexMetadata.mapping(); - CompressedXContent incomingMappingSource = mappingMetadata.source(); - - String op = mapper != null ? "updated" : "added"; - if (logger.isDebugEnabled() && incomingMappingSource.compressed().length < 512) { - logger.debug("[{}] {} mapping, source [{}]", index(), op, incomingMappingSource.string()); - } else if (logger.isTraceEnabled()) { - logger.trace("[{}] {} mapping, source [{}]", index(), op, incomingMappingSource.string()); - } else { - logger.debug("[{}] {} mapping (source suppressed due to length, use TRACE level if needed)", - index(), op); + MappingMetadata newMappingMetadata = newIndexMetadata.mapping(); + if (newMappingMetadata != null) { + String type = newMappingMetadata.type(); + CompressedXContent incomingMappingSource = newMappingMetadata.source(); + Mapping incomingMapping = parseMappings(type, incomingMappingSource); + DocumentMapper previousMapper; + synchronized (this) { + previousMapper = this.mapper; + assert assertRefreshIsNotNeeded(previousMapper, type, incomingMappingSource, incomingMapping); + this.mapper = newDocumentMapper(incomingMapping, MergeReason.MAPPING_RECOVERY); + } + String op = previousMapper != null ? "updated" : "added"; + if (logger.isDebugEnabled() && incomingMappingSource.compressed().length < 512) { + logger.debug("[{}] {} mapping, source [{}]", index(), op, incomingMappingSource.string()); + } else if (logger.isTraceEnabled()) { + logger.trace("[{}] {} mapping, source [{}]", index(), op, incomingMappingSource.string()); + } else { + logger.debug("[{}] {} mapping (source suppressed due to length, use TRACE level if needed)", + index(), op); + } } + } - // refresh mapping can happen when the parsing/merging of the mapping from the metadata doesn't result in the same - // mapping, in this case, we send to the master to refresh its own version of the mappings (to conform with the - // merge version of it, which it does when refreshing the mappings), and warn log it. - if (documentMapper().mappingSource().equals(incomingMappingSource) == false) { - logger.debug("[{}] parsed mapping, and got different sources\noriginal:\n{}\nparsed:\n{}", - index(), incomingMappingSource, documentMapper().mappingSource()); - - requireRefresh = true; + private boolean assertRefreshIsNotNeeded(DocumentMapper currentMapper, + String type, + CompressedXContent incomingMappingSource, + Mapping incomingMapping) { + Mapping mergedMapping = mergeMappings(currentMapper, incomingMapping, MergeReason.MAPPING_RECOVERY); + CompressedXContent mergedMappingSource; + try { + mergedMappingSource = new CompressedXContent(mergedMapping, XContentType.JSON, ToXContent.EMPTY_PARAMS); + } catch (Exception e) { + throw new AssertionError("failed to serialize source for type [" + type + "]", e); } - return requireRefresh; + // we used to ask the master to refresh its mappings whenever the result of merging the incoming mappings with the + // current mappings differs from the incoming mappings. We now rather assert that this situation never happens. 
+ assert mergedMappingSource.equals(incomingMappingSource) : "[" + index() + "] parsed mapping, and got different sources\n" + + "incoming:\n" + incomingMappingSource + "\nmerged:\n" + mergedMappingSource; + return true; } private void assertMappingVersion( @@ -264,13 +264,12 @@ public void merge(String type, Map mappings, MergeReason reason) mergeAndApplyMappings(type, content, reason); } - public DocumentMapper merge(IndexMetadata indexMetadata, MergeReason reason) { + public void merge(IndexMetadata indexMetadata, MergeReason reason) { assert reason != MergeReason.MAPPING_UPDATE_PREFLIGHT; MappingMetadata mappingMetadata = indexMetadata.mapping(); if (mappingMetadata != null) { - return mergeAndApplyMappings(mappingMetadata.type(), mappingMetadata.source(), reason); + mergeAndApplyMappings(mappingMetadata.type(), mappingMetadata.source(), reason); } - return null; } public DocumentMapper merge(String type, CompressedXContent mappingSource, MergeReason reason) { @@ -278,7 +277,8 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge } private synchronized DocumentMapper mergeAndApplyMappings(String mappingType, CompressedXContent mappingSource, MergeReason reason) { - Mapping mapping = mergeMappings(mappingType, mappingSource, reason); + Mapping incomingMapping = parseMappings(mappingType, mappingSource); + Mapping mapping = mergeMappings(this.mapper, incomingMapping, reason); DocumentMapper newMapper = newDocumentMapper(mapping, reason); if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) { return newMapper; @@ -295,16 +295,16 @@ private DocumentMapper newDocumentMapper(Mapping mapping, MergeReason reason) { return newMapper; } - private Mapping mergeMappings(String mappingType, CompressedXContent mappingSource, MergeReason reason) { - Mapping incomingMapping; + private Mapping parseMappings(String mappingType, CompressedXContent mappingSource) { try { - incomingMapping = mappingParser.parse(mappingType, mappingSource); + return mappingParser.parse(mappingType, mappingSource); } catch (Exception e) { throw new MapperParsingException("Failed to parse mapping: {}", e, e.getMessage()); } + } + private static Mapping mergeMappings(DocumentMapper currentMapper, Mapping incomingMapping, MergeReason reason) { Mapping newMapping; - DocumentMapper currentMapper = this.mapper; if (currentMapper == null) { newMapping = incomingMapping; } else { diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9599392386a7d..25ff8bfc9d2d0 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -19,7 +19,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -52,10 +51,10 @@ import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.IndexShardRelocatedException; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.ShardLongFieldRange; import 
org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.PeerRecoverySourceService; @@ -95,7 +94,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final ThreadPool threadPool; private final PeerRecoveryTargetService recoveryTargetService; private final ShardStateAction shardStateAction; - private final NodeMappingRefreshAction nodeMappingRefreshAction; private static final ActionListener SHARD_STATE_ACTION_LISTENER = ActionListener.wrap(() -> {}); @@ -107,7 +105,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final FailedShardHandler failedShardHandler = new FailedShardHandler(); - private final boolean sendRefreshMapping; private final List buildInIndexListener; private final PrimaryReplicaSyncer primaryReplicaSyncer; private final RetentionLeaseSyncer retentionLeaseSyncer; @@ -121,7 +118,6 @@ public IndicesClusterStateService( final ThreadPool threadPool, final PeerRecoveryTargetService recoveryTargetService, final ShardStateAction shardStateAction, - final NodeMappingRefreshAction nodeMappingRefreshAction, final RepositoriesService repositoriesService, final SearchService searchService, final PeerRecoverySourceService peerRecoverySourceService, @@ -136,7 +132,6 @@ public IndicesClusterStateService( threadPool, recoveryTargetService, shardStateAction, - nodeMappingRefreshAction, repositoriesService, searchService, peerRecoverySourceService, @@ -154,7 +149,6 @@ public IndicesClusterStateService( final ThreadPool threadPool, final PeerRecoveryTargetService recoveryTargetService, final ShardStateAction shardStateAction, - final NodeMappingRefreshAction nodeMappingRefreshAction, final RepositoriesService repositoriesService, final SearchService searchService, final PeerRecoverySourceService peerRecoverySourceService, @@ -169,11 +163,9 @@ public IndicesClusterStateService( this.threadPool = threadPool; this.recoveryTargetService = recoveryTargetService; this.shardStateAction = shardStateAction; - this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.repositoriesService = repositoriesService; this.primaryReplicaSyncer = primaryReplicaSyncer; this.retentionLeaseSyncer = retentionLeaseSyncer; - this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); this.client = client; } @@ -488,12 +480,7 @@ private void createIndices(final ClusterState state) { AllocatedIndex indexService = null; try { indexService = indicesService.createIndex(indexMetadata, buildInIndexListener, true); - if (indexService.updateMapping(null, indexMetadata) && sendRefreshMapping) { - nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(), - new NodeMappingRefreshAction.NodeMappingRefreshRequest(indexMetadata.getIndex().getName(), - indexMetadata.getIndexUUID(), state.nodes().getLocalNodeId()) - ); - } + indexService.updateMapping(null, indexMetadata); } catch (Exception e) { final String failShardReason; if (indexService == null) { @@ -531,12 +518,7 @@ private void updateIndices(ClusterChangedEvent event) { } reason = "mapping update failed"; - if (indexService.updateMapping(currentIndexMetadata, newIndexMetadata) && sendRefreshMapping) { - 
nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(), - new NodeMappingRefreshAction.NodeMappingRefreshRequest(newIndexMetadata.getIndex().getName(), - newIndexMetadata.getIndexUUID(), state.nodes().getLocalNodeId()) - ); - } + indexService.updateMapping(currentIndexMetadata, newIndexMetadata); } catch (Exception e) { indicesService.removeIndex(indexService.index(), FAILURE, "removing index (" + reason + ")"); @@ -845,9 +827,9 @@ public interface AllocatedIndex extends Iterable, IndexCompo void updateMetadata(IndexMetadata currentIndexMetadata, IndexMetadata newIndexMetadata); /** - * Checks if index requires refresh from master. + * Updates the mappings by applying the incoming ones */ - boolean updateMapping(IndexMetadata currentIndexMetadata, IndexMetadata newIndexMetadata) throws IOException; + void updateMapping(IndexMetadata currentIndexMetadata, IndexMetadata newIndexMetadata) throws IOException; /** * Returns shard with given id. diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 106323a97771b..c220807899eb7 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -26,9 +26,9 @@ import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; -import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardLongFieldRange; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndex; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices; @@ -271,9 +271,8 @@ public IndexSettings getIndexSettings() { } @Override - public boolean updateMapping(final IndexMetadata currentIndexMetadata, final IndexMetadata newIndexMetadata) throws IOException { + public void updateMapping(final IndexMetadata currentIndexMetadata, final IndexMetadata newIndexMetadata) throws IOException { failRandomly(); - return false; } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 28e4c15a1340d..a486487519e60 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -498,7 +498,6 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod threadPool, recoveryTargetService, shardStateAction, - null, repositoriesService, null, null, diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 155824dfb682d..df5a75f405b74 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -89,7 +89,6 @@ import 
org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; -import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase; import org.elasticsearch.cluster.coordination.ClusterBootstrapService; @@ -103,10 +102,10 @@ import org.elasticsearch.cluster.coordination.MockSinglePrioritizingExecutor; import org.elasticsearch.cluster.metadata.AliasValidator; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexMetadataVerifier; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; -import org.elasticsearch.cluster.metadata.IndexMetadataVerifier; import org.elasticsearch.cluster.metadata.MetadataMappingService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -1512,7 +1511,6 @@ protected NamedWriteableRegistry writeableRegistry() { threadPool, new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService), shardStateAction, - new NodeMappingRefreshAction(transportService, metadataMappingService), repositoriesService, mock(SearchService.class), peerRecoverySourceService, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 5163db6434569..84fec38e84410 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -179,7 +179,6 @@ static void validate( final IndexMetadata followIndex, final String[] leaderIndexHistoryUUID, final MapperService followerMapperService) { - FollowParameters parameters = request.getParameters(); Map ccrIndexMetadata = followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); if (ccrIndexMetadata == null) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowActionTests.java index 756b2a42fa09c..4032f4f512827 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowActionTests.java @@ -115,7 +115,7 @@ public void testValidation() throws IOException { IndexMetadata leaderIMD = createIMD("index1", 1, Settings.EMPTY, null); IndexMetadata followIMD = createIMD("index2", 1, Settings.EMPTY, customMetadata); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); - mapperService.updateMapping(null, followIMD); + mapperService.merge(followIMD, MapperService.MergeReason.MAPPING_RECOVERY); Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, mapperService)); assertThat(e.getMessage(), equalTo("the following index [index2] is not ready to follow; " + @@ -128,7 +128,7 @@ 
public void testValidation() throws IOException { IndexMetadata followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5, Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), customMetadata); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); - mapperService.updateMapping(null, followIMD); + mapperService.merge(followIMD, MapperService.MergeReason.MAPPING_RECOVERY); Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, mapperService)); assertThat(e.getMessage(), equalTo("mapper [field] cannot be changed from type [text] to [keyword]")); } @@ -155,7 +155,7 @@ public void testValidation() throws IOException { IndexMetadata followIMD = createIMD("index2", 5, followingIndexSettings, customMetadata); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followingIndexSettings, "index2"); - mapperService.updateMapping(null, followIMD); + mapperService.merge(followIMD, MapperService.MergeReason.MAPPING_RECOVERY); IllegalArgumentException error = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, mapperService)); assertThat(error.getMessage(), equalTo("the following index [index2] is not ready to follow; " + @@ -167,7 +167,7 @@ public void testValidation() throws IOException { IndexMetadata followIMD = createIMD("index2", 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), customMetadata); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); - mapperService.updateMapping(null, followIMD); + mapperService.merge(followIMD, MapperService.MergeReason.MAPPING_RECOVERY); validate(request, leaderIMD, followIMD, UUIDs, mapperService); } { @@ -182,7 +182,7 @@ public void testValidation() throws IOException { .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), customMetadata); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followIMD.getSettings(), "index2"); - mapperService.updateMapping(null, followIMD); + mapperService.merge(followIMD, MapperService.MergeReason.MAPPING_RECOVERY); validate(request, leaderIMD, followIMD, UUIDs, mapperService); } { @@ -199,7 +199,7 @@ public void testValidation() throws IOException { .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), customMetadata); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followIMD.getSettings(), "index2"); - mapperService.updateMapping(null, followIMD); + mapperService.merge(followIMD, MapperService.MergeReason.MAPPING_RECOVERY); validate(request, leaderIMD, followIMD, UUIDs, mapperService); } }
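
For illustration, here is a minimal, self-contained Java sketch of the flow that MapperService#updateMapping follows after this patch: the node applies the master-merged mapping exactly as received and only asserts that re-merging it locally would change nothing, instead of sending a mapping refresh request back to the master. All names below (MappingUpdateSketch, MappingSource, DocMapper) are hypothetical stand-ins rather than Elasticsearch classes, and the merge step is deliberately trivial.

final class MappingUpdateSketch {

    // Simplified stand-in for CompressedXContent: the serialized mapping source.
    record MappingSource(String json) { }

    // Simplified stand-in for DocumentMapper: remembers the source it was built from.
    record DocMapper(MappingSource source) { }

    private DocMapper mapper; // current mapper, analogous to MapperService#mapper

    void updateMapping(long currentMappingVersion, long newMappingVersion, MappingSource incoming) {
        if (currentMappingVersion == newMappingVersion) {
            return; // mirrors the early return when the mapping version is unchanged
        }
        DocMapper previous;
        synchronized (this) {
            previous = this.mapper;
            // Replaces the old "ask the master to refresh" path: merging the incoming,
            // already master-merged mapping into the current one must be a no-op.
            assert merge(previous, incoming).equals(incoming)
                    : "merged mapping differs from the incoming master-merged mapping";
            this.mapper = new DocMapper(incoming);
        }
        System.out.println((previous == null ? "added" : "updated") + " mapping " + incoming.json());
    }

    // Toy merge: the real merge is structural; here the incoming source simply wins.
    private static MappingSource merge(DocMapper current, MappingSource incoming) {
        return incoming;
    }

    public static void main(String[] args) {
        MappingUpdateSketch sketch = new MappingUpdateSketch();
        sketch.updateMapping(1L, 2L, new MappingSource("{\"properties\":{\"field\":{\"type\":\"text\"}}}"));
    }
}

As in the real change, the check is a Java assert, so a production node simply applies the mapping as the master sent it; the verification only runs where assertions are enabled, such as in tests.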