Skip to content

Commit

Permalink
Make the TransportRolloverAction execute in one cluster state update (#…
Browse files Browse the repository at this point in the history
…50388)

This commit makes the TransportRolloverAction more resilient, by having it execute
only one cluster state update that creates the new (rollover index), rolls over
the alias from the source to the target index and set the RolloverInfo on the
source index. Before these 3 steps were represented as 3 chained cluster state
updates, which would've seen the user manually intervene if, say, the alias
rollover cluster state update (second in the chain) failed but the creation of
the rollover index (first in the chain) update succeeded

* Rename innerExecute to applyAliasActions

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
andreidan and elasticmachine authored Dec 20, 2019
1 parent fee5b31 commit 1ba4339
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
Expand Down Expand Up @@ -130,7 +129,7 @@ protected void masterOperation(Task task, final RolloverRequest rolloverRequest,
.docs(true);
statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
client.execute(IndicesStatsAction.INSTANCE, statsRequest,
new ActionListener<IndicesStatsResponse>() {
new ActionListener<>() {
@Override
public void onResponse(IndicesStatsResponse statsResponse) {
final Map<String, Boolean> conditionResults = evaluateConditions(rolloverRequest.getConditions().values(),
Expand All @@ -141,56 +140,44 @@ public void onResponse(IndicesStatsResponse statsResponse) {
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, true, false, false, false));
return;
}
List<Condition<?>> metConditions = rolloverRequest.getConditions().values().stream()
List<Condition<?>> metConditions = rolloverRequest.getConditions().values().stream()
.filter(condition -> conditionResults.get(condition.toString())).collect(Collectors.toList());
if (conditionResults.size() == 0 || metConditions.size() > 0) {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(unresolvedName, rolloverIndexName,
rolloverRequest);
createIndexService.createIndex(updateRequest, ActionListener.wrap(createIndexClusterStateUpdateResponse -> {
final IndicesAliasesClusterStateUpdateRequest aliasesUpdateRequest;
if (explicitWriteIndex) {
aliasesUpdateRequest = prepareRolloverAliasesWriteIndexUpdateRequest(sourceIndexName,
rolloverIndexName, rolloverRequest);
} else {
aliasesUpdateRequest = prepareRolloverAliasesUpdateRequest(sourceIndexName,
rolloverIndexName, rolloverRequest);
CreateIndexClusterStateUpdateRequest createIndexRequest = prepareCreateIndexRequest(unresolvedName,
rolloverIndexName, rolloverRequest);
clusterService.submitStateUpdateTask("rollover_index source [" + sourceIndexName + "] to target ["
+ rolloverIndexName + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexRequest);
newState = indexAliasesService.applyAliasActions(newState,
rolloverAliasToNewIndex(sourceIndexName, rolloverIndexName, rolloverRequest, explicitWriteIndex));
RolloverInfo rolloverInfo = new RolloverInfo(rolloverRequest.getAlias(), metConditions,
threadPool.absoluteTimeInMillis());
return ClusterState.builder(newState)
.metaData(MetaData.builder(newState.metaData())
.put(IndexMetaData.builder(newState.metaData().index(sourceIndexName))
.putRolloverInfo(rolloverInfo))).build();
}
indexAliasesService.indicesAliases(aliasesUpdateRequest,
ActionListener.wrap(aliasClusterStateUpdateResponse -> {
if (aliasClusterStateUpdateResponse.isAcknowledged()) {
clusterService.submitStateUpdateTask("update_rollover_info", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RolloverInfo rolloverInfo = new RolloverInfo(rolloverRequest.getAlias(), metConditions,
threadPool.absoluteTimeInMillis());
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData())
.put(IndexMetaData.builder(currentState.metaData().index(sourceIndexName))
.putRolloverInfo(rolloverInfo))).build();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
sourceIndexName, rolloverIndexName, conditionResults, false, true, true,
isShardsAcknowledged)),
listener::onFailure);
}
});
} else {
listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults,
false, true, false, false));
}
}, listener::onFailure));
}, listener::onFailure));
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (newState.equals(oldState) == false) {
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
sourceIndexName, rolloverIndexName, conditionResults, false, true, true,
isShardsAcknowledged)),
listener::onFailure);
}
}
});
} else {
// conditions not met
listener.onResponse(
Expand All @@ -207,27 +194,24 @@ public void onFailure(Exception e) {
);
}

static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesUpdateRequest(String oldIndex, String newIndex,
RolloverRequest request) {
final List<AliasAction> actions = List.of(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, null),
new AliasAction.Remove(oldIndex, request.getAlias()));
return new IndicesAliasesClusterStateUpdateRequest(actions)
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
}

static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesWriteIndexUpdateRequest(String oldIndex, String newIndex,
RolloverRequest request) {
final List<AliasAction> actions = List.of(
/**
* Creates the alias actions to reflect the alias rollover from the old (source) index to the new (target/rolled over) index. An
* alias pointing to multiple indices will have to be an explicit write index (ie. the old index alias has is_write_index set to true)
* in which case, after the rollover, the new index will need to be the explicit write index.
*/
static List<AliasAction> rolloverAliasToNewIndex(String oldIndex, String newIndex, RolloverRequest request,
boolean explicitWriteIndex) {
if (explicitWriteIndex) {
return List.of(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, true),
new AliasAction.Add(oldIndex, request.getAlias(), null, null, null, false));
return new IndicesAliasesClusterStateUpdateRequest(actions)
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
} else {
return List.of(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, null),
new AliasAction.Remove(oldIndex, request.getAlias()));
}
}


static String generateRolloverIndexName(String sourceIndexName, IndexNameExpressionResolver indexNameExpressionResolver) {
String resolvedName = indexNameExpressionResolver.resolveDateMathExpression(sourceIndexName);
final boolean isDateMath = sourceIndexName.equals(resolvedName) == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public void onFailure(String source, Exception e) {
* Handles the cluster state transition to a version that reflects the {@link CreateIndexClusterStateUpdateRequest}.
* All the requested changes are firstly validated before mutating the {@link ClusterState}.
*/
ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request) throws Exception {
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request) throws Exception {
logger.trace("executing IndexCreationTask for [{}] against cluster state version [{}]", request, currentState.version());
Index createdIndex = null;
String removalExtraInfo = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,15 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) {
return innerExecute(currentState, request.actions());
return applyAliasActions(currentState, request.actions());
}
});
}

ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actions) {
/**
* Handles the cluster state transition to a version that reflects the provided {@link AliasAction}s.
*/
public ClusterState applyAliasActions(ClusterState currentState, Iterable<AliasAction> actions) {
List<Index> indicesToClose = new ArrayList<>();
Map<String, IndexService> indices = new HashMap<>();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
Expand Down Expand Up @@ -219,15 +218,13 @@ public void testEvaluateWithoutMetaData() {
results2.forEach((k, v) -> assertFalse(v));
}

public void testCreateUpdateAliasRequest() {
public void testRolloverAliasActions() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
final IndicesAliasesClusterStateUpdateRequest updateRequest =
TransportRolloverAction.prepareRolloverAliasesUpdateRequest(sourceIndex, targetIndex, rolloverRequest);

List<AliasAction> actions = updateRequest.actions();
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, false);
assertThat(actions, hasSize(2));
boolean foundAdd = false;
boolean foundRemove = false;
Expand All @@ -246,15 +243,13 @@ public void testCreateUpdateAliasRequest() {
assertTrue(foundRemove);
}

public void testCreateUpdateAliasRequestWithExplicitWriteIndex() {
public void testRolloverAliasActionsWithExplicitWriteIndex() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
final IndicesAliasesClusterStateUpdateRequest updateRequest =
TransportRolloverAction.prepareRolloverAliasesWriteIndexUpdateRequest(sourceIndex, targetIndex, rolloverRequest);
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, true);

List<AliasAction> actions = updateRequest.actions();
assertThat(actions, hasSize(2));
boolean foundAddWrite = false;
boolean foundRemoveWrite = false;
Expand Down
Loading

0 comments on commit 1ba4339

Please sign in to comment.