diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 8177f813eaa7e..946e654034efc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -267,7 +267,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public static final Setting MAX_MACHINE_MEMORY_PERCENT = Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 90, Property.Dynamic, Property.NodeScope); public static final Setting MAX_LAZY_ML_NODES = - Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope); + Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope); private static final Logger logger = LogManager.getLogger(XPackPlugin.class); @@ -303,7 +303,8 @@ public List> getSettings() { AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE, AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE, - AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP)); + AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP, + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION)); } public Settings additionalSettings() { @@ -439,7 +440,7 @@ public Collection createComponents(Client client, ClusterService cluster jobDataCountsPersister, datafeedManager, auditor, - new MlAssignmentNotifier(auditor, threadPool, client, clusterService), + new MlAssignmentNotifier(settings, auditor, threadPool, client, clusterService), memoryTracker ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index ead55aa10cbd4..2a850ce9c14dc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; @@ -37,10 +38,10 @@ public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMast private final ThreadPool threadPool; private final AtomicBoolean enabled = new AtomicBoolean(false); - MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) { + MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) { this.auditor = auditor; this.clusterService = clusterService; - this.mlConfigMigrator = new MlConfigMigrator(client, clusterService); + this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService); this.threadPool = threadPool; clusterService.addLocalNodeMasterListener(this); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java new file mode 100644 index 0000000000000..0f127919ac3d0 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrationEligibilityCheck.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.job.config.Job; + +/** + * Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds) + * are eligible to be migrated from the cluster state into the config index + */ +public class MlConfigMigrationEligibilityCheck { + + private static final Version MIN_NODE_VERSION = Version.V_6_6_0; + + public static final Setting ENABLE_CONFIG_MIGRATION = Setting.boolSetting( + "xpack.ml.enable_config_migration", true, Setting.Property.Dynamic, Setting.Property.NodeScope); + + private volatile boolean isConfigMigrationEnabled; + + public MlConfigMigrationEligibilityCheck(Settings settings, ClusterService clusterService) { + isConfigMigrationEnabled = ENABLE_CONFIG_MIGRATION.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLE_CONFIG_MIGRATION, this::setConfigMigrationEnabled); + } + + private void setConfigMigrationEnabled(boolean configMigrationEnabled) { + this.isConfigMigrationEnabled = configMigrationEnabled; + } + + /** + * Can migration start? Returns: + * False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION} + * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} + * True otherwise + * @param clusterState The cluster state + * @return A boolean that dictates if config migration can start + */ + public boolean canStartMigration(ClusterState clusterState) { + if (isConfigMigrationEnabled == false) { + return false; + } + + Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); + if (minNodeVersion.before(MIN_NODE_VERSION)) { + return false; + } + return true; + } + + /** + * Is the job a eligible for migration? Returns: + * False if {@link #canStartMigration(ClusterState)} returns {@code false} + * False if the {@link Job#isDeleting()} + * False if the job has a persistent task + * True otherwise i.e. the job is present, not deleting + * and does not have a persistent task. + * + * @param jobId The job Id + * @param clusterState The cluster state + * @return A boolean depending on the conditions listed above + */ + public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) { + if (canStartMigration(clusterState) == false) { + return false; + } + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + Job job = mlMetadata.getJobs().get(jobId); + + if (job == null || job.isDeleting()) { + return false; + } + + PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + return MlTasks.openJobIds(persistentTasks).contains(jobId) == false; + } + + /** + * Is the datafeed a eligible for migration? Returns: + * False if {@link #canStartMigration(ClusterState)} returns {@code false} + * False if the datafeed is not in the cluster state + * False if the datafeed has a persistent task + * True otherwise i.e. the datafeed is present and does not have a persistent task. + * + * @param datafeedId The datafeed Id + * @param clusterState The cluster state + * @return A boolean depending on the conditions listed above + */ + public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) { + if (canStartMigration(clusterState) == false) { + return false; + } + + MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); + if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) { + return false; + } + + PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); + return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 2490b820440bb..e400fe0df4b93 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -41,6 +42,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -80,18 +82,19 @@ public class MlConfigMigrator { private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class); public static final String MIGRATED_FROM_VERSION = "migrated from version"; - public static final Version MIN_NODE_VERSION = Version.V_6_6_0; static final int MAX_BULK_WRITE_SIZE = 100; private final Client client; private final ClusterService clusterService; + private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private final AtomicBoolean migrationInProgress; - public MlConfigMigrator(Client client, ClusterService clusterService) { - this.client = client; - this.clusterService = clusterService; + public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) { + this.client = Objects.requireNonNull(client); + this.clusterService = Objects.requireNonNull(clusterService); + this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); this.migrationInProgress = new AtomicBoolean(false); } @@ -114,9 +117,8 @@ public MlConfigMigrator(Client client, ClusterService clusterService) { */ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener listener) { - Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); - if (minNodeVersion.before(MIN_NODE_VERSION)) { - listener.onResponse(Boolean.FALSE); + if (migrationEligibilityCheck.canStartMigration(clusterState) == false) { + listener.onResponse(false); return; } @@ -454,60 +456,4 @@ static List filterFailedDatafeedConfigWrites(Set failedDocumentI .filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false) .collect(Collectors.toList()); } - - /** - * Is the job a eligible for migration? Returns: - * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} - * False if the job is not in the cluster state - * False if the {@link Job#isDeleting()} - * False if the job has a persistent task - * True otherwise i.e. the job is present, not deleting - * and does not have a persistent task. - * - * @param jobId The job Id - * @param clusterState clusterstate - * @return A boolean depending on the conditions listed above - */ - public static boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) { - Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); - if (minNodeVersion.before(MIN_NODE_VERSION)) { - return false; - } - - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - Job job = mlMetadata.getJobs().get(jobId); - - if (job == null || job.isDeleting()) { - return false; - } - - PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - return MlTasks.openJobIds(persistentTasks).contains(jobId) == false; - } - - /** - * Is the datafeed a eligible for migration? Returns: - * False if the min node version of the cluster is before {@link #MIN_NODE_VERSION} - * False if the datafeed is not in the cluster state - * False if the datafeed has a persistent task - * True otherwise i.e. the datafeed is present and does not have a persistent task. - * - * @param datafeedId The datafeed Id - * @param clusterState clusterstate - * @return A boolean depending on the conditions listed above - */ - public static boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) { - Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); - if (minNodeVersion.before(MIN_NODE_VERSION)) { - return false; - } - - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState); - if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) { - return false; - } - - PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false; - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index 42be2c7b9ddcd..97e437ec4caf2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; @@ -28,7 +29,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -40,9 +41,10 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction listener) { - if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) { + if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete datafeed", request.getDatafeedId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 1c4505e93afbb..481a144d7fed0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.IdsQueryBuilder; @@ -63,7 +64,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; @@ -96,6 +97,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction>> listenersByJobId; @Inject - public TransportDeleteJobAction(TransportService transportService, ClusterService clusterService, + public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService, Client client, Auditor auditor, JobResultsProvider jobResultsProvider, @@ -121,6 +123,7 @@ public TransportDeleteJobAction(TransportService transportService, ClusterServic this.jobConfigProvider = jobConfigProvider; this.datafeedConfigProvider = datafeedConfigProvider; this.memoryTracker = memoryTracker; + this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); this.listenersByJobId = new HashMap<>(); } @@ -148,7 +151,7 @@ protected void masterOperation(DeleteJobAction.Request request, ClusterState sta protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state, ActionListener listener) { - if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) { + if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete job", request.getJobId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 433d1b420303f..557ebd8f87b04 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -67,7 +67,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -107,9 +107,10 @@ public class TransportOpenJobAction extends TransportMasterNodeAction listener) { - if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) { + if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index cf15f6ab41a66..a940d6666c9fd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -25,7 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter; @@ -41,9 +42,10 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio private final JobManager jobManager; private final JobResultsProvider jobResultsProvider; private final JobDataCountsPersister jobDataCountsPersister; + private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; @Inject - public TransportRevertModelSnapshotAction(ThreadPool threadPool, TransportService transportService, + public TransportRevertModelSnapshotAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager, JobResultsProvider jobResultsProvider, ClusterService clusterService, Client client, JobDataCountsPersister jobDataCountsPersister) { @@ -53,6 +55,7 @@ public TransportRevertModelSnapshotAction(ThreadPool threadPool, TransportServic this.jobManager = jobManager; this.jobResultsProvider = jobResultsProvider; this.jobDataCountsPersister = jobDataCountsPersister; + this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); } @Override @@ -68,7 +71,7 @@ protected RevertModelSnapshotAction.Response newResponse() { @Override protected void masterOperation(RevertModelSnapshotAction.Request request, ClusterState state, ActionListener listener) { - if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) { + if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", request.getJobId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 9fa0f5a5acb87..5867948bbad63 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.RemoteClusterLicenseChecker; @@ -44,7 +45,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; @@ -76,9 +77,10 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction listener) throws Exception { - if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) { + if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) { listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update datafeed", request.getUpdate().getId())); return; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 291347028453b..33047c1fca39a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -48,7 +48,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.MlConfigMigrator; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; @@ -92,6 +92,7 @@ public class JobManager { private final ThreadPool threadPool; private final UpdateJobProcessNotifier updateJobProcessNotifier; private final JobConfigProvider jobConfigProvider; + private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private volatile ByteSizeValue maxModelMemoryLimit; @@ -109,6 +110,7 @@ public JobManager(Environment environment, Settings settings, JobResultsProvider this.threadPool = Objects.requireNonNull(threadPool); this.updateJobProcessNotifier = updateJobProcessNotifier; this.jobConfigProvider = new JobConfigProvider(client); + this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings); clusterService.getClusterSettings() @@ -338,7 +340,7 @@ public void updateJob(UpdateJobAction.Request request, ActionListener(Collections.singletonList( + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + } + + private static Settings newSettings(boolean migrationEnabled) { + return Settings.builder() + .put(MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION.getKey(), migrationEnabled) + .build(); + } + + private DatafeedConfig createCompatibleDatafeed(String jobId) { + // create a datafeed without aggregations or anything + // else that may cause validation errors + DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("df-" + jobId, jobId); + datafeedBuilder.setIndices(Collections.singletonList("my_index")); + return datafeedBuilder.build(); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java index 92d06724a0488..d9ea035e58234 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlConfigMigratorTests.java @@ -11,9 +11,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -24,7 +21,6 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobTests; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -218,124 +214,6 @@ public void testRemoveJobsAndDatafeeds_removeSome() { assertThat(removalResult.removedDatafeedIds, empty()); } - public void testJobIsEligibleForMigration_givenNodesNotUpToVersion() { - // mixed 6.5 and 6.6 nodes - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); - - assertFalse(MlConfigMigrator.jobIsEligibleForMigration("pre-min-version", clusterState)); - } - - public void testJobIsEligibleForMigration_givenJobNotInClusterState() { - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")).build(); - assertFalse(MlConfigMigrator.jobIsEligibleForMigration("not-in-state", clusterState)); - } - - public void testJobIsEligibleForMigration_givenDeletingJob() { - Job deletingJob = JobTests.buildJobBuilder("deleting-job").setDeleting(true).build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(deletingJob, false); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.jobTaskId(deletingJob.getId()), - MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(deletingJob.getId()), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) - ) - .build(); - - assertFalse(MlConfigMigrator.jobIsEligibleForMigration(deletingJob.getId(), clusterState)); - } - - public void testJobIsEligibleForMigration_givenOpenJob() { - Job openJob = JobTests.buildJobBuilder("open-job").build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.jobTaskId(openJob.getId()), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(openJob.getId()), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) - ) - .build(); - - assertFalse(MlConfigMigrator.jobIsEligibleForMigration(openJob.getId(), clusterState)); - } - - public void testJobIsEligibleForMigration_givenClosedJob() { - Job closedJob = JobTests.buildJobBuilder("closed-job").build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(closedJob, false); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); - - assertTrue(MlConfigMigrator.jobIsEligibleForMigration(closedJob.getId(), clusterState)); - } - - public void testDatafeedIsEligibleForMigration_givenNodesNotUpToVersion() { - // mixed 6.5 and 6.6 nodes - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .nodes(DiscoveryNodes.builder() - .add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0)) - .add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0))) - .build(); - - assertFalse(MlConfigMigrator.datafeedIsEligibleForMigration("pre-min-version", clusterState)); - } - - public void testDatafeedIsEligibleForMigration_givenDatafeedNotInClusterState() { - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")).build(); - assertFalse(MlConfigMigrator.datafeedIsEligibleForMigration("not-in-state", clusterState)); - } - - public void testDatafeedIsEligibleForMigration_givenStartedDatafeed() { - Job openJob = JobTests.buildJobBuilder("open-job").build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(openJob, false); - mlMetadata.putDatafeed(createCompatibleDatafeed(openJob.getId()), Collections.emptyMap()); - String datafeedId = "df-" + openJob.getId(); - - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(MlTasks.datafeedTaskId(datafeedId), MlTasks.DATAFEED_TASK_NAME, - new StartDatafeedAction.DatafeedParams(datafeedId, 0L), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - .putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()) - ) - .build(); - - assertFalse(MlConfigMigrator.datafeedIsEligibleForMigration(datafeedId, clusterState)); - } - - public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() { - Job job = JobTests.buildJobBuilder("closed-job").build(); - MlMetadata.Builder mlMetadata = new MlMetadata.Builder().putJob(job, false); - mlMetadata.putDatafeed(createCompatibleDatafeed(job.getId()), Collections.emptyMap()); - String datafeedId = "df-" + job.getId(); - - ClusterState clusterState = ClusterState.builder(new ClusterName("migratortests")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build()) - ) - .build(); - - assertTrue(MlConfigMigrator.datafeedIsEligibleForMigration(datafeedId, clusterState)); - } - public void testLimitWrites_GivenBelowLimit() { MlConfigMigrator.JobsAndDatafeeds jobsAndDatafeeds = MlConfigMigrator.limitWrites(Collections.emptyList(), Collections.emptyMap()); assertThat(jobsAndDatafeeds.datafeedConfigs, empty()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 51a3b5d2366b0..b81805fb3fbdf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -11,16 +11,21 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.junit.Before; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -28,13 +33,25 @@ import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MlConfigMigratorIT extends MlSingleNodeTestCase { + private ClusterService clusterService; + + @Before + public void setUpTests() { + clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(nodeSettings(), new HashSet<>(Collections.singletonList( + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + } + public void testWriteConfigToIndex() throws InterruptedException { final String indexJobId = "job-already-migrated"; @@ -50,8 +67,7 @@ public void testWriteConfigToIndex() throws InterruptedException { // put a job representing a previously migrated job blockingCall(actionListener -> jobConfigProvider.putJob(migratedJob, actionListener), indexResponseHolder, exceptionHolder); - ClusterService clusterService = mock(ClusterService.class); - MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(client(), clusterService); + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); AtomicReference> failedIdsHolder = new AtomicReference<>(); Job foo = buildJobBuilder("foo").build(); @@ -97,7 +113,6 @@ public void testMigrateConfigs() throws InterruptedException { .putCustom(MlMetadata.TYPE, mlMetadata.build())) .build(); - ClusterService clusterService = mock(ClusterService.class); doAnswer(invocation -> { ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class)); @@ -108,7 +123,7 @@ public void testMigrateConfigs() throws InterruptedException { AtomicReference responseHolder = new AtomicReference<>(); // do the migration - MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(client(), clusterService); + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), responseHolder, exceptionHolder); @@ -138,6 +153,56 @@ public void testMigrateConfigs() throws InterruptedException { assertThat(datafeedsHolder.get(), hasSize(1)); assertEquals("df-1", datafeedsHolder.get().get(0).getId()); } + + public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws InterruptedException { + Settings settings = Settings.builder().put(nodeSettings()) + .put(MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION.getKey(), false) + .build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Collections.singletonList( + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + // and jobs and datafeeds clusterstate + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); + mlMetadata.putJob(buildJobBuilder("job-bar").build(), false); + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-1", "job-foo"); + builder.setIndices(Collections.singletonList("beats*")); + mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + + // do the migration + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(settings, client(), clusterService); + blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + responseHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertFalse(responseHolder.get()); + + // check the jobs have not been migrated + AtomicReference> jobsHolder = new AtomicReference<>(); + JobConfigProvider jobConfigProvider = new JobConfigProvider(client()); + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener), + jobsHolder, exceptionHolder); + assertNull(exceptionHolder.get()); + assertThat(jobsHolder.get().isEmpty(), is(true)); + + // check datafeeds have not been migrated + DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry()); + AtomicReference> datafeedsHolder = new AtomicReference<>(); + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener), + datafeedsHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertThat(datafeedsHolder.get().isEmpty(), is(true)); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 5341d0466df63..a24950ed0918e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.ml.job.config.RuleScope; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; @@ -59,6 +60,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.TreeSet; @@ -94,10 +96,13 @@ public class JobManagerTests extends ESTestCase { @Before public void setup() throws Exception { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); + Settings settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) + .build(); environment = TestEnvironment.newEnvironment(settings); analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(environment); clusterService = mock(ClusterService.class); + givenClusterSettings(settings); jobResultsProvider = mock(JobResultsProvider.class); auditor = mock(Auditor.class); @@ -550,9 +555,6 @@ private Job.Builder createJob() { } private JobManager createJobManager(Client client) { - ClusterSettings clusterSettings = new ClusterSettings(environment.settings(), - Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT)); - when(clusterService.getClusterSettings()).thenReturn(clusterSettings); return new JobManager(environment, environment.settings(), jobResultsProvider, clusterService, auditor, threadPool, client, updateJobProcessNotifier); } @@ -569,4 +571,11 @@ private BytesReference toBytesReference(ToXContent content) throws IOException { return BytesReference.bytes(xContentBuilder); } } + + private void givenClusterSettings(Settings settings) { + ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(Arrays.asList( + MachineLearningField.MAX_MODEL_MEMORY_LIMIT, + MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + } }