[ML] Persist data counts and datafeed timing stats asynchronously #93000
Conversation
When an anomaly detection job runs, the majority of results originate from the C++ autodetect process, so can be persisted in bulk. However, there are two types of results, namely data counts and datafeed timing stats, that are generated wholly within the ML Java code and where there are serious downsides to batching them up with the output of the C++ process. (If we batched them and the C++ process stopped generating results then the input side stats would also stall, so it is better that the input side stats are written independently.)

The approach used in this PR is to write data counts and datafeed timing stats asynchronously _except_ at certain key points, like job flush and close, and datafeed stop. At these key points the latest stats _are_ persisted synchronously, as before.

When large amounts of data are being processed the code will generate updated stats documents faster than they can be indexed. The approach taken here is to skip persistence of the newer document if persistence of the previous document is still in progress. This can lead to the stats being slightly out of date while a job is running. However, at key points like flush and close the data counts will be up-to-date, and the datafeed timing stats will get written at least once per datafeed `frequency`, so should not be more out-of-date than that.
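The "skip if a persist is already in flight" idea can be sketched as follows. This is a minimal illustration, not the actual Elasticsearch code; the class and method names are invented for the example, and a `CountDownLatch` stands in for tracking the in-progress write:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the pattern described above: a latch tracks the in-progress
// write, and a newer stats document is skipped if the previous one has
// not finished indexing yet.
public class AsyncStatsPersister {
    private CountDownLatch persistInProgressLatch;
    final AtomicInteger persistedDocs = new AtomicInteger();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Returns true if a persist was started, false if it was skipped. */
    public synchronized boolean persistAsync(String statsDoc) {
        if (persistInProgressLatch != null && persistInProgressLatch.getCount() > 0) {
            return false; // previous persist still running: drop this document
        }
        CountDownLatch latch = new CountDownLatch(1);
        persistInProgressLatch = latch;
        executor.execute(() -> {
            try {
                indexDocument(statsDoc);
            } finally {
                latch.countDown();
            }
        });
        return true;
    }

    // Simulated slow indexing call standing in for a real bulk request.
    private void indexDocument(String doc) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        persistedDocs.incrementAndGet();
    }

    public void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

A second document submitted while the first is still indexing is simply dropped, which is acceptable here because each stats document supersedes the previous one.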
Pinging @elastic/ml-core (Team:ML)

Hi @droberts195, I've created a changelog YAML for you.
LGTM
```java
logger.trace("[{}] not persisting datafeed timing stats as persistence is disallowed", jobId);
    return;
}
if (persistInProgressLatch != null && persistInProgressLatch.await(1, TimeUnit.NANOSECONDS) == false) {
```
`persistInProgressLatch.await(1, TimeUnit.NANOSECONDS)`

This looks like you want to test whether the latch is still waitable, i.e. the count is greater than zero. In that case it could be replaced with `persistInProgressLatch.getCount() > 0`.
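The two checks are equivalent for a non-blocking "is work still in progress?" test, but `getCount() > 0` states the intent directly. A small illustration (the helper names are invented for the example):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Compares the two ways of asking "has the latch not yet reached zero?".
// await(timeout) returns false if the count did not reach zero within the
// timeout, so with a 1 ns timeout it is effectively a non-blocking check.
public class LatchCheck {
    public static boolean inProgressViaAwait(CountDownLatch latch) throws InterruptedException {
        return latch.await(1, TimeUnit.NANOSECONDS) == false;
    }

    public static boolean inProgressViaCount(CountDownLatch latch) {
        return latch.getCount() > 0;
    }
}
```

Besides readability, `getCount()` cannot throw `InterruptedException`, so the calling code does not need to handle interruption for what is conceptually a simple state check.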
When a datafeed starts up it looks at the job's data counts to decide where to resume from after a previous invocation finished. Previously the datafeed always read the data counts from the index. In the case where a datafeed has been stopped and restarted while its corresponding job remained open this is incorrect: the data counts need to be obtained from the running job. (In the case where the job was closed and reopened while the datafeed was stopped this does not matter, as closing the job persists the up-to-date data counts.) This bug has always existed, but the changes made in #93000 make it much more likely to cause a noticeable discrepancy. Fixes #93298
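The fix described above amounts to preferring the in-memory counts of a still-open job over the possibly stale indexed document. A hypothetical sketch, with invented names that do not correspond to the actual Elasticsearch classes:

```java
import java.util.Optional;

// Illustrative sketch: when a datafeed restarts, use the running job's
// in-memory latest record time if the job is open; fall back to the
// persisted data counts document only when no process is running.
public class DataCountsResolver {

    /** View of a running autodetect process; empty if the job is closed. */
    public interface RunningJobView {
        Optional<Long> latestRecordTimeMs();
    }

    public static long resolveResumeTime(RunningJobView runningJob, long indexedLatestRecordTimeMs) {
        // The indexed document may lag behind the running job now that
        // persistence is asynchronous, so the in-memory value wins.
        return runningJob.latestRecordTimeMs().orElse(indexedLatestRecordTimeMs);
    }
}
```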