Skip to content

Commit

Permalink
Remove node mapping refresh action (#68909)
Browse files Browse the repository at this point in the history
The mapping refresh action was used when every local node used to process mappings, to eventually ask the master node to refresh its mappings and re-send them to all nodes.

This is no longer needed and could actually be hiding some unexpected situations that we would like to know about and handle specifically.

This commit removes the node refresh action which is not expected to be called by any node, and replaces it with an assertion that verifies that we never end up in a situation where we need a refresh.

Also, the mappings received by any local node are now applied as they are, without any further merging, as they were already merged by the master node.
  • Loading branch information
javanna authored Feb 15, 2021
1 parent b3d5d32 commit c6ee6d4
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 344 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.ComponentTemplateMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplateMetadata;
Expand Down Expand Up @@ -269,7 +268,6 @@ protected void configure() {
bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver);
bind(DelayedAllocationService.class).asEagerSingleton();
bind(ShardStateAction.class).asEagerSingleton();
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -28,21 +27,17 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperService.MergeReason;
import org.elasticsearch.indices.IndicesService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED;

/**
* Service responsible for submitting mapping changes
*/
Expand All @@ -53,7 +48,6 @@ public class MetadataMappingService {
private final ClusterService clusterService;
private final IndicesService indicesService;

final RefreshTaskExecutor refreshExecutor = new RefreshTaskExecutor();
final PutMappingExecutor putMappingExecutor = new PutMappingExecutor();


Expand All @@ -63,138 +57,6 @@ public MetadataMappingService(ClusterService clusterService, IndicesService indi
this.indicesService = indicesService;
}

/**
 * A queued request to refresh the mapping of a single index, identified by
 * name plus UUID so the task is dropped if the index was meanwhile recreated.
 */
static class RefreshTask {
    // Name of the index whose mapping should be refreshed.
    final String index;
    // UUID of the index at the time the task was created; guards against
    // applying the task to a different index that reuses the same name.
    final String indexUUID;

    RefreshTask(String indexName, String uuid) {
        this.index = indexName;
        this.indexUUID = uuid;
    }

    @Override
    public String toString() {
        // Same rendering as before: "[name][uuid]".
        return String.format("[%s][%s]", index, indexUUID);
    }
}

/**
 * Executes a batch of queued {@link RefreshTask}s against the current cluster
 * state by delegating to {@code executeRefresh}, marking every task successful.
 */
class RefreshTaskExecutor implements ClusterStateTaskExecutor<RefreshTask> {
    @Override
    public ClusterTasksResult<RefreshTask> execute(ClusterState currentState, List<RefreshTask> tasks) throws Exception {
        final ClusterState updatedState = executeRefresh(currentState, tasks);
        // All tasks in the batch share the outcome of the single batched refresh.
        return ClusterTasksResult.<RefreshTask>builder().successes(tasks).build(updatedState);
    }
}

/**
 * Batch method to apply all the queued refresh operations. The idea is to try and batch as much
 * as possible so we won't create the same index all the time for example for the updates on the same mapping
 * and generate a single cluster change event out of all of those.
 *
 * @param currentState the cluster state the batch is applied against
 * @param allTasks     all queued refresh tasks to process in this batch
 * @return the (possibly unchanged) cluster state after re-syncing mappings
 * @throws Exception if creating a temporary index service for merging fails
 */
ClusterState executeRefresh(final ClusterState currentState, final List<RefreshTask> allTasks) throws Exception {
    // Group tasks per index so the on-demand index-service creation below happens
    // at most once per index while its events are processed.
    Map<String, List<RefreshTask>> tasksPerIndex = new HashMap<>();
    for (RefreshTask task : allTasks) {
        if (task.index == null) {
            logger.debug("ignoring a mapping task of type [{}] with a null index.", task);
            // FIX: the original logged "ignoring" but still added the task under a
            // null key, so it was processed anyway; actually skip it.
            continue;
        }
        tasksPerIndex.computeIfAbsent(task.index, k -> new ArrayList<>()).add(task);
    }

    boolean dirty = false;
    Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());

    for (Map.Entry<String, List<RefreshTask>> entry : tasksPerIndex.entrySet()) {
        IndexMetadata indexMetadata = mdBuilder.get(entry.getKey());
        if (indexMetadata == null) {
            // index got deleted on us, ignore...
            logger.debug("[{}] ignoring tasks - index meta data doesn't exist", entry.getKey());
            continue;
        }
        final Index index = indexMetadata.getIndex();
        // Only act if at least one task was created against the current incarnation
        // of the index (same UUID); stale tasks are logged and dropped.
        List<RefreshTask> allIndexTasks = entry.getValue();
        boolean hasTaskWithRightUUID = false;
        for (RefreshTask task : allIndexTasks) {
            if (indexMetadata.isSameUUID(task.indexUUID)) {
                hasTaskWithRightUUID = true;
            } else {
                logger.debug("{} ignoring task [{}] - index meta data doesn't match task uuid", index, task);
            }
        }
        if (hasTaskWithRightUUID == false) {
            continue;
        }

        // construct the actual index if needed, and make sure the relevant mappings are there
        boolean removeIndex = false;
        IndexService indexService = indicesService.indexService(indexMetadata.getIndex());
        if (indexService == null) {
            // we need to create the index here, and add the current mapping to it, so we can merge
            indexService = indicesService.createIndex(indexMetadata, Collections.emptyList(), false);
            removeIndex = true;
            indexService.mapperService().merge(indexMetadata, MergeReason.MAPPING_RECOVERY);
        }

        IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata);
        try {
            boolean indexDirty = refreshIndexMapping(indexService, builder);
            if (indexDirty) {
                mdBuilder.put(builder);
                dirty = true;
            }
        } finally {
            // Tear down any index service created only for this merge pass.
            if (removeIndex) {
                indicesService.removeIndex(index, NO_LONGER_ASSIGNED, "created for mapping processing");
            }
        }
    }

    if (dirty == false) {
        return currentState;
    }
    return ClusterState.builder(currentState).metadata(mdBuilder).build();
}

/**
 * Compares the mapping held by the local {@link MapperService} with the one in the
 * given index metadata builder and, when they differ, overwrites the builder's
 * mapping with the local one.
 *
 * @param indexService locally allocated index service whose mapper holds the reference mapping
 * @param builder      metadata builder for the same index; updated in place when out of date
 * @return {@code true} if the builder's mapping was replaced, {@code false} otherwise
 */
private boolean refreshIndexMapping(IndexService indexService, IndexMetadata.Builder builder) {
    boolean dirty = false;
    String index = indexService.index().getName();
    try {
        MapperService mapperService = indexService.mapperService();
        DocumentMapper mapper = mapperService.documentMapper();
        if (mapper != null) {
            if (mapper.mappingSource().equals(builder.mapping().source()) == false) {
                dirty = true;
            }
        }

        // if the mapping is not up-to-date, re-send everything
        if (dirty) {
            // FIX: dropped the stray trailing ']' that unbalanced the log message
            logger.warn("[{}] re-syncing mappings with cluster state", index);
            builder.putMapping(new MappingMetadata(mapper));

        }
    } catch (Exception e) {
        // Best effort: a failed refresh is logged but must not fail the batch.
        logger.warn(() -> new ParameterizedMessage("[{}] failed to refresh-mapping in cluster state", index), e);
    }
    return dirty;
}

/**
 * Refreshes mappings if they are not the same between original and parsed version
 *
 * @param index     name of the index to refresh
 * @param indexUUID UUID of that index, used to discard stale tasks
 */
public void refreshMapping(final String index, final String indexUUID) {
    // Queue a high-priority state update; failures are only logged since the
    // refresh is best effort and will be retried by a later task if needed.
    final RefreshTask task = new RefreshTask(index, indexUUID);
    final String source = "refresh-mapping [" + index + "]";
    clusterService.submitStateUpdateTask(
        source,
        task,
        ClusterStateTaskConfig.build(Priority.HIGH),
        refreshExecutor,
        (taskSource, e) -> logger.warn(() -> new ParameterizedMessage("failure during [{}]", taskSource), e));
}

class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> {
@Override
public ClusterTasksResult<PutMappingClusterStateUpdateRequest>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,10 @@ List<SearchOperationListener> getSearchOperationListener() { // pkg private for
return searchOperationListeners;
}

// NOTE(review): this span is a unified-diff interleave of two versions of
// updateMapping captured from a commit page; it is not compilable Java as written.
// The two versions are annotated below rather than untangled.
@Override
// Pre-change version: returned a boolean (whether the mapping changed), and
// returned false when no MapperService exists.
public boolean updateMapping(final IndexMetadata currentIndexMetadata, final IndexMetadata newIndexMetadata) throws IOException {
if (mapperService == null) {
return false;
// Post-change version: returns void; the received mapping is applied as-is when a
// MapperService exists — presumably already merged by the master node (TODO confirm
// against the commit message above).
public void updateMapping(final IndexMetadata currentIndexMetadata, final IndexMetadata newIndexMetadata) throws IOException {
if (mapperService != null) {
mapperService.updateMapping(currentIndexMetadata, newIndexMetadata);
}
// Tail of the pre-change version, left behind by the diff rendering.
return mapperService.updateMapping(currentIndexMetadata, newIndexMetadata);
}

private class StoreCloseListener implements Store.OnClose {
Expand Down
Loading

0 comments on commit c6ee6d4

Please sign in to comment.