Skip to content

Commit

Permalink
[ML] Throw an error when a datafeed needs CCS but it is not enabled f…
Browse files Browse the repository at this point in the history
…or the node (elastic#46044) (elastic#46181)

Though we allow CCS within datafeeds, users could prevent nodes from accessing remote clusters. This can cause mysterious errors and difficult to troubleshoot.

This commit adds a check to verify that `cluster.remote.connect` is enabled on the current node when a datafeed is configured with a remote index pattern.
  • Loading branch information
benwtrent authored Sep 3, 2019
1 parent cc9e3af commit b4ed711
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ those same roles.
`indices`::
(Required, array) An array of index names. Wildcards are supported. For
example: `["it_ops_metrics", "server*"]`.
+
--
NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
not be set to `false` on any ML node.
--

`job_id`::
(Required, string) A numerical character string that uniquely identifies the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public final class Messages {
public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL =
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" +
" but the current node [{2}] is not allowed to connect to remote clusters." +
" Please enable cluster.remote.connect for all machine learning nodes.";

public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable";
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobConfigProvider,
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister);
jobResultsPersister,
settings,
clusterService.getNodeName());
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, anomalyDetectionAuditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlTasks;
Expand All @@ -46,6 +47,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
Expand Down Expand Up @@ -84,6 +86,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private final AnomalyDetectionAuditor auditor;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
private final NamedXContentRegistry xContentRegistry;
private final boolean remoteClusterSearchSupported;

@Inject
public TransportStartDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
Expand All @@ -102,6 +105,7 @@ public TransportStartDatafeedAction(Settings settings, TransportService transpor
this.auditor = auditor;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.xContentRegistry = xContentRegistry;
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
}

static void validate(Job job,
Expand Down Expand Up @@ -180,7 +184,7 @@ public void onFailure(Exception e) {
};

// Verify data extractor factory can be created, then start persistent task
Consumer<Job> createDataExtrator = job -> {
Consumer<Job> createDataExtractor = job -> {
if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) {
final RemoteClusterLicenseChecker remoteClusterLicenseChecker =
new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode);
Expand All @@ -192,6 +196,13 @@ public void onFailure(Exception e) {
response -> {
if (response.isSuccess() == false) {
listener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
} else if (remoteClusterSearchSupported == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
datafeedConfigHolder.get().getId(),
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
clusterService.getNodeName())));
} else {
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
}
Expand All @@ -213,7 +224,7 @@ public void onFailure(Exception e) {
Job job = jobBuilder.build();
validate(job, datafeedConfigHolder.get(), tasks, xContentRegistry);
auditDeprecations(datafeedConfigHolder.get(), job, auditor, xContentRegistry);
createDataExtrator.accept(job);
createDataExtractor.accept(job);
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
Expand All @@ -30,6 +35,7 @@
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -45,11 +51,13 @@ public class DatafeedJobBuilder {
private final JobResultsProvider jobResultsProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsPersister jobResultsPersister;
private final boolean remoteClusterSearchSupported;
private final String nodeName;

public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor,
Supplier<Long> currentTimeSupplier, JobConfigProvider jobConfigProvider,
JobResultsProvider jobResultsProvider, DatafeedConfigProvider datafeedConfigProvider,
JobResultsPersister jobResultsPersister) {
JobResultsPersister jobResultsPersister, Settings settings, String nodeName) {
this.client = client;
this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
this.auditor = Objects.requireNonNull(auditor);
Expand All @@ -58,6 +66,8 @@ public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry,
this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.nodeName = nodeName;
}

void build(String datafeedId, ActionListener<DatafeedJob> listener) {
Expand Down Expand Up @@ -168,6 +178,18 @@ void build(String datafeedId, ActionListener<DatafeedJob> listener) {
configBuilder -> {
try {
datafeedConfigHolder.set(configBuilder.build());
if (remoteClusterSearchSupported == false) {
List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices());
if (remoteIndices.isEmpty() == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
configBuilder.getId(),
remoteIndices,
nodeName)));
return;
}
}
jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
Expand Down Expand Up @@ -92,7 +94,9 @@ public void init() {
jobConfigProvider,
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister);
jobResultsPersister,
Settings.EMPTY,
"test_node");
}

public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
Expand Down Expand Up @@ -202,6 +206,46 @@ public void testBuild_GivenBucketsRequestFails() {
verify(taskHandler).accept(error);
}

public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
datafeedJobBuilder =
new DatafeedJobBuilder(
client,
xContentRegistry(),
auditor,
System::currentTimeMillis,
jobConfigProvider,
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister,
settings,
"test_node");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
jobBuilder.setCreateTime(new Date());
DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId());
datafeed.setIndices(Collections.singletonList("remotecluster:index-*"));

AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> fail("datafeed builder did not fail when remote index was given and remote clusters were not enabled"),
e -> {
assertThat(e.getMessage(), equalTo(Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
"datafeed1",
"[remotecluster:index-*]",
"test_node")));
wasHandlerCalled.compareAndSet(false, true);
}
);

givenJob(jobBuilder);
givenDatafeed(datafeed);
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}

private void givenJob(Job.Builder job) {
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Expand Down

0 comments on commit b4ed711

Please sign in to comment.