From def66165be2ff57dc79bac08ffbc0a939037aa78 Mon Sep 17 00:00:00 2001
From: Jason Tedor <jason@tedor.me>
Date: Mon, 18 Mar 2019 21:47:52 -0400
Subject: [PATCH] Enable reading auto-follow patterns from x-content (#40130)

This named writable was never registered, so it means that we could not
read auto-follow patterns that were registered in the cluster
state. This causes them to be lost on restarts, a bad bug. This commit
addresses this by registering this named writable, and we add a basic
CCR restart test to ensure that CCR keeps functioning properly when the
follower is restarted.
---
 x-pack/plugin/ccr/qa/restart/build.gradle     | 61 ++++++++++++
 .../elasticsearch/xpack/ccr/RestartIT.java    | 99 +++++++++++++++++++
 .../xpack/ccr/ESCCRRestTestCase.java          | 16 +--
 .../java/org/elasticsearch/xpack/ccr/Ccr.java | 16 ++-
 4 files changed, 182 insertions(+), 10 deletions(-)
 create mode 100644 x-pack/plugin/ccr/qa/restart/build.gradle
 create mode 100644 x-pack/plugin/ccr/qa/restart/src/test/java/org/elasticsearch/xpack/ccr/RestartIT.java

diff --git a/x-pack/plugin/ccr/qa/restart/build.gradle b/x-pack/plugin/ccr/qa/restart/build.gradle
new file mode 100644
index 0000000000000..59082d7819533
--- /dev/null
+++ b/x-pack/plugin/ccr/qa/restart/build.gradle
@@ -0,0 +1,61 @@
+import org.elasticsearch.gradle.test.RestIntegTestTask
+
+apply plugin: 'elasticsearch.standalone-test'
+
+dependencies {
+    testCompile project(':x-pack:plugin:ccr:qa')
+}
+
+task leaderClusterTest(type: RestIntegTestTask) {
+    mustRunAfter(precommit)
+}
+
+leaderClusterTestCluster {
+    numNodes = 1
+    clusterName = 'leader-cluster'
+    setting 'xpack.license.self_generated.type', 'trial'
+    setting 'node.name', 'leader'
+}
+
+leaderClusterTestRunner {
+    systemProperty 'tests.target_cluster', 'leader'
+}
+
+task followClusterTest(type: RestIntegTestTask) {}
+
+followClusterTestCluster {
+    dependsOn leaderClusterTestRunner
+    numNodes = 1
+    clusterName = 'follow-cluster'
+    setting 'xpack.monitoring.collection.enabled', 'true'
+    setting 'xpack.license.self_generated.type', 'trial'
+    setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
+    setting 'node.name', 'follow'
+}
+
+followClusterTestRunner {
+    systemProperty 'tests.target_cluster', 'follow'
+    systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
+}
+
+task followClusterRestartTest(type: RestIntegTestTask) {}
+
+followClusterRestartTestCluster {
+    dependsOn followClusterTestRunner
+    numNodes = 1
+    clusterName = 'follow-cluster'
+    dataDir = { nodeNumber -> followClusterTest.nodes[0].dataDir }
+    setting 'xpack.monitoring.collection.enabled', 'true'
+    setting 'xpack.license.self_generated.type', 'trial'
+    setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
+    setting 'node.name', 'follow'
+}
+
+followClusterRestartTestRunner {
+    systemProperty 'tests.target_cluster', 'follow-restart'
+    systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
+    finalizedBy 'leaderClusterTestCluster#stop'
+}
+
+check.dependsOn followClusterRestartTest
+unitTest.enabled = false
diff --git a/x-pack/plugin/ccr/qa/restart/src/test/java/org/elasticsearch/xpack/ccr/RestartIT.java b/x-pack/plugin/ccr/qa/restart/src/test/java/org/elasticsearch/xpack/ccr/RestartIT.java
new file mode 100644
index 0000000000000..cbbc6945034c0
--- /dev/null
+++ b/x-pack/plugin/ccr/qa/restart/src/test/java/org/elasticsearch/xpack/ccr/RestartIT.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ccr;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexSettings;
+
+import java.io.IOException;
+
+public class RestartIT extends ESCCRRestTestCase {
+
+    public void testRestart() throws Exception {
+        final int numberOfDocuments = 128;
+        final String testsTargetCluster = System.getProperty("tests.target_cluster");
+        switch (testsTargetCluster) {
+            case "leader": {
+                // create a single index "leader" on the leader
+                createIndexAndIndexDocuments("leader", numberOfDocuments, client());
+                break;
+            }
+            case "follow": {
+                // follow "leader" with "follow-leader" on the follower
+                followIndex("leader", "follow-leader");
+                verifyFollower("follow-leader", numberOfDocuments, client());
+
+                // now create an auto-follow pattern for "leader-*"
+                final Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
+                putPatternRequest.setJsonEntity("{" +
+                        "\"leader_index_patterns\": [\"leader-*\"]," +
+                        "\"remote_cluster\": \"leader_cluster\"," +
+                        "\"follow_index_pattern\":\"follow-{{leader_index}}\"}");
+                assertOK(client().performRequest(putPatternRequest));
+                try (RestClient leaderClient = buildLeaderClient()) {
+                    // create "leader-1" on the leader, which should be replicated to "follow-leader-1" on the follower
+                    createIndexAndIndexDocuments("leader-1", numberOfDocuments, leaderClient);
+                    // the follower should catch up
+                    verifyFollower("follow-leader-1", numberOfDocuments, client());
+                }
+                break;
+            }
+            case "follow-restart": {
+                try (RestClient leaderClient = buildLeaderClient()) {
+                    // create "leader-2" on the leader, and index some additional documents into existing indices
+                    createIndexAndIndexDocuments("leader-2", numberOfDocuments, leaderClient);
+                    for (final String index : new String[]{"leader", "leader-1", "leader-2"}) {
+                        indexDocuments(index, numberOfDocuments, numberOfDocuments, leaderClient);
+                    }
+                    // the followers should catch up
+                    for (final String index : new String[]{"follow-leader", "follow-leader-1", "follow-leader-2"}) {
+                        logger.info("verifying {} using {}", index, client().getNodes());
+                        verifyFollower(index, 2 * numberOfDocuments, client());
+                    }
+                    // one more index "leader-3" on the follower
+                    createIndexAndIndexDocuments("leader-3", 2 * numberOfDocuments, leaderClient);
+                    // the follower should catch up
+                    verifyFollower("follow-leader-3", 2 * numberOfDocuments, client());
+                }
+                break;
+            }
+            default: {
+                throw new IllegalArgumentException("unexpected value [" + testsTargetCluster + "] for tests.target_cluster");
+            }
+        }
+    }
+
+    private void createIndexAndIndexDocuments(final String index, final int numberOfDocuments, final RestClient client) throws IOException {
+        final Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
+        final Request createIndexRequest = new Request("PUT", "/" + index);
+        createIndexRequest.setJsonEntity("{\"settings\":" + Strings.toString(settings) + "}");
+        assertOK(client.performRequest(createIndexRequest));
+        indexDocuments(index, numberOfDocuments, 0, client);
+    }
+
+    private void indexDocuments(
+            final String index,
+            final int numberOfDocuments,
+            final int initial,
+            final RestClient client) throws IOException {
+        for (int i = 0, j = initial; i < numberOfDocuments; i++, j++) {
+            index(client, index, Integer.toString(j), "field", j);
+        }
+        assertOK(client.performRequest(new Request("POST", "/" + index + "/_refresh")));
+    }
+
+    private void verifyFollower(final String index, final int numberOfDocuments, final RestClient client) throws Exception {
+        assertBusy(() -> {
+            ensureYellow(index, client);
+            verifyDocuments(index, numberOfDocuments, "*:*", client);
+        });
+    }
+
+}
diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java
index 3d5c8610a1af4..33e7c2f2bf177 100644
--- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java
+++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java
@@ -120,13 +120,13 @@ protected static void verifyDocuments(final String index,
         Map<String, ?> response = toMap(client.performRequest(request));
 
         int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
-        assertThat(numDocs, equalTo(expectedNumDocs));
+        assertThat(index, numDocs, equalTo(expectedNumDocs));
 
         List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
         assertThat(hits.size(), equalTo(expectedNumDocs));
         for (int i = 0; i < expectedNumDocs; i++) {
             int value = (int) XContentMapValues.extractValue("_source.field", (Map<?, ?>) hits.get(i));
-            assertThat(i, equalTo(value));
+            assertThat(index, i, equalTo(value));
         }
     }
 
@@ -205,15 +205,19 @@ protected static Map<String, Object> toMap(String response) {
         return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
     }
 
-    protected static void ensureYellow(String index) throws IOException {
-        Request request = new Request("GET", "/_cluster/health/" + index);
+    protected static void ensureYellow(final String index) throws IOException {
+        ensureYellow(index, adminClient());
+    }
+
+    protected static void ensureYellow(final String index, final RestClient client) throws IOException {
+        final Request request = new Request("GET", "/_cluster/health/" + index);
         request.addParameter("wait_for_status", "yellow");
         request.addParameter("wait_for_active_shards", "1");
         request.addParameter("wait_for_no_relocating_shards", "true");
         request.addParameter("wait_for_no_initializing_shards", "true");
-        request.addParameter("timeout", "70s");
+        request.addParameter("timeout", "5s");
         request.addParameter("level", "shards");
-        adminClient().performRequest(request);
+        client.performRequest(request);
     }
 
     protected int countCcrNodeTasks() throws IOException {
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
index de5580244e078..7fa7f37f4b71d 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
@@ -12,6 +12,7 @@
 import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.ParseField;
@@ -84,6 +85,7 @@
 import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
 import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
 import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
 import org.elasticsearch.xpack.core.ccr.CCRFeatureSet;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
 import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
@@ -275,11 +277,17 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
 
     public List<NamedXContentRegistry.Entry> getNamedXContent() {
         return Arrays.asList(
-                // Persistent action requests
-                new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ShardFollowTask.NAME),
+                // auto-follow metadata, persisted into the cluster state as XContent
+                new NamedXContentRegistry.Entry(
+                        MetaData.Custom.class,
+                        new ParseField(AutoFollowMetadata.TYPE),
+                        AutoFollowMetadata::fromXContent),
+                // persistent action requests
+                new NamedXContentRegistry.Entry(
+                        PersistentTaskParams.class,
+                        new ParseField(ShardFollowTask.NAME),
                         ShardFollowTask::fromXContent),
-
-                // Task statuses
+                // task statuses
                 new NamedXContentRegistry.Entry(
                         ShardFollowNodeTaskStatus.class,
                         new ParseField(ShardFollowNodeTaskStatus.STATUS_PARSER_NAME),