Skip to content

Commit

Permalink
HLRC for delete expired data by job Id (#57722) (#57975)
Browse files Browse the repository at this point in the history
High level rest client changes for #57337
  • Loading branch information
davidkyle authored Jun 12, 2020
1 parent c8031c6 commit 39020f3
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataReque
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ml")
.addPathPartAsIs("_delete_expired_data")
.addPathPart(deleteExpiredDataRequest.getJobId())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
request.setEntity(createEntity(deleteExpiredDataRequest, REQUEST_BODY_CONTENT_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,21 @@ public class DeleteExpiredDataRequest extends ActionRequest implements ToXConten

static final String REQUESTS_PER_SECOND = "requests_per_second";
static final String TIMEOUT = "timeout";
static final String JOB_ID = "job_id";

private final String jobId;
private final Float requestsPerSecond;
private final TimeValue timeout;

/**
* Create a new request to delete expired data
*/
public DeleteExpiredDataRequest() {
this(null, null);
this(null, null, null);
}

public DeleteExpiredDataRequest(Float requestsPerSecond, TimeValue timeout) {
public DeleteExpiredDataRequest(String jobId, Float requestsPerSecond, TimeValue timeout) {
this.jobId = jobId;
this.requestsPerSecond = requestsPerSecond;
this.timeout = timeout;
}
Expand All @@ -68,13 +73,24 @@ public TimeValue getTimeout() {
return timeout;
}

/**
* The optional job id
*
* The default is `null` meaning all jobs.
* @return The job id or null
*/
public String getJobId() {
return jobId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteExpiredDataRequest that = (DeleteExpiredDataRequest) o;
return Objects.equals(requestsPerSecond, that.requestsPerSecond) &&
Objects.equals(timeout, that.timeout);
Objects.equals(timeout, that.timeout) &&
Objects.equals(jobId, that.jobId);
}

@Override
Expand All @@ -83,12 +99,15 @@ public ActionRequestValidationException validate() {
}

public int hashCode() {
return Objects.hash(requestsPerSecond, timeout);
return Objects.hash(requestsPerSecond, timeout, jobId);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (jobId != null) {
builder.field(JOB_ID, jobId);
}
if (requestsPerSecond != null) {
builder.field(REQUESTS_PER_SECOND, requestsPerSecond);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,23 @@ public void testCloseJob() throws Exception {

public void testDeleteExpiredData() throws Exception {
float requestsPerSec = randomBoolean() ? -1.0f : (float)randomDoubleBetween(0.0, 100000.0, false);
String jobId = randomBoolean() ? null : randomAlphaOfLength(8);
DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest(
jobId,
requestsPerSec,
TimeValue.timeValueHours(1));

Request request = MLRequestConverters.deleteExpiredData(deleteExpiredDataRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_ml/_delete_expired_data", request.getEndpoint());
assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request));

String expectedPath = jobId == null ? "/_ml/_delete_expired_data" : "/_ml/_delete_expired_data/" + jobId;
assertEquals(expectedPath, request.getEndpoint());
if (jobId == null) {
assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request));
} else {
assertEquals("{\"job_id\":\"" + jobId + "\",\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}",
requestEntityToString(request));
}
}

public void testDeleteJob() {
Expand Down Expand Up @@ -946,7 +955,7 @@ public void testPutFilter() throws IOException {
}
}

public void testGetFilter() throws IOException {
public void testGetFilter() {
String id = randomAlphaOfLength(10);
GetFiltersRequest getFiltersRequest = new GetFiltersRequest();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2037,8 +2037,9 @@ public void testDeleteExpiredData() throws IOException, InterruptedException {
{
// tag::delete-expired-data-request
DeleteExpiredDataRequest request = new DeleteExpiredDataRequest( // <1>
1000.0f, // <2>
TimeValue.timeValueHours(12) // <3>
null, // <2>
1000.0f, // <3>
TimeValue.timeValueHours(12) // <4>
);

// end::delete-expired-data-request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase<Dele
private static ConstructingObjectParser<DeleteExpiredDataRequest, Void> PARSER = new ConstructingObjectParser<>(
"delete_expired_data_request",
true,
(a) -> new DeleteExpiredDataRequest((Float) a[0], (TimeValue) a[1])
(a) -> new DeleteExpiredDataRequest((String) a[0], (Float) a[1], (TimeValue) a[2])
);
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(),
new ParseField(DeleteExpiredDataRequest.JOB_ID));
PARSER.declareFloat(ConstructingObjectParser.optionalConstructorArg(),
new ParseField(DeleteExpiredDataRequest.REQUESTS_PER_SECOND));
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
Expand All @@ -46,7 +48,9 @@ public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase<Dele

@Override
protected DeleteExpiredDataRequest createTestInstance() {
return new DeleteExpiredDataRequest(randomBoolean() ? null : randomFloat(),
return new DeleteExpiredDataRequest(
randomBoolean() ? null : randomAlphaOfLength(6),
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test"));
}

Expand Down
5 changes: 3 additions & 2 deletions docs/java-rest/high-level/ml/delete-expired-data.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ A `DeleteExpiredDataRequest` object does not require any arguments.
include-tagged::{doc-tests-file}[{api}-request]
---------------------------------------------------
<1> Constructing a new request.
<2> Providing requests per second throttling for the
<2> Optionally set a job ID. Use `null` for the default wild card all `*`.
<3> Providing requests per second throttling for the
deletion processes. Default is no throttling.
<3> Setting how long the deletion processes will be allowed
<4> Setting how long the deletion processes will be allowed
to run before they are canceled. Default value is `8h` (8 hours).

[id="{upid}-{api}-response"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.io.IOException;
Expand Down Expand Up @@ -51,9 +51,17 @@ public static class Request extends ActionRequest {
PARSER.declareString(Request::setJobId, Job.ID);
}

public static Request parseRequest(String jobId, XContentParser parser) {
Request request = PARSER.apply(parser, null);
if (jobId != null) {
request.jobId = jobId;
}
return request;
}

private Float requestsPerSecond;
private TimeValue timeout;
private String jobId = Metadata.ALL;
private String jobId;

public Request() {}

Expand All @@ -72,7 +80,7 @@ public Request(StreamInput in) throws IOException {
this.timeout = null;
}
if (in.getVersion().onOrAfter(Version.V_7_9_0)) {
jobId = in.readString();
jobId = in.readOptionalString();
}
}

Expand Down Expand Up @@ -136,7 +144,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalTimeValue(timeout);
}
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeString(jobId);
out.writeOptionalString(jobId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ private void deleteExpiredData(DeleteExpiredDataAction.Request request,
List<MlDataRemover> dataRemovers,
ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier) {


Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
// If there is no throttle provided, default to none
float requestsPerSec = request.getRequestsPerSecond() == null ? Float.POSITIVE_INFINITY : request.getRequestsPerSecond();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ml.rest;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
Expand Down Expand Up @@ -44,11 +43,14 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String jobId = restRequest.param(Job.ID.getPreferredName());

DeleteExpiredDataAction.Request request;
if (restRequest.hasContent()) {
request = DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null);
request = DeleteExpiredDataAction.Request.parseRequest(jobId, restRequest.contentParser());
} else {
request = new DeleteExpiredDataAction.Request();
request.setJobId(jobId);

String perSecondParam = restRequest.param(DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName());
if (perSecondParam != null) {
Expand All @@ -67,11 +69,6 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
}
}

String jobId = restRequest.param(Job.ID.getPreferredName());
if (Strings.isNullOrEmpty(jobId) == false) {
request.setJobId(jobId);
}

return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

0 comments on commit 39020f3

Please sign in to comment.