From 39020f3900d2ac0b621cc5026cf0362c245cc489 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 12 Jun 2020 09:44:17 +0100 Subject: [PATCH] HLRC for delete expired data by job Id (#57722) (#57975) High level rest client changes for #57337 --- .../client/MLRequestConverters.java | 1 + .../client/ml/DeleteExpiredDataRequest.java | 27 ++++++++++++++++--- .../client/MLRequestConvertersTests.java | 15 ++++++++--- .../MlClientDocumentationIT.java | 5 ++-- .../ml/DeleteExpiredDataRequestTests.java | 8 ++++-- .../ml/delete-expired-data.asciidoc | 5 ++-- .../ml/action/DeleteExpiredDataAction.java | 16 ++++++++--- .../TransportDeleteExpiredDataAction.java | 2 -- .../ml/rest/RestDeleteExpiredDataAction.java | 11 +++----- 9 files changed, 64 insertions(+), 26 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index ec6b2da719bf9..f34e144fcec6c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -171,6 +171,7 @@ static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataReque String endpoint = new EndpointBuilder() .addPathPartAsIs("_ml") .addPathPartAsIs("_delete_expired_data") + .addPathPart(deleteExpiredDataRequest.getJobId()) .build(); Request request = new Request(HttpDelete.METHOD_NAME, endpoint); request.setEntity(createEntity(deleteExpiredDataRequest, REQUEST_BODY_CONTENT_TYPE)); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java index 7a9cda69ffca7..af37e8c5e3198 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java @@ -34,16 +34,21 @@ public class DeleteExpiredDataRequest extends ActionRequest implements ToXConten static final String REQUESTS_PER_SECOND = "requests_per_second"; static final String TIMEOUT = "timeout"; + static final String JOB_ID = "job_id"; + + private final String jobId; private final Float requestsPerSecond; private final TimeValue timeout; + /** * Create a new request to delete expired data */ public DeleteExpiredDataRequest() { - this(null, null); + this(null, null, null); } - public DeleteExpiredDataRequest(Float requestsPerSecond, TimeValue timeout) { + public DeleteExpiredDataRequest(String jobId, Float requestsPerSecond, TimeValue timeout) { + this.jobId = jobId; this.requestsPerSecond = requestsPerSecond; this.timeout = timeout; } @@ -68,13 +73,24 @@ public TimeValue getTimeout() { return timeout; } + /** + * The optional job id + * + * The default is `null` meaning all jobs. + * @return The job id or null + */ + public String getJobId() { + return jobId; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; DeleteExpiredDataRequest that = (DeleteExpiredDataRequest) o; return Objects.equals(requestsPerSecond, that.requestsPerSecond) && - Objects.equals(timeout, that.timeout); + Objects.equals(timeout, that.timeout) && + Objects.equals(jobId, that.jobId); } @Override @@ -83,12 +99,15 @@ public ActionRequestValidationException validate() { } public int hashCode() { - return Objects.hash(requestsPerSecond, timeout); + return Objects.hash(requestsPerSecond, timeout, jobId); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + if (jobId != null) { + builder.field(JOB_ID, jobId); + } if (requestsPerSecond != null) { builder.field(REQUESTS_PER_SECOND, requestsPerSecond); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index 56ef8225da53c..fd99f06aeb9bf 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -216,14 +216,23 @@ public void testCloseJob() throws Exception { public void testDeleteExpiredData() throws Exception { float requestsPerSec = randomBoolean() ? -1.0f : (float)randomDoubleBetween(0.0, 100000.0, false); + String jobId = randomBoolean() ? null : randomAlphaOfLength(8); DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest( + jobId, requestsPerSec, TimeValue.timeValueHours(1)); Request request = MLRequestConverters.deleteExpiredData(deleteExpiredDataRequest); assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); - assertEquals("/_ml/_delete_expired_data", request.getEndpoint()); - assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request)); + + String expectedPath = jobId == null ? "/_ml/_delete_expired_data" : "/_ml/_delete_expired_data/" + jobId; + assertEquals(expectedPath, request.getEndpoint()); + if (jobId == null) { + assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request)); + } else { + assertEquals("{\"job_id\":\"" + jobId + "\",\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", + requestEntityToString(request)); + } } public void testDeleteJob() { @@ -946,7 +955,7 @@ public void testPutFilter() throws IOException { } } - public void testGetFilter() throws IOException { + public void testGetFilter() { String id = randomAlphaOfLength(10); GetFiltersRequest getFiltersRequest = new GetFiltersRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 5e3f1ff9be03d..1c39864ec07b1 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -2037,8 +2037,9 @@ public void testDeleteExpiredData() throws IOException, InterruptedException { { // tag::delete-expired-data-request DeleteExpiredDataRequest request = new DeleteExpiredDataRequest( // <1> - 1000.0f, // <2> - TimeValue.timeValueHours(12) // <3> + null, // <2> + 1000.0f, // <3> + TimeValue.timeValueHours(12) // <4> ); // end::delete-expired-data-request diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java index b5ed7a1e94b09..f892de5beb581 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java @@ -33,9 +33,11 @@ public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase PARSER = new ConstructingObjectParser<>( "delete_expired_data_request", true, - (a) -> new DeleteExpiredDataRequest((Float) a[0], (TimeValue) a[1]) + (a) -> new DeleteExpiredDataRequest((String) a[0], (Float) a[1], (TimeValue) a[2]) ); static { + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), + new ParseField(DeleteExpiredDataRequest.JOB_ID)); PARSER.declareFloat(ConstructingObjectParser.optionalConstructorArg(), new ParseField(DeleteExpiredDataRequest.REQUESTS_PER_SECOND)); PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), @@ -46,7 +48,9 @@ public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase Constructing a new request. -<2> Providing requests per second throttling for the +<2> Optionally set a job ID. Use `null` for the default wild card all `*`. +<3> Providing requests per second throttling for the deletion processes. Default is no throttling. -<3> Setting how long the deletion processes will be allowed +<4> Setting how long the deletion processes will be allowed to run before they are canceled. Default value is `8h` (8 hours). [id="{upid}-{api}-response"] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java index bf0502a426d5d..8de7a1fb2b6f2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -20,6 +19,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.job.config.Job; import java.io.IOException; @@ -51,9 +51,17 @@ public static class Request extends ActionRequest { PARSER.declareString(Request::setJobId, Job.ID); } + public static Request parseRequest(String jobId, XContentParser parser) { + Request request = PARSER.apply(parser, null); + if (jobId != null) { + request.jobId = jobId; + } + return request; + } + private Float requestsPerSecond; private TimeValue timeout; - private String jobId = Metadata.ALL; + private String jobId; public Request() {} @@ -72,7 +80,7 @@ public Request(StreamInput in) throws IOException { this.timeout = null; } if (in.getVersion().onOrAfter(Version.V_7_9_0)) { - jobId = in.readString(); + jobId = in.readOptionalString(); } } @@ -136,7 +144,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalTimeValue(timeout); } if (out.getVersion().onOrAfter(Version.V_7_9_0)) { - out.writeString(jobId); + out.writeOptionalString(jobId); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index 649c3db0a9c54..970c09f2d4143 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -114,8 +114,6 @@ private void deleteExpiredData(DeleteExpiredDataAction.Request request, List dataRemovers, ActionListener listener, Supplier isTimedOutSupplier) { - - Iterator dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); // If there is no throttle provided, default to none float requestsPerSec = request.getRequestsPerSecond() == null ? Float.POSITIVE_INFINITY : request.getRequestsPerSecond(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java index 0c906cad82f14..21529f1800804 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.ml.rest; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; @@ -44,11 +43,14 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String jobId = restRequest.param(Job.ID.getPreferredName()); + DeleteExpiredDataAction.Request request; if (restRequest.hasContent()) { - request = DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null); + request = DeleteExpiredDataAction.Request.parseRequest(jobId, restRequest.contentParser()); } else { request = new DeleteExpiredDataAction.Request(); + request.setJobId(jobId); String perSecondParam = restRequest.param(DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName()); if (perSecondParam != null) { @@ -67,11 +69,6 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient } } - String jobId = restRequest.param(Job.ID.getPreferredName()); - if (Strings.isNullOrEmpty(jobId) == false) { - request.setJobId(jobId); - } - return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel)); } }