Skip to content

Commit

Permalink
Enable reading auto-follow patterns from x-content (elastic#40130)
Browse files Browse the repository at this point in the history
This named writable was never registered, so it means that we could not
read auto-follow patterns that were registered in the cluster
state. This causes them to be lost on restarts, a bad bug. This commit
addresses this by registering this named writable, and we add a basic
CCR restart test to ensure that CCR keeps functioning properly when the
follower is restarted.
  • Loading branch information
jasontedor committed Mar 19, 2019
1 parent 53325d8 commit def6616
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 10 deletions.
61 changes: 61 additions & 0 deletions x-pack/plugin/ccr/qa/restart/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import org.elasticsearch.gradle.test.RestIntegTestTask

apply plugin: 'elasticsearch.standalone-test'

dependencies {
testCompile project(':x-pack:plugin:ccr:qa')
}

task leaderClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}

leaderClusterTestCluster {
numNodes = 1
clusterName = 'leader-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'node.name', 'leader'
}

leaderClusterTestRunner {
systemProperty 'tests.target_cluster', 'leader'
}

task followClusterTest(type: RestIntegTestTask) {}

followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.monitoring.collection.enabled', 'true'
setting 'xpack.license.self_generated.type', 'trial'
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'node.name', 'follow'
}

followClusterTestRunner {
systemProperty 'tests.target_cluster', 'follow'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
}

task followClusterRestartTest(type: RestIntegTestTask) {}

followClusterRestartTestCluster {
dependsOn followClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
dataDir = { nodeNumber -> followClusterTest.nodes[0].dataDir }
setting 'xpack.monitoring.collection.enabled', 'true'
setting 'xpack.license.self_generated.type', 'trial'
setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
setting 'node.name', 'follow'
}

followClusterRestartTestRunner {
systemProperty 'tests.target_cluster', 'follow-restart'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
finalizedBy 'leaderClusterTestCluster#stop'
}

check.dependsOn followClusterRestartTest
unitTest.enabled = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

import java.io.IOException;

public class RestartIT extends ESCCRRestTestCase {

public void testRestart() throws Exception {
final int numberOfDocuments = 128;
final String testsTargetCluster = System.getProperty("tests.target_cluster");
switch (testsTargetCluster) {
case "leader": {
// create a single index "leader" on the leader
createIndexAndIndexDocuments("leader", numberOfDocuments, client());
break;
}
case "follow": {
// follow "leader" with "follow-leader" on the follower
followIndex("leader", "follow-leader");
verifyFollower("follow-leader", numberOfDocuments, client());

// now create an auto-follow pattern for "leader-*"
final Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
putPatternRequest.setJsonEntity("{" +
"\"leader_index_patterns\": [\"leader-*\"]," +
"\"remote_cluster\": \"leader_cluster\"," +
"\"follow_index_pattern\":\"follow-{{leader_index}}\"}");
assertOK(client().performRequest(putPatternRequest));
try (RestClient leaderClient = buildLeaderClient()) {
// create "leader-1" on the leader, which should be replicated to "follow-leader-1" on the follower
createIndexAndIndexDocuments("leader-1", numberOfDocuments, leaderClient);
// the follower should catch up
verifyFollower("follow-leader-1", numberOfDocuments, client());
}
break;
}
case "follow-restart": {
try (RestClient leaderClient = buildLeaderClient()) {
// create "leader-2" on the leader, and index some additional documents into existing indices
createIndexAndIndexDocuments("leader-2", numberOfDocuments, leaderClient);
for (final String index : new String[]{"leader", "leader-1", "leader-2"}) {
indexDocuments(index, numberOfDocuments, numberOfDocuments, leaderClient);
}
// the followers should catch up
for (final String index : new String[]{"follow-leader", "follow-leader-1", "follow-leader-2"}) {
logger.info("verifying {} using {}", index, client().getNodes());
verifyFollower(index, 2 * numberOfDocuments, client());
}
// one more index "leader-3" on the follower
createIndexAndIndexDocuments("leader-3", 2 * numberOfDocuments, leaderClient);
// the follower should catch up
verifyFollower("follow-leader-3", 2 * numberOfDocuments, client());
}
break;
}
default: {
throw new IllegalArgumentException("unexpected value [" + testsTargetCluster + "] for tests.target_cluster");
}
}
}

private void createIndexAndIndexDocuments(final String index, final int numberOfDocuments, final RestClient client) throws IOException {
final Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
final Request createIndexRequest = new Request("PUT", "/" + index);
createIndexRequest.setJsonEntity("{\"settings\":" + Strings.toString(settings) + "}");
assertOK(client.performRequest(createIndexRequest));
indexDocuments(index, numberOfDocuments, 0, client);
}

private void indexDocuments(
final String index,
final int numberOfDocuments,
final int initial,
final RestClient client) throws IOException {
for (int i = 0, j = initial; i < numberOfDocuments; i++, j++) {
index(client, index, Integer.toString(j), "field", j);
}
assertOK(client.performRequest(new Request("POST", "/" + index + "/_refresh")));
}

private void verifyFollower(final String index, final int numberOfDocuments, final RestClient client) throws Exception {
assertBusy(() -> {
ensureYellow(index, client);
verifyDocuments(index, numberOfDocuments, "*:*", client);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ protected static void verifyDocuments(final String index,
Map<String, ?> response = toMap(client.performRequest(request));

int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(numDocs, equalTo(expectedNumDocs));
assertThat(index, numDocs, equalTo(expectedNumDocs));

List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), equalTo(expectedNumDocs));
for (int i = 0; i < expectedNumDocs; i++) {
int value = (int) XContentMapValues.extractValue("_source.field", (Map<?, ?>) hits.get(i));
assertThat(i, equalTo(value));
assertThat(index, i, equalTo(value));
}
}

Expand Down Expand Up @@ -205,15 +205,19 @@ protected static Map<String, Object> toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}

protected static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
protected static void ensureYellow(final String index) throws IOException {
ensureYellow(index, adminClient());
}

protected static void ensureYellow(final String index, final RestClient client) throws IOException {
final Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_active_shards", "1");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("timeout", "5s");
request.addParameter("level", "shards");
adminClient().performRequest(request);
client.performRequest(request);
}

protected int countCcrNodeTasks() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -84,6 +85,7 @@
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.CCRFeatureSet;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
Expand Down Expand Up @@ -275,11 +277,17 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {

public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
// Persistent action requests
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ShardFollowTask.NAME),
// auto-follow metadata, persisted into the cluster state as XContent
new NamedXContentRegistry.Entry(
MetaData.Custom.class,
new ParseField(AutoFollowMetadata.TYPE),
AutoFollowMetadata::fromXContent),
// persistent action requests
new NamedXContentRegistry.Entry(
PersistentTaskParams.class,
new ParseField(ShardFollowTask.NAME),
ShardFollowTask::fromXContent),

// Task statuses
// task statuses
new NamedXContentRegistry.Entry(
ShardFollowNodeTaskStatus.class,
new ParseField(ShardFollowNodeTaskStatus.STATUS_PARSER_NAME),
Expand Down

0 comments on commit def6616

Please sign in to comment.