Skip to content

Commit

Permalink
[ML] improve reliability of job stats in larger clusters (elastic#86305)
Browse files Browse the repository at this point in the history
When gathering job stats for closed jobs, we may be inadvertently executing on a transport thread. Typically, this is acceptable, but when there are many jobs and many indices, it has a cascading effect and may cause the cluster to enter a troubling state.

This is mainly due to how slow security checks can be for search requests when the cluster has many indices.

To alleviate this, gathering information about closed jobs is forked to the ML utility thread pool.

related: elastic#82255
  • Loading branch information
benwtrent authored Apr 29, 2022
1 parent 6a7bff4 commit 4e481c3
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 49 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/86305.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 86305
summary: Improve reliability of job stats in larger clusters
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
Expand Down Expand Up @@ -63,6 +63,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<
private final AutodetectProcessManager processManager;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final ThreadPool threadPool;

@Inject
public TransportGetJobsStatsAction(
Expand All @@ -71,7 +72,8 @@ public TransportGetJobsStatsAction(
ClusterService clusterService,
AutodetectProcessManager processManager,
JobResultsProvider jobResultsProvider,
JobConfigProvider jobConfigProvider
JobConfigProvider jobConfigProvider,
ThreadPool threadPool
) {
super(
GetJobsStatsAction.NAME,
Expand All @@ -87,6 +89,7 @@ public TransportGetJobsStatsAction(
this.processManager = processManager;
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
this.threadPool = threadPool;
}

@Override
Expand Down Expand Up @@ -140,7 +143,7 @@ protected void taskOperation(GetJobsStatsAction.Request request, JobTask task, A
JobState jobState = MlTasks.getJobState(jobId, tasks);
String assignmentExplanation = pTask.getAssignment().getExplanation();
TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task));
gatherForecastStats(jobId, forecastStats -> {
jobResultsProvider.getForecastStats(jobId, forecastStats -> {
JobStats jobStats = new JobStats(
jobId,
dataCounts,
Expand Down Expand Up @@ -186,55 +189,60 @@ void gatherStatsForClosedJobs(
};

PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
for (int i = 0; i < closedJobIds.size(); i++) {
int slot = i;
String jobId = closedJobIds.get(i);
gatherForecastStats(jobId, forecastStats -> {
jobResultsProvider.getDataCountsModelSizeAndTimingStats(jobId, (dataCounts, modelSizeStats, timingStats) -> {
JobState jobState = MlTasks.getJobState(jobId, tasks);
PersistentTasksCustomMetadata.PersistentTask<?> pTask = MlTasks.getJobTask(jobId, tasks);
String assignmentExplanation = null;
if (pTask != null) {
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(
slot,
new JobStats(
jobId,
dataCounts,
modelSizeStats,
forecastStats,
jobState,
null,
assignmentExplanation,
null,
timingStats
)
);
if (counter.decrementAndGet() == 0) {
if (searchException.get() != null) {
// there was an error
listener.onFailure(searchException.get());
return;
}
List<JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
listener.onResponse(
new GetJobsStatsAction.Response(
response.getTaskFailures(),
response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
for (int i = 0; i < closedJobIds.size(); i++) {
int slot = i;
String jobId = closedJobIds.get(i);
jobResultsProvider.getForecastStats(jobId, forecastStats -> {
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
.execute(
() -> jobResultsProvider.getDataCountsModelSizeAndTimingStats(
jobId,
(dataCounts, modelSizeStats, timingStats) -> {
JobState jobState = MlTasks.getJobState(jobId, tasks);
PersistentTasksCustomMetadata.PersistentTask<?> pTask = MlTasks.getJobTask(jobId, tasks);
String assignmentExplanation = null;
if (pTask != null) {
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(
slot,
new JobStats(
jobId,
dataCounts,
modelSizeStats,
forecastStats,
jobState,
null,
assignmentExplanation,
null,
timingStats
)
);
if (counter.decrementAndGet() == 0) {
if (searchException.get() != null) {
// there was an error
listener.onFailure(searchException.get());
return;
}
List<JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId));
listener.onResponse(
new GetJobsStatsAction.Response(
response.getTaskFailures(),
response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)
)
);
}
},
errorHandler
)
);
}
}, errorHandler);
}, errorHandler);
}
}

void gatherForecastStats(String jobId, Consumer<ForecastStats> handler, Consumer<Exception> errorHandler) {
jobResultsProvider.getForecastStats(jobId, handler, errorHandler);
}
});
}

static TimeValue durationToTimeValue(Optional<Duration> duration) {
Expand Down

0 comments on commit 4e481c3

Please sign in to comment.