Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Throw an error when a datafeed needs CCS but it is not enabled for the node #46044

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ those same roles.
(Required, array) An array of index names. Wildcards are supported. For
example: `["it_ops_metrics", "server*"]`.

NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
not be set to `false` on any ML node.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be tied to the previous item I think this needs to be:

+
--
NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
not be set to `false` on any ML node.
--

I.e. + and -- before and -- after.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you often need a blank line after the admonition block (i.e. after the "ML node." and before the final "--"), otherwise it doesn't format properly. If you need any help with the formatting, just let me know.


`job_id`::
(Required, string) A numerical character string that uniquely identifies the
{anomaly-job}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public final class Messages {
public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL =
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {0}" +
" but the current node is not allowed to connect to remote clusters. " +
"Please enable cluster.remote.connect for all machine learning nodes.";
public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" +
" but the current node is not allowed to connect to remote clusters." +
" Please enable cluster.remote.connect for all machine learning nodes (current disabled on [{2}]).";
benwtrent marked this conversation as resolved.
Show resolved Hide resolved

public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable";
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister,
settings);
settings,
clusterService.getNodeName());
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ public void onFailure(Exception e) {
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
datafeedConfigHolder.get().getId(),
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()))));
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
clusterService.getNodeName())));
} else {
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ public class DatafeedJobBuilder {
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsPersister jobResultsPersister;
private final boolean remoteClusterSearchSupported;
private final String nodeName;

public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor,
Supplier<Long> currentTimeSupplier, JobConfigProvider jobConfigProvider,
JobResultsProvider jobResultsProvider, DatafeedConfigProvider datafeedConfigProvider,
JobResultsPersister jobResultsPersister, Settings settings) {
JobResultsPersister jobResultsPersister, Settings settings, String nodeName) {
this.client = client;
this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
this.auditor = Objects.requireNonNull(auditor);
Expand All @@ -66,6 +67,7 @@ public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry,
this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.nodeName = nodeName;
}

void build(String datafeedId, ActionListener<DatafeedJob> listener) {
Expand Down Expand Up @@ -183,7 +185,8 @@ void build(String datafeedId, ActionListener<DatafeedJob> listener) {
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
configBuilder.getId(),
remoteIndices)));
remoteIndices,
nodeName)));
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public void init() {
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister,
Settings.EMPTY);
Settings.EMPTY,
"test_node");
}

public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
Expand Down Expand Up @@ -217,7 +218,8 @@ public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister,
settings);
settings,
"test_node");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
Expand All @@ -228,11 +230,12 @@ public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {

AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the handler run in a different thread than the main test thread?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it runs in the same thread but I think because the boolean is updated in a lambda it must be effectively final and you can't do that with a simple boolean it needs to be in some sort of holder.

ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> fail("datafeed builder did not fail when remote index was given and remote clusters were not enabled")
, e -> {
datafeedJob -> fail("datafeed builder did not fail when remote index was given and remote clusters were not enabled"),
e -> {
assertThat(e.getMessage(), equalTo(Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
"datafeed1",
"remotecluster:index-*")));
"[remotecluster:index-*]",
"test_node")));
wasHandlerCalled.compareAndSet(false, true);
}
);
Expand Down